diff --git a/archapp/src/archeng.rs b/archapp/src/archeng.rs index d3b5e3e..760eb96 100644 --- a/archapp/src/archeng.rs +++ b/archapp/src/archeng.rs @@ -1,33 +1,34 @@ +pub mod datablock; pub mod datablockstream; +pub mod diskio; pub mod indexfiles; +pub mod indextree; pub mod pipe; use self::indexfiles::list_index_files; +use self::indextree::channel_list; use crate::eventsitem::EventsItem; -use crate::plainevents::{PlainEvents, ScalarPlainEvents}; use crate::timed::Timed; use crate::wrap_task; use async_channel::{Receiver, Sender}; use err::Error; use futures_util::StreamExt; -use items::eventvalues::EventValues; use items::{RangeCompletableItem, Sitemty, StatsItem, StreamItem}; +use netpod::log::*; use netpod::timeunits::SEC; use netpod::{ - log::*, Channel, ChannelArchiver, ChannelConfigQuery, ChannelConfigResponse, DataHeaderPos, DiskStats, FilePos, - NanoRange, Nanos, OpenStats, ReadExactStats, ReadStats, SeekStats, + ChannelArchiver, ChannelConfigQuery, ChannelConfigResponse, DiskStats, OpenStats, ReadExactStats, ReadStats, + SeekStats, }; -use regex::Regex; use serde::Serialize; -use std::collections::BTreeMap; use std::convert::TryInto; +use std::fmt; use std::io::{self, SeekFrom}; use std::path::PathBuf; use std::sync::atomic::{AtomicUsize, Ordering}; -use std::time::{Duration, Instant}; +use std::time::Instant; use tokio::fs::{File, OpenOptions}; use tokio::io::{AsyncReadExt, AsyncSeekExt}; -use tokio::sync::Mutex; /* struct ReadExactWrap<'a> { @@ -51,9 +52,6 @@ impl TimedIo for File { } */ -type Offset = u64; - -const OFFSET_SIZE: usize = std::mem::size_of::(); const EPICS_EPOCH_OFFSET: u64 = 631152000 * SEC; const LOG_IO: bool = true; const STATS_IO: bool = true; @@ -70,6 +68,12 @@ pub struct StatsChannel { chn: Sender>, } +impl fmt::Debug for StatsChannel { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("StatsChannel").finish() + } +} + impl StatsChannel { pub fn new(chn: Sender>) -> Self { Self { chn } @@ -183,36 +187,6 @@ async fn read_exact(file: &mut File, buf: &mut [u8], stats: &StatsChannel) -> io res } -#[derive(Debug)] -pub struct NamedHashTableEntry { - named_hash_channel_entry_pos: u64, -} - -#[derive(Debug)] -pub struct NamedHashChannelEntry { - next: u64, - id_rtree_pos: u64, - channel_name: String, - id_txt: String, -} - -#[derive(Debug)] -pub struct IndexFileBasics { - version: u8, - name_hash_anchor_beg: u64, - name_hash_anchor_len: u64, - fa_used_list_beg: u64, - fa_used_list_end: u64, - fa_used_list_len: u64, - fa_free_list_beg: u64, - fa_free_list_end: u64, - fa_free_list_len: u64, - fa_header_prev: u64, - fa_header_next: u64, - fa_header_len: u64, - name_hash_entries: Vec, -} - pub fn name_hash(s: &str, ht_len: u32) -> u32 { let mut h = 0; for ch in s.as_bytes() { @@ -288,7 +262,7 @@ impl RingBuf { while self.len() < min { let n = self.fill(file, stats).await?; if n == 0 { - break; + return Err(Error::with_msg_no_trace(format!("fill_min can not read min {}", min))); } } return Ok(self.len() - len); @@ -311,10 +285,6 @@ fn format_hex_block(buf: &[u8], max: usize) -> String { ret } -fn readoffset(buf: &[u8], pos: usize) -> Offset { - u64::from_be_bytes(buf.as_ref()[pos..pos + OFFSET_SIZE].try_into().unwrap()) -} - fn readu64(buf: &[u8], pos: usize) -> u64 { u64::from_be_bytes(buf.as_ref()[pos..pos + 8].try_into().unwrap()) } @@ -346,528 +316,6 @@ fn read_string(buf: &[u8]) -> Result { Ok(ret) } -pub async fn read_file_basics(file: &mut File, stats: &StatsChannel) -> Result { - let mut buf = vec![0; 128]; - read_exact(file, &mut 
buf[0..4], stats).await?; - let version = String::from_utf8(buf[3..4].to_vec())?.parse()?; - if version == 3 { - read_exact(file, &mut buf[4..88], stats).await?; - } else if version == 2 { - read_exact(file, &mut buf[4..48], stats).await?; - } else { - panic!(); - } - //info!("\nread_file_basics\n{}", format_hex_block(&buf, 160)); - let b = &buf; - if false { - let s: String = b.iter().map(|x| format!(" {:02x}", *x)).collect(); - info!("\n\n{}", s); - } - if false { - let mut i1 = 0x58 + 0x10 * 8; - while i1 < 0x58 + 0x15 * 8 { - let s: String = b[i1..i1 + 8].iter().map(|x| format!(" {:02x}", *x)).collect(); - info!("{}", s); - i1 += 8; - } - } - if false { - info!("data:"); - let mut i1 = 0x2809; - while i1 < 0x2880 { - let s: String = b[i1..i1 + 8].iter().map(|x| format!(" {:02x}", *x)).collect(); - info!("{}", s); - i1 += 8; - } - info!("{}", String::from_utf8_lossy(&b[0x2800..0x2880])); - } - let mut ret = if version == 3 { - IndexFileBasics { - version, - name_hash_anchor_beg: readu64(b, 0x04), - name_hash_anchor_len: readu32(b, 12) as u64, - fa_used_list_len: readu64(b, 16), - fa_used_list_beg: readu64(b, 24), - fa_used_list_end: readu64(b, 32), - fa_free_list_len: readu64(b, 40), - fa_free_list_beg: readu64(b, 48), - fa_free_list_end: readu64(b, 56), - fa_header_len: readu64(b, 64), - fa_header_prev: readu64(b, 72), - fa_header_next: readu64(b, 80), - name_hash_entries: vec![], - } - } else if version == 2 { - IndexFileBasics { - version, - name_hash_anchor_beg: readu32(b, 4) as u64, - name_hash_anchor_len: readu32(b, 8) as u64, - fa_used_list_len: readu32(b, 12) as u64, - fa_used_list_beg: readu32(b, 16) as u64, - fa_used_list_end: readu32(b, 20) as u64, - fa_free_list_len: readu32(b, 24) as u64, - fa_free_list_beg: readu32(b, 28) as u64, - fa_free_list_end: readu32(b, 32) as u64, - fa_header_len: readu32(b, 36) as u64, - fa_header_prev: readu32(b, 40) as u64, - fa_header_next: readu32(b, 44) as u64, - name_hash_entries: vec![], - } - } else { - panic!(); - }; - trace!("read_file_basics w/o hashposs {:?}", ret); - { - if ret.name_hash_anchor_len > 2000 { - return Err(Error::with_msg_no_trace(format!( - "name_hash_anchor_len {}", - ret.name_hash_anchor_len - ))); - } - let u = if version == 3 { - ret.name_hash_anchor_len * 8 - } else if version == 2 { - ret.name_hash_anchor_len * 4 - } else { - panic!() - }; - buf.resize(u as usize, 0); - read_exact(file, &mut buf, stats).await?; - let b = &buf; - for i1 in 0..ret.name_hash_anchor_len { - let pos = if version == 3 { - readu64(b, i1 as usize * 8) - } else if version == 2 { - readu32(b, i1 as usize * 4) as u64 - } else { - panic!() - }; - let e = NamedHashTableEntry { - named_hash_channel_entry_pos: pos, - }; - ret.name_hash_entries.push(e); - } - } - Ok(ret) -} - -#[derive(Debug)] -pub struct RTreeNodeRecord { - ts1: Nanos, - ts2: Nanos, - // TODO should probably be better name `child or offset` and be made enum. - child_or_id: Offset, -} - -#[derive(Debug)] -pub struct RTreeNode { - pos: FilePos, - records: Vec, - is_leaf: bool, - rtree_m: usize, -} - -#[derive(Debug)] -pub struct RTreeNodeAtRecord { - node: RTreeNode, - rix: usize, -} - -impl RTreeNodeAtRecord { - pub fn rec(&self) -> &RTreeNodeRecord { - &self.node.records[self.rix] - } -} - -// TODO refactor as struct, rtree_m is a property of the tree. 
-pub async fn read_rtree_node( - file: &mut File, - pos: FilePos, - rtree_m: usize, - stats: &StatsChannel, -) -> Result { - const OFF1: usize = 9; - const RLEN: usize = 24; - const NANO_MAX: u32 = 999999999; - seek(file, SeekFrom::Start(pos.into()), stats).await?; - let mut rb = RingBuf::new(); - // TODO must know how much data I need at least... - rb.fill_min(file, OFF1 + rtree_m * RLEN, stats).await?; - if false { - let s = format_hex_block(rb.data(), 128); - info!("RTREE NODE:\n{}", s); - } - if rb.len() < 1 + OFFSET_SIZE { - return Err(Error::with_msg_no_trace("could not read enough")); - } - let b = rb.data(); - let is_leaf = b[0] != 0; - let parent = readu64(b, 1); - if false { - info!("is_leaf: {} parent: {}", is_leaf, parent); - } - let recs = (0..rtree_m) - .into_iter() - .filter_map(|i| { - let off2 = OFF1 + i * RLEN; - let ts1a = readu32(b, off2 + 0); - let ts1b = readu32(b, off2 + 4); - let ts2a = readu32(b, off2 + 8); - let ts2b = readu32(b, off2 + 12); - let ts1b = ts1b.min(NANO_MAX); - let ts2b = ts2b.min(NANO_MAX); - let ts1 = ts1a as u64 * SEC + ts1b as u64 + EPICS_EPOCH_OFFSET; - let ts2 = ts2a as u64 * SEC + ts2b as u64 + EPICS_EPOCH_OFFSET; - let child_or_id = readu64(b, off2 + 16); - //info!("NODE {} {} {} {} {}", ts1a, ts1b, ts2a, ts2b, child_or_id); - if child_or_id != 0 && ts2 != 0 { - let rec = RTreeNodeRecord { - ts1: Nanos { ns: ts1 }, - ts2: Nanos { ns: ts2 }, - child_or_id, - }; - Some(rec) - } else { - None - } - }) - .collect(); - let node = RTreeNode { - pos, - records: recs, - is_leaf, - rtree_m, - }; - Ok(node) -} - -pub async fn read_rtree_entrypoint( - file: &mut File, - pos: u64, - _basics: &IndexFileBasics, - stats: &StatsChannel, -) -> Result { - seek(file, SeekFrom::Start(pos), stats).await?; - let mut rb = RingBuf::new(); - // TODO should be able to indicate: - // • how much I need at most before I know that I will e.g. seek or abort. 
- rb.fill_min(file, OFFSET_SIZE + 4, stats).await?; - if rb.len() < OFFSET_SIZE + 4 { - return Err(Error::with_msg_no_trace("could not read enough")); - } - let b = rb.data(); - let node_offset = readu64(b, 0); - let rtree_m = readu32(b, OFFSET_SIZE); - //info!("node_offset: {} rtree_m: {}", node_offset, rtree_m); - let pos = FilePos { pos: node_offset }; - let node = read_rtree_node(file, pos, rtree_m as usize, stats).await?; - //info!("read_rtree_entrypoint READ ROOT NODE: {:?}", node); - Ok(node) -} - -#[derive(Debug)] -pub struct TreeSearchStats { - duration: Duration, - node_reads: usize, -} - -impl TreeSearchStats { - fn new(ts1: Instant, node_reads: usize) -> Self { - Self { - duration: Instant::now().duration_since(ts1), - node_reads, - } - } -} - -pub async fn search_record( - file: &mut File, - rtree_m: usize, - start_node_pos: FilePos, - beg: Nanos, - stats: &StatsChannel, -) -> Result<(Option, TreeSearchStats), Error> { - let ts1 = Instant::now(); - let mut node = read_rtree_node(file, start_node_pos, rtree_m, stats).await?; - let mut node_reads = 1; - 'outer: loop { - let nr = node.records.len(); - for (i, rec) in node.records.iter().enumerate() { - if rec.ts2.ns > beg.ns { - if node.is_leaf { - trace!("found leaf match at {} / {}", i, nr); - let ret = RTreeNodeAtRecord { node, rix: i }; - let stats = TreeSearchStats::new(ts1, node_reads); - return Ok((Some(ret), stats)); - } else { - trace!("found non-leaf match at {} / {}", i, nr); - let pos = FilePos { pos: rec.child_or_id }; - node = read_rtree_node(file, pos, rtree_m, stats).await?; - node_reads += 1; - continue 'outer; - } - } - } - { - let stats = TreeSearchStats::new(ts1, node_reads); - return Ok((None, stats)); - } - } -} - -pub async fn search_record_expand_try( - file: &mut File, - rtree_m: usize, - start_node_pos: FilePos, - beg: Nanos, - stats: &StatsChannel, -) -> Result<(Option, TreeSearchStats), Error> { - let ts1 = Instant::now(); - let mut node = read_rtree_node(file, start_node_pos, rtree_m, stats).await?; - let mut node_reads = 1; - 'outer: loop { - let nr = node.records.len(); - for (i, rec) in node.records.iter().enumerate().rev() { - if rec.ts1.ns <= beg.ns { - if node.is_leaf { - trace!("found leaf match at {} / {}", i, nr); - let ret = RTreeNodeAtRecord { node, rix: i }; - let stats = TreeSearchStats::new(ts1, node_reads); - return Ok((Some(ret), stats)); - } else { - // TODO - // We rely on channel archiver engine that there is at least one event - // in the referenced range. It should according to docs, but who knows. 
- trace!("found non-leaf match at {} / {}", i, nr); - let pos = FilePos { pos: rec.child_or_id }; - node = read_rtree_node(file, pos, rtree_m, stats).await?; - node_reads += 1; - continue 'outer; - } - } - } - { - let stats = TreeSearchStats::new(ts1, node_reads); - return Ok((None, stats)); - } - } -} - -pub async fn search_record_expand( - file: &mut File, - rtree_m: usize, - start_node_pos: FilePos, - beg: Nanos, - stats: &StatsChannel, -) -> Result<(Option, TreeSearchStats), Error> { - let ts1 = Instant::now(); - let res = search_record_expand_try(file, rtree_m, start_node_pos, beg, stats).await?; - match res { - (Some(res), stats) => { - let ts2 = Instant::now(); - info!("search_record_expand took {:?}", ts2.duration_since(ts1)); - Ok((Some(res), stats)) - } - _ => { - let res = search_record(file, rtree_m, start_node_pos, beg, stats).await; - let ts2 = Instant::now(); - info!("search_record_expand took {:?}", ts2.duration_since(ts1)); - res - } - } -} - -#[derive(Debug)] -pub struct ChannelInfoBasics { - channel_name: String, - rtree_m: usize, - rtree_start_pos: FilePos, -} - -pub async fn read_channel( - index_file: &mut File, - channel_name: &str, - stats: &StatsChannel, -) -> Result, Error> { - let basics = read_file_basics(index_file, stats).await?; - let hver2 = HeaderVersion2; - let hver3 = HeaderVersion3; - let hver: &dyn HeaderVersion = if basics.version == 3 { - &hver3 - } else if basics.version == 2 { - &hver2 - } else { - panic!() - }; - let chn_hash = name_hash(channel_name, basics.name_hash_anchor_len as u32); - let epos = &basics.name_hash_entries[chn_hash as usize]; - let mut entries = vec![]; - let mut rb = RingBuf::new(); - let mut pos = epos.named_hash_channel_entry_pos; - loop { - rb.reset(); - seek(index_file, SeekFrom::Start(pos), stats).await?; - let fill_min = if hver.offset_size() == 8 { 20 } else { 12 }; - rb.fill_min(index_file, fill_min, stats).await?; - if rb.len() < fill_min { - warn!("not enough data to continue reading channel list from name hash list"); - break; - } - let buf = rb.data(); - let e = parse_name_hash_channel_entry(buf, hver)?; - let next = e.next; - entries.push(e); - if next == 0 { - break; - } else { - pos = next; - } - } - for e in &entries { - if e.channel_name == channel_name { - let ep = read_rtree_entrypoint(index_file, e.id_rtree_pos, &basics, stats).await?; - let ret = ChannelInfoBasics { - channel_name: channel_name.into(), - rtree_m: ep.rtree_m, - rtree_start_pos: ep.pos, - }; - return Ok(Some(ret)); - } - } - Ok(None) -} - -pub trait HeaderVersion: Send + Sync { - fn version(&self) -> u8; - fn read_offset(&self, buf: &[u8], pos: usize) -> u64; - fn offset_size(&self) -> usize; -} - -struct HeaderVersion2; - -impl HeaderVersion for HeaderVersion2 { - fn version(&self) -> u8 { - 2 - } - - fn read_offset(&self, buf: &[u8], pos: usize) -> u64 { - readu32(buf, pos) as u64 - } - - fn offset_size(&self) -> usize { - 4 - } -} - -struct HeaderVersion3; - -impl HeaderVersion for HeaderVersion3 { - fn version(&self) -> u8 { - 3 - } - - fn read_offset(&self, buf: &[u8], pos: usize) -> u64 { - readu64(buf, pos) - } - - fn offset_size(&self) -> usize { - 8 - } -} - -fn parse_name_hash_channel_entry(buf: &[u8], hver: &dyn HeaderVersion) -> Result { - let mut p1 = 0; - let next = hver.read_offset(&buf, p1); - p1 += hver.offset_size(); - let id = hver.read_offset(&buf, p1); - p1 += hver.offset_size(); - let name_len = readu16(&buf, p1); - p1 += 2; - let id_txt_len = readu16(&buf, p1); - p1 += 2; - if next > 1024 * 1024 * 1024 * 1024 || id > 
1024 * 1024 * 1024 * 1024 || name_len > 128 || id_txt_len > 128 { - error!( - "something bad: parse_name_hash_channel_entry next {} id {} name_len {} id_txt_len {}", - next, id, name_len, id_txt_len - ); - return Err(Error::with_msg_no_trace("bad hash table entry")); - } - let n1 = name_len as usize; - let n2 = id_txt_len as usize; - let channel_name_found = String::from_utf8(buf[p1..p1 + n1].to_vec())?; - p1 += n1; - let id_txt = String::from_utf8(buf[p1..p1 + n2].to_vec())?; - p1 += n2; - let _ = p1; - let e = NamedHashChannelEntry { - next, - id_rtree_pos: id, - channel_name: channel_name_found, - id_txt, - }; - Ok(e) -} - -async fn channel_list_from_index_name_hash_list( - file: &mut File, - pos: FilePos, - hver: &dyn HeaderVersion, - stats: &StatsChannel, -) -> Result, Error> { - let mut pos = pos; - let mut ret = vec![]; - let mut rb = RingBuf::new(); - loop { - rb.reset(); - seek(file, SeekFrom::Start(pos.pos), stats).await?; - let fill_min = if hver.offset_size() == 8 { 20 } else { 12 }; - rb.fill_min(file, fill_min, stats).await?; - if rb.len() < fill_min { - warn!("not enough data to continue reading channel list from name hash list"); - break; - } - let e = parse_name_hash_channel_entry(rb.data(), hver)?; - let next = e.next; - ret.push(e); - if next == 0 { - break; - } else { - pos.pos = next; - } - } - Ok(ret) -} - -pub async fn channel_list(index_path: PathBuf, stats: &StatsChannel) -> Result, Error> { - let mut ret = vec![]; - let file = &mut open_read(index_path.clone(), stats).await?; - let basics = read_file_basics(file, stats).await?; - let hver2 = HeaderVersion2; - let hver3 = HeaderVersion3; - let hver: &dyn HeaderVersion = if basics.version == 2 { - &hver2 - } else if basics.version == 3 { - &hver3 - } else { - return Err(Error::with_msg_no_trace(format!( - "unexpected version {}", - basics.version - ))); - }; - for (_i, name_hash_entry) in basics.name_hash_entries.iter().enumerate() { - if name_hash_entry.named_hash_channel_entry_pos != 0 { - let pos = FilePos { - pos: name_hash_entry.named_hash_channel_entry_pos, - }; - let list = channel_list_from_index_name_hash_list(file, pos, hver, stats).await?; - for e in list { - ret.push(e.channel_name); - } - } - } - Ok(ret) -} - #[allow(dead_code)] async fn datarange_stream_fill(_channel_name: &str, _tx: Sender) { // Search the first relevant leaf node. 
@@ -886,235 +334,6 @@ pub fn datarange_stream(_channel_name: &str) -> Result, Erro Ok(rx) } -#[derive(Debug)] -pub struct Datablock { - next: Offset, - data_header_pos: Offset, - fname: String, -} - -impl Datablock { - fn file_name(&self) -> &str { - &self.fname - } - - fn data_header_pos(&self) -> DataHeaderPos { - DataHeaderPos(self.data_header_pos) - } -} - -async fn read_index_datablockref(file: &mut File, pos: FilePos, stats: &StatsChannel) -> Result { - seek(file, SeekFrom::Start(pos.pos), stats).await?; - let mut rb = RingBuf::new(); - rb.fill_min(file, 18, stats).await?; - let buf = rb.data(); - let next = readoffset(buf, 0); - let data = readoffset(buf, 8); - let len = readu16(buf, 16) as usize; - rb.fill_min(file, 18 + len, stats).await?; - let buf = rb.data(); - let fname = String::from_utf8(buf[18..18 + len].to_vec())?; - let ret = Datablock { - next, - data_header_pos: data, - fname, - }; - Ok(ret) -} - -#[derive(Debug)] -enum DbrType { - DbrString = 0, - DbrInt = 1, - DbrStsFloat = 9, - DbrTimeDouble = 20, -} - -impl DbrType { - fn from_u16(k: u16) -> Result { - use DbrType::*; - let res = match k { - 0 => DbrString, - 1 => DbrInt, - 9 => DbrStsFloat, - 20 => DbrTimeDouble, - _ => { - let msg = format!("not a valid/supported dbr type: {}", k); - return Err(Error::with_msg_no_trace(msg)); - } - }; - Ok(res) - } - - #[allow(dead_code)] - fn byte_len(&self) -> usize { - use DbrType::*; - match self { - DbrString => 0, - DbrInt => 4, - DbrStsFloat => 1, - DbrTimeDouble => 16, - } - } -} - -#[derive(Debug)] -pub struct DatafileHeader { - pos: DataHeaderPos, - dir_offset: u32, - // Should be absolute file position of the next data header - // together with `fname_next`. - // But unfortunately not always set? - next_offset: u32, - prev_offset: u32, - curr_offset: u32, - num_samples: u32, - ctrl_info_offset: u32, - buf_size: u32, - buf_free: u32, - dbr_type: DbrType, - dbr_count: usize, - period: f64, - ts_beg: Nanos, - ts_end: Nanos, - ts_next_file: Nanos, - fname_next: String, - fname_prev: String, -} - -const DATA_HEADER_LEN_ON_DISK: usize = 72 + 40 + 40; - -async fn read_datafile_header( - file: &mut File, - pos: DataHeaderPos, - stats: &StatsChannel, -) -> Result { - seek(file, SeekFrom::Start(pos.0), stats).await?; - let mut rb = RingBuf::new(); - rb.fill_min(file, DATA_HEADER_LEN_ON_DISK, stats).await?; - let buf = rb.data(); - let dir_offset = readu32(buf, 0); - let next_offset = readu32(buf, 4); - let prev_offset = readu32(buf, 8); - let curr_offset = readu32(buf, 12); - let num_samples = readu32(buf, 16); - let ctrl_info_offset = readu32(buf, 20); - let buf_size = readu32(buf, 24); - let buf_free = readu32(buf, 28); - let dbr_type = DbrType::from_u16(readu16(buf, 32))?; - let dbr_count = readu16(buf, 34); - // 4 bytes padding. 
- let period = readf64(buf, 40); - let ts1a = readu32(buf, 48); - let ts1b = readu32(buf, 52); - let ts2a = readu32(buf, 56); - let ts2b = readu32(buf, 60); - let ts3a = readu32(buf, 64); - let ts3b = readu32(buf, 68); - let ts_beg = if ts1a != 0 || ts1b != 0 { - ts1a as u64 * SEC + ts1b as u64 + EPICS_EPOCH_OFFSET - } else { - 0 - }; - let ts_end = if ts3a != 0 || ts3b != 0 { - ts3a as u64 * SEC + ts3b as u64 + EPICS_EPOCH_OFFSET - } else { - 0 - }; - let ts_next_file = if ts2a != 0 || ts2b != 0 { - ts2a as u64 * SEC + ts2b as u64 + EPICS_EPOCH_OFFSET - } else { - 0 - }; - let fname_prev = read_string(&buf[72..112])?; - let fname_next = read_string(&buf[112..152])?; - let ret = DatafileHeader { - pos, - dir_offset, - next_offset, - prev_offset, - curr_offset, - num_samples, - ctrl_info_offset, - buf_size, - buf_free, - dbr_type, - dbr_count: dbr_count as usize, - period, - ts_beg: Nanos { ns: ts_beg }, - ts_end: Nanos { ns: ts_end }, - ts_next_file: Nanos { ns: ts_next_file }, - fname_next, - fname_prev, - }; - Ok(ret) -} - -async fn read_data_1( - file: &mut File, - datafile_header: &DatafileHeader, - range: NanoRange, - _expand: bool, - stats: &StatsChannel, -) -> Result { - // TODO handle expand mode - let dhpos = datafile_header.pos.0 + DATA_HEADER_LEN_ON_DISK as u64; - seek(file, SeekFrom::Start(dhpos), stats).await?; - let res = match &datafile_header.dbr_type { - DbrType::DbrTimeDouble => { - if datafile_header.dbr_count == 1 { - trace!("~~~~~~~~~~~~~~~~~~~~~ read scalar DbrTimeDouble"); - let mut evs = EventValues { - tss: vec![], - values: vec![], - }; - let n1 = datafile_header.num_samples as usize; - //let n2 = datafile_header.dbr_type.byte_len(); - let n2 = 2 + 2 + 4 + 4 + (4) + 8; - let n3 = n1 * n2; - let mut buf = vec![0; n3]; - read_exact(file, &mut buf, stats).await?; - let mut p1 = 0; - let mut ntot = 0; - while p1 < n3 - n2 { - let _status = u16::from_be_bytes(buf[p1..p1 + 2].try_into().unwrap()); - p1 += 2; - let _severity = u16::from_be_bytes(buf[p1..p1 + 2].try_into().unwrap()); - p1 += 2; - let ts1a = u32::from_be_bytes(buf[p1..p1 + 4].try_into().unwrap()); - p1 += 4; - let ts1b = u32::from_be_bytes(buf[p1..p1 + 4].try_into().unwrap()); - p1 += 4; - let ts1 = ts1a as u64 * SEC + ts1b as u64 + EPICS_EPOCH_OFFSET; - p1 += 4; - let value = f64::from_be_bytes(buf[p1..p1 + 8].try_into().unwrap()); - p1 += 8; - ntot += 1; - if ts1 >= range.beg && ts1 < range.end { - evs.tss.push(ts1); - evs.values.push(value); - } - } - info!("parsed block with {} / {} events", ntot, evs.tss.len()); - let evs = ScalarPlainEvents::Double(evs); - let plain = PlainEvents::Scalar(evs); - let item = EventsItem::Plain(plain); - item - } else { - let msg = format!("dbr_count {:?} not yet supported", datafile_header.dbr_count); - error!("{}", msg); - return Err(Error::with_msg_no_trace(msg)); - } - } - _ => { - let msg = format!("Type {:?} not yet supported", datafile_header.dbr_type); - error!("{}", msg); - return Err(Error::with_msg_no_trace(msg)); - } - }; - Ok(res) -} - #[derive(Debug, Serialize)] pub struct ListChannelItem { name: String, @@ -1251,207 +470,15 @@ pub async fn channel_config(q: &ChannelConfigQuery, conf: &ChannelArchiver) -> R } } -#[derive(Debug)] -enum RetClass { - Long, - Medium, - Short, - #[allow(unused)] - PostMortem, -} - -#[derive(Debug)] -enum IndexCat { - Machine { rc: RetClass }, - Beamline { rc: RetClass, name: String }, -} - -#[derive(Debug)] -struct IndexFile { - path: PathBuf, - cat: IndexCat, -} - -// Try to make sense of historical conventions how the epics 
channel archiver engines are configured. -fn categorize_index_files(list: &Vec) -> Result, Error> { - let re_m = Regex::new(r"/archive_(ST|MT|LT)/index").unwrap(); - let re_b = Regex::new(r"/archive_(X([0-9]+)[^_]*)_(SH|LO)/index").unwrap(); - let mut ret = vec![]; - for p in list { - match re_m.captures(p) { - Some(cap) => { - let rc = cap.get(1).unwrap().as_str(); - let rc = match rc { - "ST" => Some(RetClass::Short), - "MT" => Some(RetClass::Medium), - "LT" => Some(RetClass::Long), - _ => { - warn!("categorize_index_files no idea about RC for {}", p); - None - } - }; - if let Some(rc) = rc { - let f = IndexFile { - path: p.into(), - cat: IndexCat::Machine { rc }, - }; - ret.push(f); - } - } - None => match re_b.captures(p) { - Some(cap) => { - let name = cap.get(1).unwrap().as_str(); - let rc = cap.get(3).unwrap().as_str(); - let rc = match rc { - "SH" => Some(RetClass::Short), - "LO" => Some(RetClass::Long), - _ => { - warn!("categorize_index_files no idea about RC for {}", p); - None - } - }; - if let Some(rc) = rc { - let f = IndexFile { - path: p.into(), - cat: IndexCat::Beamline { name: name.into(), rc }, - }; - ret.push(f); - } - } - None => { - warn!("categorize_index_files no idea at all about {}", p); - } - }, - } - } - let is_machine = { - let mut k = false; - for x in &ret { - if let IndexCat::Machine { .. } = &x.cat { - k = true; - break; - } - } - k - }; - // TODO by default, filter post-mortem. - let is_beamline = !is_machine; - if is_beamline { - let mut ret: Vec<_> = ret - .into_iter() - .filter_map(|k| { - if let IndexCat::Machine { rc, .. } = &k.cat { - let prio = match rc { - &RetClass::Short => 4, - &RetClass::Medium => 6, - &RetClass::Long => 8, - &RetClass::PostMortem => 0, - }; - Some((k, prio)) - } else { - None - } - }) - .collect(); - ret.sort_by_key(|x| x.1); - let ret = ret.into_iter().map(|k| k.0).collect(); - Ok(ret) - } else if is_machine { - let mut ret: Vec<_> = ret - .into_iter() - .filter_map(|k| { - if let IndexCat::Machine { rc, .. } = &k.cat { - let prio = match rc { - &RetClass::Short => 4, - &RetClass::Medium => 6, - &RetClass::Long => 8, - &RetClass::PostMortem => 0, - }; - Some((k, prio)) - } else { - None - } - }) - .collect(); - ret.sort_by_key(|x| x.1); - let ret = ret.into_iter().map(|k| k.0).collect(); - Ok(ret) - } else { - err::todoval() - } -} - -static INDEX_JSON: Mutex>>> = Mutex::const_new(None); - -pub async fn index_files_index_ref + Send>( - key: &str, - index_files_index_path: P, - stats: &StatsChannel, -) -> Result>, Error> { - let mut g = INDEX_JSON.lock().await; - match &*g { - Some(j) => Ok(j.get(key).map(|x| x.clone())), - None => { - let timed1 = Timed::new("slurp_index_json"); - let index_files_index_path = index_files_index_path.into(); - let index_files_index = { - let timed1 = Timed::new("slurp_index_bytes"); - let mut index_files_index = open_read(index_files_index_path, stats).await?; - let mut buf = vec![0; 1024 * 1024 * 50]; - let mut ntot = 0; - loop { - let n = read(&mut index_files_index, &mut buf[ntot..], stats).await?; - if n == 0 { - break; - } - ntot += n; - } - buf.truncate(ntot); - drop(timed1); - serde_json::from_slice::>>(&buf)? 
- }; - drop(timed1); - let ret = index_files_index.get(key).map(|x| x.clone()); - *g = Some(index_files_index); - Ok(ret) - } - } -} - -pub async fn index_file_path_list( - channel: Channel, - index_files_index_path: PathBuf, - stats: &StatsChannel, -) -> Result, Error> { - let timed1 = Timed::new("categorize index files"); - let index_paths = index_files_index_ref(channel.name(), &index_files_index_path, stats) - .await? - .ok_or(Error::with_msg_no_trace("can not find channel"))?; - let list = categorize_index_files(&index_paths)?; - info!("GOT CATEGORIZED:\n{:?}", list); - let ret = list.into_iter().map(|k| k.path).collect(); - drop(timed1); - Ok(ret) -} - #[cfg(test)] mod test { - // TODO move RangeFilter to a different crate (items?) - // because the `disk` crate should become the specific sf-databuffer reader engine. - - //use disk::rangefilter::RangeFilter; - //use disk::{eventblobs::EventChunkerMultifile, eventchunker::EventChunkerConf}; - - use super::search_record; - use crate::archeng::{ - open_read, read_channel, read_data_1, read_datafile_header, read_file_basics, read_index_datablockref, - StatsChannel, EPICS_EPOCH_OFFSET, - }; + use crate::archeng::datablock::{read_data_1, read_datafile_header}; + use crate::archeng::indextree::{read_channel, read_datablockref, search_record}; + use crate::archeng::{open_read, StatsChannel, EPICS_EPOCH_OFFSET}; use err::Error; + use netpod::log::*; use netpod::timeunits::*; - use netpod::FilePos; - use netpod::Nanos; - use netpod::{log::*, NanoRange}; + use netpod::{FilePos, NanoRange, Nanos}; use std::path::PathBuf; /* @@ -1459,88 +486,13 @@ mod test { */ const CHN_0_MASTER_INDEX: &str = "/data/daqbuffer-testdata/sls/gfa03/bl_arch/archive_X05DA_SH/index"; - #[test] - fn read_file_basic_info() -> Result<(), Error> { - let fut = async { - let stats = &StatsChannel::dummy(); - let mut f1 = open_read(CHN_0_MASTER_INDEX.into(), stats).await?; - let res = read_file_basics(&mut f1, stats).await?; - assert_eq!(res.version, 3); - assert_eq!(res.name_hash_anchor_beg, 88); - assert_eq!(res.name_hash_anchor_len, 1009); - // TODO makes no sense: - assert_eq!(res.fa_used_list_beg, 2611131); - assert_eq!(res.fa_used_list_end, 64); - assert_eq!(res.fa_used_list_len, 2136670); - assert_eq!(res.fa_header_next, 8160); - assert_eq!(res.fa_header_prev, 0); - assert_eq!(res.fa_header_len, 8072); - Ok(()) - }; - Ok(taskrun::run(fut).unwrap()) - } - - #[test] - fn read_for_channel() -> Result<(), Error> { - let fut = async { - let stats = &StatsChannel::dummy(); - let mut index_file = open_read(CHN_0_MASTER_INDEX.into(), stats).await?; - let channel_name = "X05DA-FE-WI1:TC1"; - let res = read_channel(&mut index_file, channel_name, stats).await?; - assert_eq!(res.is_some(), true); - let res = res.unwrap(); - assert_eq!(res.channel_name, channel_name); - assert_eq!(res.rtree_m, 50); - assert_eq!(res.rtree_start_pos.pos, 329750); - Ok(()) - }; - Ok(taskrun::run(fut).unwrap()) - } - - #[test] - fn search_record_middle() -> Result<(), Error> { - /* - These times are still without EPICS_EPOCH_OFFSET. 
- RTreeNodeRecord { ts1: 969729779686636130, ts2: 970351442331056677, child_or_id: 130731 }, - RTreeNodeRecord { ts1: 970351499684884156, ts2: 970417919634086480, child_or_id: 185074 }, - RTreeNodeRecord { ts1: 970417979635219603, ts2: 970429859806669835, child_or_id: 185015 }, - */ - let fut = async { - let stats = &StatsChannel::dummy(); - let index_path: PathBuf = CHN_0_MASTER_INDEX.into(); - let mut index_file = open_read(index_path.clone(), stats).await?; - let channel_name = "X05DA-FE-WI1:TC1"; - const T0: u64 = 970351442331056677 + 1 + EPICS_EPOCH_OFFSET; - let beg = Nanos { ns: T0 }; - let res = read_channel(&mut index_file, channel_name, stats).await?; - let cib = res.unwrap(); - let (res, _stats) = search_record(&mut index_file, cib.rtree_m, cib.rtree_start_pos, beg, stats).await?; - assert_eq!(res.is_some(), true); - let res = res.unwrap(); - assert_eq!(res.node.is_leaf, true); - assert_eq!(res.node.pos.pos, 8216); - assert_eq!(res.rix, 17); - let rec = &res.node.records[res.rix]; - assert_eq!(rec.ts1.ns, 970351499684884156 + EPICS_EPOCH_OFFSET); - assert_eq!(rec.ts2.ns, 970417919634086480 + EPICS_EPOCH_OFFSET); - assert_eq!(rec.child_or_id, 185074); - let pos = FilePos { pos: rec.child_or_id }; - let datablock = read_index_datablockref(&mut index_file, pos, stats).await?; - assert_eq!(datablock.data_header_pos, 52787); - assert_eq!(datablock.fname, "20201001/20201001"); - // The actual datafile for that time was not retained any longer. - // But the index still points to that. - Ok(()) - }; - Ok(taskrun::run(fut).unwrap()) - } - #[test] fn search_record_data() -> Result<(), Error> { let fut = async { let stats = &StatsChannel::dummy(); let index_path: PathBuf = CHN_0_MASTER_INDEX.into(); - let mut index_file = open_read(index_path.clone(), stats).await?; + let index_file = open_read(index_path.clone(), stats).await?; + let mut file2 = open_read(index_path.clone(), stats).await?; let channel_name = "X05DA-FE-WI1:TC1"; const T0: u64 = 1002000000 * SEC + EPICS_EPOCH_OFFSET; let beg = Nanos { ns: T0 }; @@ -1548,9 +500,9 @@ mod test { beg: beg.ns, end: beg.ns + 20 * SEC, }; - let res = read_channel(&mut index_file, channel_name, stats).await?; + let res = read_channel(index_path.clone(), index_file, channel_name, stats).await?; let cib = res.unwrap(); - let (res, _stats) = search_record(&mut index_file, cib.rtree_m, cib.rtree_start_pos, beg, stats).await?; + let (res, _stats) = search_record(&mut file2, cib.rtree_m, cib.rtree_start_pos, beg, stats).await?; assert_eq!(res.is_some(), true); let res = res.unwrap(); assert_eq!(res.node.is_leaf, true); @@ -1561,7 +513,7 @@ mod test { assert_eq!(rec.ts2.ns, 1002009299596362122 + EPICS_EPOCH_OFFSET); assert_eq!(rec.child_or_id, 2501903); let pos = FilePos { pos: rec.child_or_id }; - let datablock = read_index_datablockref(&mut index_file, pos, stats).await?; + let datablock = read_datablockref(&mut file2, pos, cib.hver(), stats).await?; assert_eq!(datablock.data_header_pos().0, 9311367); assert_eq!(datablock.file_name(), "20211001/20211001"); let data_path = index_path.parent().unwrap().join(datablock.file_name()); @@ -1573,65 +525,4 @@ mod test { }; Ok(taskrun::run(fut).unwrap()) } - - // Note: this tests only the index tree, but it does not look for any actual event in some file. 
- #[test] - fn search_record_at_beg() -> Result<(), Error> { - let fut = async { - let stats = &StatsChannel::dummy(); - let mut index_file = open_read(CHN_0_MASTER_INDEX.into(), stats).await?; - let channel_name = "X05DA-FE-WI1:TC1"; - const T0: u64 = 965081099942616289 + EPICS_EPOCH_OFFSET; - let beg = Nanos { ns: T0 }; - let res = read_channel(&mut index_file, channel_name, stats).await?; - let cib = res.unwrap(); - let (res, _stats) = search_record(&mut index_file, cib.rtree_m, cib.rtree_start_pos, beg, stats).await?; - assert_eq!(res.is_some(), true); - let res = res.unwrap(); - assert_eq!(res.node.is_leaf, true); - assert_eq!(res.node.pos.pos, 8216); - assert_eq!(res.rix, 0); - Ok(()) - }; - Ok(taskrun::run(fut).unwrap()) - } - - // Note: this tests only the index tree, but it does not look for any actual event in some file. - #[test] - fn search_record_at_end() -> Result<(), Error> { - let fut = async { - let stats = &StatsChannel::dummy(); - let mut index_file = open_read(CHN_0_MASTER_INDEX.into(), stats).await?; - let channel_name = "X05DA-FE-WI1:TC1"; - const T0: u64 = 1002441959876114632 - 1 + EPICS_EPOCH_OFFSET; - let beg = Nanos { ns: T0 }; - let res = read_channel(&mut index_file, channel_name, stats).await?; - let cib = res.unwrap(); - let (res, _stats) = search_record(&mut index_file, cib.rtree_m, cib.rtree_start_pos, beg, stats).await?; - assert_eq!(res.is_some(), true); - let res = res.unwrap(); - assert_eq!(res.node.pos.pos, 1861178); - assert_eq!(res.rix, 46); - Ok(()) - }; - Ok(taskrun::run(fut).unwrap()) - } - - // Note: this tests only the index tree, but it does not look for any actual event in some file. - #[test] - fn search_record_beyond_end() -> Result<(), Error> { - let fut = async { - let stats = &StatsChannel::dummy(); - let mut index_file = open_read(CHN_0_MASTER_INDEX.into(), stats).await?; - let channel_name = "X05DA-FE-WI1:TC1"; - const T0: u64 = 1002441959876114632 - 0 + EPICS_EPOCH_OFFSET; - let beg = Nanos { ns: T0 }; - let res = read_channel(&mut index_file, channel_name, stats).await?; - let cib = res.unwrap(); - let (res, _stats) = search_record(&mut index_file, cib.rtree_m, cib.rtree_start_pos, beg, stats).await?; - assert_eq!(res.is_none(), true); - Ok(()) - }; - Ok(taskrun::run(fut).unwrap()) - } } diff --git a/archapp/src/archeng/datablock.rs b/archapp/src/archeng/datablock.rs new file mode 100644 index 0000000..d58808a --- /dev/null +++ b/archapp/src/archeng/datablock.rs @@ -0,0 +1,208 @@ +use crate::archeng::{ + read_exact, read_string, readf64, readu16, readu32, seek, RingBuf, StatsChannel, EPICS_EPOCH_OFFSET, +}; +use crate::eventsitem::EventsItem; +use crate::plainevents::{PlainEvents, ScalarPlainEvents}; +use err::Error; +use items::eventvalues::EventValues; +use netpod::log::*; +use netpod::timeunits::SEC; +use netpod::{NanoRange, Nanos}; +use std::convert::TryInto; +use std::io::SeekFrom; +use tokio::fs::File; + +use super::indextree::DataheaderPos; + +#[derive(Debug)] +enum DbrType { + DbrString = 0, + DbrInt = 1, + DbrStsFloat = 9, + DbrTimeDouble = 20, +} + +impl DbrType { + fn from_u16(k: u16) -> Result { + use DbrType::*; + let res = match k { + 0 => DbrString, + 1 => DbrInt, + 9 => DbrStsFloat, + 20 => DbrTimeDouble, + _ => { + let msg = format!("not a valid/supported dbr type: {}", k); + return Err(Error::with_msg_no_trace(msg)); + } + }; + Ok(res) + } + + #[allow(dead_code)] + fn byte_len(&self) -> usize { + use DbrType::*; + match self { + DbrString => 0, + DbrInt => 4, + DbrStsFloat => 1, + DbrTimeDouble => 16, + } + } +} + 
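+// On-disk layout of one scalar DBR_TIME_DOUBLE sample as parsed by `read_data_1`
+// below (big-endian): status u16, severity u16, seconds u32 (EPICS epoch),
+// nanoseconds u32, 4 bytes padding, value f64 -- 24 bytes per sample.
+// Documentation sketch only; the constant name is hypothetical and `read_data_1`
+// computes the same size inline.
+#[allow(dead_code)]
+const DBR_TIME_DOUBLE_SAMPLE_LEN: usize = 2 + 2 + 4 + 4 + 4 + 8;
+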
+#[derive(Debug)] +pub struct DatafileHeader { + pos: DataheaderPos, + dir_offset: u32, + // Should be absolute file position of the next data header + // together with `fname_next`. + // But unfortunately not always set? + next_offset: u32, + prev_offset: u32, + curr_offset: u32, + num_samples: u32, + ctrl_info_offset: u32, + buf_size: u32, + buf_free: u32, + dbr_type: DbrType, + dbr_count: usize, + period: f64, + ts_beg: Nanos, + ts_end: Nanos, + ts_next_file: Nanos, + fname_next: String, + fname_prev: String, +} + +const DATA_HEADER_LEN_ON_DISK: usize = 72 + 40 + 40; + +pub async fn read_datafile_header( + file: &mut File, + pos: DataheaderPos, + stats: &StatsChannel, +) -> Result { + seek(file, SeekFrom::Start(pos.0), stats).await?; + let mut rb = RingBuf::new(); + rb.fill_min(file, DATA_HEADER_LEN_ON_DISK, stats).await?; + let buf = rb.data(); + let dir_offset = readu32(buf, 0); + let next_offset = readu32(buf, 4); + let prev_offset = readu32(buf, 8); + let curr_offset = readu32(buf, 12); + let num_samples = readu32(buf, 16); + let ctrl_info_offset = readu32(buf, 20); + let buf_size = readu32(buf, 24); + let buf_free = readu32(buf, 28); + let dbr_type = DbrType::from_u16(readu16(buf, 32))?; + let dbr_count = readu16(buf, 34); + // 4 bytes padding. + let period = readf64(buf, 40); + let ts1a = readu32(buf, 48); + let ts1b = readu32(buf, 52); + let ts2a = readu32(buf, 56); + let ts2b = readu32(buf, 60); + let ts3a = readu32(buf, 64); + let ts3b = readu32(buf, 68); + let ts_beg = if ts1a != 0 || ts1b != 0 { + ts1a as u64 * SEC + ts1b as u64 + EPICS_EPOCH_OFFSET + } else { + 0 + }; + let ts_end = if ts3a != 0 || ts3b != 0 { + ts3a as u64 * SEC + ts3b as u64 + EPICS_EPOCH_OFFSET + } else { + 0 + }; + let ts_next_file = if ts2a != 0 || ts2b != 0 { + ts2a as u64 * SEC + ts2b as u64 + EPICS_EPOCH_OFFSET + } else { + 0 + }; + let fname_prev = read_string(&buf[72..112])?; + let fname_next = read_string(&buf[112..152])?; + let ret = DatafileHeader { + pos, + dir_offset, + next_offset, + prev_offset, + curr_offset, + num_samples, + ctrl_info_offset, + buf_size, + buf_free, + dbr_type, + dbr_count: dbr_count as usize, + period, + ts_beg: Nanos { ns: ts_beg }, + ts_end: Nanos { ns: ts_end }, + ts_next_file: Nanos { ns: ts_next_file }, + fname_next, + fname_prev, + }; + Ok(ret) +} + +pub async fn read_data_1( + file: &mut File, + datafile_header: &DatafileHeader, + range: NanoRange, + _expand: bool, + stats: &StatsChannel, +) -> Result { + // TODO handle expand mode + let dhpos = datafile_header.pos.0 + DATA_HEADER_LEN_ON_DISK as u64; + seek(file, SeekFrom::Start(dhpos), stats).await?; + let res = match &datafile_header.dbr_type { + DbrType::DbrTimeDouble => { + if datafile_header.dbr_count == 1 { + trace!("~~~~~~~~~~~~~~~~~~~~~ read scalar DbrTimeDouble"); + let mut evs = EventValues { + tss: vec![], + values: vec![], + }; + let n1 = datafile_header.num_samples as usize; + //let n2 = datafile_header.dbr_type.byte_len(); + let n2 = 2 + 2 + 4 + 4 + (4) + 8; + let n3 = n1 * n2; + let mut buf = vec![0; n3]; + read_exact(file, &mut buf, stats).await?; + let mut p1 = 0; + let mut ntot = 0; + while p1 < n3 - n2 { + let _status = u16::from_be_bytes(buf[p1..p1 + 2].try_into().unwrap()); + p1 += 2; + let _severity = u16::from_be_bytes(buf[p1..p1 + 2].try_into().unwrap()); + p1 += 2; + let ts1a = u32::from_be_bytes(buf[p1..p1 + 4].try_into().unwrap()); + p1 += 4; + let ts1b = u32::from_be_bytes(buf[p1..p1 + 4].try_into().unwrap()); + p1 += 4; + let ts1 = ts1a as u64 * SEC + ts1b as u64 + EPICS_EPOCH_OFFSET; + 
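+                    // `ts1a`/`ts1b` are seconds/nanoseconds since the EPICS epoch
+                    // (1990-01-01 UTC); adding EPICS_EPOCH_OFFSET (631152000 s)
+                    // shifts the value to nanoseconds since the Unix epoch.
+                    // The next 4 bytes are padding before the 8-byte float value.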
p1 += 4; + let value = f64::from_be_bytes(buf[p1..p1 + 8].try_into().unwrap()); + p1 += 8; + ntot += 1; + if ts1 >= range.beg && ts1 < range.end { + evs.tss.push(ts1); + evs.values.push(value); + } + } + info!("parsed block with {} / {} events", ntot, evs.tss.len()); + let evs = ScalarPlainEvents::Double(evs); + let plain = PlainEvents::Scalar(evs); + let item = EventsItem::Plain(plain); + item + } else { + let msg = format!("dbr_count {:?} not yet supported", datafile_header.dbr_count); + error!("{}", msg); + return Err(Error::with_msg_no_trace(msg)); + } + } + _ => { + let msg = format!("Type {:?} not yet supported", datafile_header.dbr_type); + error!("{}", msg); + return Err(Error::with_msg_no_trace(msg)); + } + }; + Ok(res) +} diff --git a/archapp/src/archeng/datablockstream.rs b/archapp/src/archeng/datablockstream.rs index 797d268..1343770 100644 --- a/archapp/src/archeng/datablockstream.rs +++ b/archapp/src/archeng/datablockstream.rs @@ -1,7 +1,7 @@ -use crate::archeng::{ - index_file_path_list, open_read, read_channel, read_data_1, read_datafile_header, read_index_datablockref, - search_record, search_record_expand, StatsChannel, -}; +use crate::archeng::datablock::{read_data_1, read_datafile_header}; +use crate::archeng::indexfiles::index_file_path_list; +use crate::archeng::indextree::{read_channel, read_datablockref, search_record, search_record_expand, DataheaderPos}; +use crate::archeng::{open_read, StatsChannel}; use crate::eventsitem::EventsItem; use crate::storagemerge::StorageMerge; use crate::timed::Timed; @@ -10,8 +10,9 @@ use err::Error; use futures_core::{Future, Stream}; use futures_util::{FutureExt, StreamExt}; use items::{inspect_timestamps, RangeCompletableItem, Sitemty, StreamItem, WithLen}; -use netpod::{log::*, DataHeaderPos, FilePos, Nanos}; +use netpod::log::*; use netpod::{Channel, NanoRange}; +use netpod::{FilePos, Nanos}; use std::collections::VecDeque; use std::path::PathBuf; use std::pin::Pin; @@ -63,129 +64,106 @@ async fn datablock_stream_inner_single_index( let mut events_tot = 0; let stats = &StatsChannel::new(tx.clone()); debug!("try to open index file: {:?}", index_path); - let res = open_read(index_path.clone(), stats).await; - debug!("opened index file: {:?} {:?}", index_path, res); - match res { - Ok(mut index_file) => { - if let Some(basics) = read_channel(&mut index_file, channel.name(), stats).await? { - let beg = Nanos { ns: range.beg }; - let mut expand_beg = expand; - let mut index_ts_max = 0; - let mut search_ts = beg.clone(); - let mut last_data_file_path = PathBuf::new(); - let mut last_data_file_pos = DataHeaderPos(0); - loop { - let timed_search = Timed::new("search next record"); - let (res, _stats) = if expand_beg { - // TODO even though this is an entry in the index, it may reference - // non-existent blocks. - // Therefore, lower expand_beg flag at some later stage only if we've really - // found at least one event in the block. - expand_beg = false; - search_record_expand( - &mut index_file, - basics.rtree_m, - basics.rtree_start_pos, - search_ts, - stats, - ) - .await? - } else { - search_record( - &mut index_file, - basics.rtree_m, - basics.rtree_start_pos, - search_ts, - stats, - ) - .await? - }; - drop(timed_search); - if let Some(nrec) = res { - let rec = nrec.rec(); - trace!("found record: {:?}", rec); - let pos = FilePos { pos: rec.child_or_id }; - // TODO rename Datablock? 
→ IndexNodeDatablock - trace!("READ Datablock FROM {:?}\n", pos); - let datablock = read_index_datablockref(&mut index_file, pos, stats).await?; - trace!("Datablock: {:?}\n", datablock); - let data_path = index_path.parent().unwrap().join(datablock.file_name()); - if data_path == last_data_file_path && datablock.data_header_pos() == last_data_file_pos { - debug!("skipping because it is the same block"); - } else { - trace!("try to open data_path: {:?}", data_path); - match open_read(data_path.clone(), stats).await { - Ok(mut data_file) => { - let datafile_header = - read_datafile_header(&mut data_file, datablock.data_header_pos(), stats) - .await?; - trace!("datafile_header -------------- HEADER\n{:?}", datafile_header); - let events = - read_data_1(&mut data_file, &datafile_header, range.clone(), expand_beg, stats) - .await?; - if false { - let msg = inspect_timestamps(&events, range.clone()); - trace!("datablock_stream_inner_single_index read_data_1\n{}", msg); - } - { - let mut ts_max = 0; - use items::WithTimestamps; - for i in 0..events.len() { - let ts = events.ts(i); - if ts < ts_max { - error!("unordered event within block at ts {}", ts); - break; - } else { - ts_max = ts; - } - if ts < index_ts_max { - error!( - "unordered event in index branch ts {} index_ts_max {}", - ts, index_ts_max - ); - break; - } else { - index_ts_max = ts; - } - } - } - trace!("Was able to read data: {} events", events.len()); - events_tot += events.len() as u64; - let item = Ok(StreamItem::DataItem(RangeCompletableItem::Data(events))); - tx.send(item).await?; - } - Err(e) => { - // That's fine. The index mentions lots of datafiles which got purged already. - trace!("can not find file mentioned in index: {:?} {}", data_path, e); - } - }; - } - if datablock.next != 0 { - warn!("MAYBE TODO? datablock.next != 0: {:?}", datablock); - } - last_data_file_path = data_path; - last_data_file_pos = datablock.data_header_pos(); - // TODO anything special to do in expand mode? - search_ts.ns = rec.ts2.ns; - } else { - warn!("nothing found, break"); - break; - } - if events_tot >= max_events { - warn!("reached events_tot {} max_events {}", events_tot, max_events); - break; - } - } + let index_file = open_read(index_path.clone(), stats).await?; + let mut file2 = open_read(index_path.clone(), stats).await?; + debug!("opened index file: {:?} {:?}", index_path, index_file); + if let Some(basics) = read_channel(index_path.clone(), index_file, channel.name(), stats).await? { + let beg = Nanos { ns: range.beg }; + let mut expand_beg = expand; + let mut index_ts_max = 0; + let mut search_ts = beg.clone(); + let mut last_data_file_path = PathBuf::new(); + let mut last_data_file_pos = DataheaderPos(0); + loop { + let timed_search = Timed::new("search next record"); + let (res, _stats) = if expand_beg { + // TODO even though this is an entry in the index, it may reference + // non-existent blocks. + // Therefore, lower expand_beg flag at some later stage only if we've really + // found at least one event in the block. + expand_beg = false; + search_record_expand(&mut file2, basics.rtree_m, basics.rtree_start_pos, search_ts, stats).await? } else { - warn!("can not read channel basics from {:?}", index_path); + search_record(&mut file2, basics.rtree_m, basics.rtree_start_pos, search_ts, stats).await? + }; + drop(timed_search); + if let Some(nrec) = res { + let rec = nrec.rec(); + trace!("found record: {:?}", rec); + let pos = FilePos { pos: rec.child_or_id }; + // TODO rename Datablock? 
→ IndexNodeDatablock + trace!("READ Datablock FROM {:?}\n", pos); + let datablock = read_datablockref(&mut file2, pos, basics.hver(), stats).await?; + trace!("Datablock: {:?}\n", datablock); + let data_path = index_path.parent().unwrap().join(datablock.file_name()); + if data_path == last_data_file_path && datablock.data_header_pos() == last_data_file_pos { + debug!("skipping because it is the same block"); + } else { + trace!("try to open data_path: {:?}", data_path); + match open_read(data_path.clone(), stats).await { + Ok(mut data_file) => { + let datafile_header = + read_datafile_header(&mut data_file, datablock.data_header_pos(), stats).await?; + trace!("datafile_header -------------- HEADER\n{:?}", datafile_header); + let events = + read_data_1(&mut data_file, &datafile_header, range.clone(), expand_beg, stats).await?; + if false { + let msg = inspect_timestamps(&events, range.clone()); + trace!("datablock_stream_inner_single_index read_data_1\n{}", msg); + } + { + let mut ts_max = 0; + use items::WithTimestamps; + for i in 0..events.len() { + let ts = events.ts(i); + if ts < ts_max { + error!("unordered event within block at ts {}", ts); + break; + } else { + ts_max = ts; + } + if ts < index_ts_max { + error!( + "unordered event in index branch ts {} index_ts_max {}", + ts, index_ts_max + ); + break; + } else { + index_ts_max = ts; + } + } + } + trace!("Was able to read data: {} events", events.len()); + events_tot += events.len() as u64; + let item = Ok(StreamItem::DataItem(RangeCompletableItem::Data(events))); + tx.send(item).await?; + } + Err(e) => { + // That's fine. The index mentions lots of datafiles which got purged already. + trace!("can not find file mentioned in index: {:?} {}", data_path, e); + } + }; + } + if datablock.next().0 != 0 { + warn!("MAYBE TODO? datablock.next != 0: {:?}", datablock); + } + last_data_file_path = data_path; + last_data_file_pos = datablock.data_header_pos(); + // TODO anything special to do in expand mode? 
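+                // Continue the scan from the end timestamp of the record just
+                // handled: `search_record` matches the first record with
+                // ts2 > search_ts, so the next iteration moves on to the
+                // following datablock without re-reading this one.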
+ search_ts.ns = rec.ts2.ns; + } else { + warn!("nothing found, break"); + break; + } + if events_tot >= max_events { + warn!("reached events_tot {} max_events {}", events_tot, max_events); + break; } - Ok(()) - } - Err(e) => { - warn!("can not find index file at {:?}", index_path); - Err(Error::with_msg_no_trace(format!("can not open index file: {}", e))) } + } else { + warn!("can not read channel basics from {:?}", index_path); } + Ok(()) } async fn datablock_stream_inner( diff --git a/archapp/src/archeng/diskio.rs b/archapp/src/archeng/diskio.rs new file mode 100644 index 0000000..e69de29 diff --git a/archapp/src/archeng/indexfiles.rs b/archapp/src/archeng/indexfiles.rs index c0ea2ee..43be010 100644 --- a/archapp/src/archeng/indexfiles.rs +++ b/archapp/src/archeng/indexfiles.rs @@ -1,17 +1,19 @@ +use crate::archeng::{open_read, read, StatsChannel}; +use crate::timed::Timed; use crate::wrap_task; use async_channel::Receiver; use err::Error; -use futures_core::Future; -use futures_core::Stream; +use futures_core::{Future, Stream}; use futures_util::stream::unfold; -use futures_util::FutureExt; use netpod::log::*; -use netpod::ChannelArchiver; -use netpod::Database; +use netpod::{Channel, ChannelArchiver, Database}; +use regex::Regex; +use std::collections::BTreeMap; use std::path::PathBuf; use std::pin::Pin; use std::task::{Context, Poll}; use tokio::fs::read_dir; +use tokio::sync::Mutex; use tokio_postgres::Client as PgClient; pub fn list_index_files(node: &ChannelArchiver) -> Receiver> { @@ -174,8 +176,36 @@ impl ScanIndexFiles { info!("collected {} level 1 paths", paths.len()); let dbc = database_connect(&self.conf.database).await?; for p in paths { - let sql = "insert into indexfiles (path) values ($1) on conflict do nothing"; - dbc.query(sql, &[&p.to_string_lossy()]).await?; + let ps = p.to_string_lossy(); + let rows = dbc + .query("select rowid from indexfiles where path = $1", &[&ps]) + .await?; + let rid: i64 = if rows.len() == 0 { + let rows = dbc + .query( + "insert into indexfiles (path) values ($1) on conflict do nothing returning rowid", + &[&ps], + ) + .await?; + if rows.len() == 0 { + error!("insert failed, maybe concurrent insert?"); + // TODO try this channel again? or the other process handled it? 
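+                    // A single statement would avoid this select-then-insert race,
+                    // assuming a unique constraint on `path` (sketch, not part of this patch):
+                    //   insert into indexfiles (path) values ($1)
+                    //   on conflict (path) do update set path = excluded.path
+                    //   returning rowid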
+ err::todoval() + } else if rows.len() == 1 { + let rid = rows[0].try_get(0)?; + info!("insert done: {}", rid); + rid + } else { + return Err(Error::with_msg("not unique")); + } + } else if rows.len() == 1 { + let rid = rows[0].try_get(0)?; + info!("select done: {}", rid); + rid + } else { + return Err(Error::with_msg("not unique")); + }; + let _ = rid; } self.steps = ScanIndexFilesSteps::Done; let item = format!("level 1 done"); @@ -256,6 +286,7 @@ pub fn unfold2(_conf: ChannelArchiver) -> () { enum ScanChannelsSteps { Start, SelectIndexFile, + ReadChannels(Vec), Done, } @@ -288,8 +319,66 @@ impl ScanChannels { for row in rows { paths.push(row.get::<_, String>(0)); } + let item = format!("SelectIndexFile {:?}", paths); + self.steps = ReadChannels(paths); + Ok(Some((item, self))) + } + ReadChannels(mut paths) => { + // TODO stats + let stats = &StatsChannel::dummy(); + let dbc = database_connect(&self.conf.database).await?; + if let Some(path) = paths.pop() { + let rows = dbc + .query("select rowid from indexfiles where path = $1", &[&path]) + .await?; + if rows.len() == 1 { + let indexfile_rid: i64 = rows[0].try_get(0)?; + let mut basics = super::indextree::IndexFileBasics::from_path(path, stats).await?; + let entries = basics.all_channel_entries(stats).await?; + for entry in entries { + let rows = dbc + .query("select rowid from channels where name = $1", &[&entry.channel_name()]) + .await?; + let rid: i64 = if rows.len() == 0 { + let rows = dbc + .query( + "insert into channels (name) values ($1) on conflict do nothing returning rowid", + &[&entry.channel_name()], + ) + .await?; + if rows.len() == 0 { + error!("insert failed, maybe concurrent insert?"); + // TODO try this channel again? or the other process handled it? + err::todoval() + } else if rows.len() == 1 { + let rid = rows[0].try_get(0)?; + info!("insert done: {}", rid); + rid + } else { + return Err(Error::with_msg("not unique")); + } + } else if rows.len() == 1 { + let rid = rows[0].try_get(0)?; + info!("select done: {}", rid); + rid + } else { + return Err(Error::with_msg("not unique")); + }; + dbc.query( + "insert into channel_index_map (channel, index) values ($1, $2) on conflict do nothing", + &[&rid, &indexfile_rid], + ) + .await?; + } + dbc.query( + "update indexfiles set ts_last_channel_search = now() where rowid = $1", + &[&indexfile_rid], + ) + .await?; + } + } self.steps = Done; - Ok(Some((format!("SelectIndexFile {:?}", paths), self))) + Ok(Some((format!("ReadChannels"), self))) } Done => Ok(None), } @@ -310,3 +399,186 @@ impl UnfoldExec for ScanChannels { pub fn scan_channels(conf: ChannelArchiver) -> impl Stream> { unfold_stream(ScanChannels::new(conf.clone())) } + +#[derive(Debug)] +enum RetClass { + Long, + Medium, + Short, + #[allow(unused)] + PostMortem, +} + +#[derive(Debug)] +enum IndexCat { + Machine { rc: RetClass }, + Beamline { rc: RetClass, name: String }, +} + +#[derive(Debug)] +struct IndexFile { + path: PathBuf, + cat: IndexCat, +} + +// Try to make sense of historical conventions how the epics channel archiver engines are configured. 
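+// For example (illustrative paths, not taken from this repository):
+//   ".../archive_LT/index"       -> IndexCat::Machine { rc: Long }
+//   ".../archive_X05DA_SH/index" -> IndexCat::Beamline { name: "X05DA", rc: Short }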
+fn categorize_index_files(list: &Vec<String>) -> Result<Vec<IndexFile>, Error> {
+    let re_m = Regex::new(r"/archive_(ST|MT|LT)/index").unwrap();
+    let re_b = Regex::new(r"/archive_(X([0-9]+)[^_]*)_(SH|LO)/index").unwrap();
+    let mut ret = vec![];
+    for p in list {
+        match re_m.captures(p) {
+            Some(cap) => {
+                let rc = cap.get(1).unwrap().as_str();
+                let rc = match rc {
+                    "ST" => Some(RetClass::Short),
+                    "MT" => Some(RetClass::Medium),
+                    "LT" => Some(RetClass::Long),
+                    _ => {
+                        warn!("categorize_index_files no idea about RC for {}", p);
+                        None
+                    }
+                };
+                if let Some(rc) = rc {
+                    let f = IndexFile {
+                        path: p.into(),
+                        cat: IndexCat::Machine { rc },
+                    };
+                    ret.push(f);
+                }
+            }
+            None => match re_b.captures(p) {
+                Some(cap) => {
+                    let name = cap.get(1).unwrap().as_str();
+                    let rc = cap.get(3).unwrap().as_str();
+                    let rc = match rc {
+                        "SH" => Some(RetClass::Short),
+                        "LO" => Some(RetClass::Long),
+                        _ => {
+                            warn!("categorize_index_files no idea about RC for {}", p);
+                            None
+                        }
+                    };
+                    if let Some(rc) = rc {
+                        let f = IndexFile {
+                            path: p.into(),
+                            cat: IndexCat::Beamline { name: name.into(), rc },
+                        };
+                        ret.push(f);
+                    }
+                }
+                None => {
+                    warn!("categorize_index_files no idea at all about {}", p);
+                }
+            },
+        }
+    }
+    let is_machine = {
+        let mut k = false;
+        for x in &ret {
+            if let IndexCat::Machine { .. } = &x.cat {
+                k = true;
+                break;
+            }
+        }
+        k
+    };
+    // TODO by default, filter post-mortem.
+    let is_beamline = !is_machine;
+    if is_beamline {
+        let mut ret: Vec<_> = ret
+            .into_iter()
+            .filter_map(|k| {
+                if let IndexCat::Beamline { rc, .. } = &k.cat {
+                    let prio = match rc {
+                        &RetClass::Short => 4,
+                        &RetClass::Medium => 6,
+                        &RetClass::Long => 8,
+                        &RetClass::PostMortem => 0,
+                    };
+                    Some((k, prio))
+                } else {
+                    None
+                }
+            })
+            .collect();
+        ret.sort_by_key(|x| x.1);
+        let ret = ret.into_iter().map(|k| k.0).collect();
+        Ok(ret)
+    } else if is_machine {
+        let mut ret: Vec<_> = ret
+            .into_iter()
+            .filter_map(|k| {
+                if let IndexCat::Machine { rc, .. } = &k.cat {
+                    let prio = match rc {
+                        &RetClass::Short => 4,
+                        &RetClass::Medium => 6,
+                        &RetClass::Long => 8,
+                        &RetClass::PostMortem => 0,
+                    };
+                    Some((k, prio))
+                } else {
+                    None
+                }
+            })
+            .collect();
+        ret.sort_by_key(|x| x.1);
+        let ret = ret.into_iter().map(|k| k.0).collect();
+        Ok(ret)
+    } else {
+        err::todoval()
+    }
+}
+
+static INDEX_JSON: Mutex<Option<BTreeMap<String, Vec<String>>>> = Mutex::const_new(None);
+
+pub async fn index_files_index_ref<P: Into<PathBuf> + Send>(
+    key: &str,
+    index_files_index_path: P,
+    stats: &StatsChannel,
+) -> Result<Option<Vec<String>>, Error> {
+    let mut g = INDEX_JSON.lock().await;
+    match &*g {
+        Some(j) => Ok(j.get(key).map(|x| x.clone())),
+        None => {
+            let timed1 = Timed::new("slurp_index_json");
+            let index_files_index_path = index_files_index_path.into();
+            let index_files_index = {
+                let timed1 = Timed::new("slurp_index_bytes");
+                let mut index_files_index = open_read(index_files_index_path, stats).await?;
+                let mut buf = vec![0; 1024 * 1024 * 50];
+                let mut ntot = 0;
+                loop {
+                    let n = read(&mut index_files_index, &mut buf[ntot..], stats).await?;
+                    if n == 0 {
+                        break;
+                    }
+                    ntot += n;
+                }
+                buf.truncate(ntot);
+                drop(timed1);
+                serde_json::from_slice::<BTreeMap<String, Vec<String>>>(&buf)?
+            };
+            drop(timed1);
+            let ret = index_files_index.get(key).map(|x| x.clone());
+            *g = Some(index_files_index);
+            Ok(ret)
+        }
+    }
+}
+
+pub async fn index_file_path_list(
+    channel: Channel,
+    index_files_index_path: PathBuf,
+    stats: &StatsChannel,
+) -> Result<Vec<PathBuf>, Error> {
+    let timed1 = Timed::new("categorize index files");
+    let index_paths = index_files_index_ref(channel.name(), &index_files_index_path, stats)
+        .await?
+
+pub async fn index_file_path_list(
+    channel: Channel,
+    index_files_index_path: PathBuf,
+    stats: &StatsChannel,
+) -> Result<Vec<PathBuf>, Error> {
+    let timed1 = Timed::new("categorize index files");
+    let index_paths = index_files_index_ref(channel.name(), &index_files_index_path, stats)
+        .await?
+        .ok_or(Error::with_msg_no_trace("can not find channel"))?;
+    let list = categorize_index_files(&index_paths)?;
+    info!("GOT CATEGORIZED:\n{:?}", list);
+    let ret = list.into_iter().map(|k| k.path).collect();
+    drop(timed1);
+    Ok(ret)
+}
diff --git a/archapp/src/archeng/indextree.rs b/archapp/src/archeng/indextree.rs
new file mode 100644
index 0000000..88efb51
--- /dev/null
+++ b/archapp/src/archeng/indextree.rs
@@ -0,0 +1,1272 @@
+use crate::archeng::{
+    format_hex_block, name_hash, open_read, readu16, readu32, readu64, seek, RingBuf, StatsChannel, EPICS_EPOCH_OFFSET,
+};
+use err::Error;
+use netpod::{log::*, NanoRange};
+use netpod::{timeunits::SEC, FilePos, Nanos};
+use std::collections::VecDeque;
+use std::fmt;
+use std::io::SeekFrom;
+use std::path::{Path, PathBuf};
+use std::time::{Duration, Instant};
+use tokio::fs::File;
+
+pub trait HeaderVersion: Send + Sync + fmt::Debug {
+    fn version(&self) -> u8;
+    fn read_offset(&self, buf: &[u8], pos: usize) -> u64;
+    fn offset_size(&self) -> usize;
+    fn duplicate(&self) -> Box<dyn HeaderVersion>;
+}
+
+#[derive(Debug)]
+struct HeaderVersion2;
+
+impl HeaderVersion for HeaderVersion2 {
+    fn version(&self) -> u8 {
+        2
+    }
+
+    fn read_offset(&self, buf: &[u8], pos: usize) -> u64 {
+        readu32(buf, pos) as u64
+    }
+
+    fn offset_size(&self) -> usize {
+        4
+    }
+
+    fn duplicate(&self) -> Box<dyn HeaderVersion> {
+        Box::new(Self)
+    }
+}
+
+#[derive(Debug)]
+struct HeaderVersion3;
+
+impl HeaderVersion for HeaderVersion3 {
+    fn version(&self) -> u8 {
+        3
+    }
+
+    fn read_offset(&self, buf: &[u8], pos: usize) -> u64 {
+        readu64(buf, pos)
+    }
+
+    fn offset_size(&self) -> usize {
+        8
+    }
+
+    fn duplicate(&self) -> Box<dyn HeaderVersion> {
+        Box::new(Self)
+    }
+}
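+
+// The two index format versions differ (for the purposes of this parser) only
+// in the width of file offsets: version 2 stores 32-bit offsets, version 3
+// stores 64-bit offsets. Everything below stays version-agnostic by going
+// through the trait object, e.g.:
+//   let off = hver.read_offset(buf, p);
+//   p += hver.offset_size();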
+
+#[derive(Debug)]
+pub struct NamedHashTableEntry {
+    named_hash_channel_entry_pos: u64,
+}
+
+#[derive(Debug)]
+pub struct NamedHashChannelEntry {
+    next: u64,
+    id_rtree_pos: u64,
+    channel_name: String,
+    id_txt: String,
+}
+
+impl NamedHashChannelEntry {
+    pub fn channel_name(&self) -> &str {
+        &self.channel_name
+    }
+}
+
+#[derive(Debug)]
+pub struct IndexFileBasics {
+    file: File,
+    path: PathBuf,
+    version: u8,
+    name_hash_anchor_beg: u64,
+    name_hash_anchor_len: u64,
+    fa_used_list_beg: u64,
+    fa_used_list_end: u64,
+    fa_used_list_len: u64,
+    fa_free_list_beg: u64,
+    fa_free_list_end: u64,
+    fa_free_list_len: u64,
+    fa_header_prev: u64,
+    fa_header_next: u64,
+    fa_header_len: u64,
+    name_hash_entries: Vec<NamedHashTableEntry>,
+    hver: Box<dyn HeaderVersion>,
+}
+
+impl IndexFileBasics {
+    pub async fn from_path(path: impl Into<PathBuf>, stats: &StatsChannel) -> Result<Self, Error> {
+        let path = path.into();
+        let file = open_read(path.clone(), stats).await?;
+        read_file_basics(path, file, stats).await
+    }
+}
+
+pub async fn read_file_basics(path: PathBuf, file: File, stats: &StatsChannel) -> Result<IndexFileBasics, Error> {
+    let mut file = file;
+    let mut rb = RingBuf::new();
+    rb.fill_min(&mut file, 4, stats).await?;
+    let buf = rb.data();
+    let version = String::from_utf8(buf[3..4].to_vec())?.parse()?;
+    let min0;
+    if version == 3 {
+        min0 = 88;
+    } else if version == 2 {
+        min0 = 48;
+    } else {
+        return Err(Error::with_msg_no_trace(format!("unhandled version {}", version)));
+    }
+    rb.fill_min(&mut file, min0, stats).await?;
+    let buf = rb.data();
+    let mut ret = if version == 3 {
+        IndexFileBasics {
+            file,
+            path,
+            version,
+            name_hash_anchor_beg: readu64(buf, 4),
+            name_hash_anchor_len: readu32(buf, 12) as u64,
+            fa_used_list_len: readu64(buf, 16),
+            fa_used_list_beg: readu64(buf, 24),
+            fa_used_list_end: readu64(buf, 32),
+            fa_free_list_len: readu64(buf, 40),
+            fa_free_list_beg: readu64(buf, 48),
+            fa_free_list_end: readu64(buf, 56),
+            fa_header_len: readu64(buf, 64),
+            fa_header_prev: readu64(buf, 72),
+            fa_header_next: readu64(buf, 80),
+            name_hash_entries: vec![],
+            hver: Box::new(HeaderVersion3),
+        }
+    } else if version == 2 {
+        IndexFileBasics {
+            file,
+            path,
+            version,
+            name_hash_anchor_beg: readu32(buf, 4) as u64,
+            name_hash_anchor_len: readu32(buf, 8) as u64,
+            fa_used_list_len: readu32(buf, 12) as u64,
+            fa_used_list_beg: readu32(buf, 16) as u64,
+            fa_used_list_end: readu32(buf, 20) as u64,
+            fa_free_list_len: readu32(buf, 24) as u64,
+            fa_free_list_beg: readu32(buf, 28) as u64,
+            fa_free_list_end: readu32(buf, 32) as u64,
+            fa_header_len: readu32(buf, 36) as u64,
+            fa_header_prev: readu32(buf, 40) as u64,
+            fa_header_next: readu32(buf, 44) as u64,
+            name_hash_entries: vec![],
+            hver: Box::new(HeaderVersion2),
+        }
+    } else {
+        return Err(Error::with_msg_no_trace(format!("unhandled version {}", version)));
+    };
+    rb.adv(min0);
+    if ret.name_hash_anchor_len > 2000 {
+        return Err(Error::with_msg_no_trace(format!(
+            "name_hash_anchor_len {}",
+            ret.name_hash_anchor_len
+        )));
+    }
+    {
+        let hver = &ret.hver;
+        for _ in 0..ret.name_hash_anchor_len {
+            rb.fill_min(&mut ret.file, hver.offset_size(), stats).await?;
+            let buf = rb.data();
+            let pos = hver.read_offset(buf, 0);
+            rb.adv(hver.offset_size());
+            let e = NamedHashTableEntry {
+                named_hash_channel_entry_pos: pos,
+            };
+            ret.name_hash_entries.push(e);
+        }
+    }
+    Ok(ret)
+}
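+
+// Channel lookup below: hash the channel name into one of the
+// name_hash_anchor_len buckets read above, then walk the linked list of
+// NamedHashChannelEntry starting at that bucket until the name matches;
+// the matching entry points at the root of the channel's RTree.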
+
+impl IndexFileBasics {
+    pub async fn all_channel_entries(&mut self, stats: &StatsChannel) -> Result<Vec<NamedHashChannelEntry>, Error> {
+        let mut entries = vec![];
+        let mut rb = RingBuf::new();
+        for epos in &self.name_hash_entries {
+            if epos.named_hash_channel_entry_pos != 0 {
+                let mut pos = epos.named_hash_channel_entry_pos;
+                while pos != 0 {
+                    rb.reset();
+                    seek(&mut self.file, SeekFrom::Start(pos), stats).await?;
+                    let min0 = 4 + 2 * self.hver.offset_size();
+                    rb.fill_min(&mut self.file, min0, stats).await?;
+                    let buf = rb.data();
+                    let entry = parse_name_hash_channel_entry(buf, self.hver.as_ref())?;
+                    info!("parsed entry {:?}", entry);
+                    pos = entry.next;
+                    entries.push(entry);
+                }
+            }
+        }
+        Ok(entries)
+    }
+
+    pub async fn rtree_for_channel(&self, channel_name: &str, stats: &StatsChannel) -> Result<Option<Rtree>, Error> {
+        let mut index_file = open_read(self.path.clone(), stats).await?;
+        let chn_hash = name_hash(channel_name, self.name_hash_anchor_len as u32);
+        let epos = &self.name_hash_entries[chn_hash as usize];
+        let mut pos = epos.named_hash_channel_entry_pos;
+        if pos == 0 {
+            warn!("no hash entry for channel {}", channel_name);
+        }
+        let mut entries = vec![];
+        let mut rb = RingBuf::new();
+        while pos != 0 {
+            rb.reset();
+            seek(&mut index_file, SeekFrom::Start(pos), stats).await?;
+            let min0 = 4 + 2 * self.hver.offset_size();
+            rb.fill_min(&mut index_file, min0, stats).await?;
+            let buf = rb.data();
+            let e = parse_name_hash_channel_entry(buf, self.hver.as_ref())?;
+            let next = e.next;
+            entries.push(e);
+            pos = next;
+        }
+        for e in &entries {
+            if e.channel_name == channel_name {
+                let hver = self.hver.duplicate();
+                let pos = RtreePos(e.id_rtree_pos);
+                // TODO Rtree could reuse the File here:
+                let tree = Rtree::new(self.path.clone(), index_file, pos, hver, stats).await?;
+                return Ok(Some(tree));
+            }
+        }
+        Ok(None)
+    }
+}
+
+#[derive(Debug)]
+pub struct RTreeNodeRecord {
+    pub ts1: Nanos,
+    pub ts2: Nanos,
+    // TODO should probably get a better name, e.g. `child_or_offset`, and be made an enum.
+    pub child_or_id: u64,
+}
+
+#[derive(Debug)]
+pub struct RTreeNode {
+    pub pos: FilePos,
+    pub records: Vec<RTreeNodeRecord>,
+    pub is_leaf: bool,
+    pub rtree_m: usize,
+}
+
+#[derive(Debug)]
+pub struct RTreeNodeAtRecord {
+    pub node: RTreeNode,
+    pub rix: usize,
+}
+
+impl RTreeNodeAtRecord {
+    pub fn rec(&self) -> &RTreeNodeRecord {
+        &self.node.records[self.rix]
+    }
+}
+
+// TODO refactor as struct, rtree_m is a property of the tree.
+pub async fn read_rtree_node(
+    file: &mut File,
+    pos: FilePos,
+    rtree_m: usize,
+    stats: &StatsChannel,
+) -> Result<RTreeNode, Error> {
+    const OFF1: usize = 9;
+    const RLEN: usize = 24;
+    const NANO_MAX: u32 = 999999999;
+    seek(file, SeekFrom::Start(pos.into()), stats).await?;
+    let mut rb = RingBuf::new();
+    // TODO must know how much data I need at least...
+    rb.fill_min(file, OFF1 + rtree_m * RLEN, stats).await?;
+    if false {
+        let s = format_hex_block(rb.data(), 128);
+        info!("RTREE NODE:\n{}", s);
+    }
+    // TODO remove, this was using a hard offset size.
+    if rb.len() < 1 + 8 {
+        return Err(Error::with_msg_no_trace("could not read enough"));
+    }
+    let b = rb.data();
+    let is_leaf = b[0] != 0;
+    let parent = readu64(b, 1);
+    if false {
+        info!("is_leaf: {} parent: {}", is_leaf, parent);
+    }
+    let recs = (0..rtree_m)
+        .into_iter()
+        .filter_map(|i| {
+            let off2 = OFF1 + i * RLEN;
+            let ts1a = readu32(b, off2 + 0);
+            let ts1b = readu32(b, off2 + 4);
+            let ts2a = readu32(b, off2 + 8);
+            let ts2b = readu32(b, off2 + 12);
+            let ts1b = ts1b.min(NANO_MAX);
+            let ts2b = ts2b.min(NANO_MAX);
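+            // The on-disk timestamps are (seconds, nanoseconds) pairs relative
+            // to the EPICS epoch (1990-01-01); convert them to nanoseconds
+            // since the Unix epoch. The nanosecond fields were clamped to
+            // NANO_MAX above to guard against corrupt entries.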
{}", self.rix); + Ok(true) + } else { + //trace!("Rec-Adv End {}", self.rix); + Ok(false) + } + } +} + +#[derive(Debug)] +pub struct Rtree { + path: PathBuf, + file: File, + m: usize, + root: NodePos, + hver: Box, +} + +impl Rtree { + pub async fn new( + path: impl AsRef, + file: File, + pos: RtreePos, + hver: Box, + stats: &StatsChannel, + ) -> Result { + let mut file = file; + let (m, root) = Self::read_entry(&mut file, pos, hver.as_ref(), stats).await?; + let ret = Self { + path: path.as_ref().into(), + file, + m, + root, + hver, + }; + Ok(ret) + } + + async fn read_entry( + file: &mut File, + pos: RtreePos, + hver: &dyn HeaderVersion, + stats: &StatsChannel, + ) -> Result<(usize, NodePos), Error> { + seek(file, SeekFrom::Start(pos.0), stats).await?; + let mut rb = RingBuf::new(); + // TODO should be able to indicate how much I need at most before I know that I will e.g. seek or abort. + let min0 = hver.offset_size() + 4; + rb.fill_min(file, min0, stats).await?; + if rb.len() < min0 { + return Err(Error::with_msg_no_trace("could not read enough")); + } + let buf = rb.data(); + let mut p = 0; + let node_offset = hver.read_offset(buf, p); + p += hver.offset_size(); + let m = readu32(buf, p) as usize; + p += 4; + let _ = p; + let root = NodePos(node_offset); + let ret = (m, root); + Ok(ret) + } + + pub async fn read_node_at(&mut self, pos: NodePos, stats: &StatsChannel) -> Result { + let file = &mut self.file; + seek(file, SeekFrom::Start(pos.0), stats).await?; + let mut rb = RingBuf::new(); + let off1 = 1 + self.hver.offset_size(); + let rlen = 4 * 4 + self.hver.offset_size(); + let min0 = off1 + self.m * rlen; + rb.fill_min(file, min0, stats).await?; + if false { + let s = format_hex_block(rb.data(), min0); + trace!("RTREE NODE:\n{}", s); + } + let buf = rb.data(); + let mut p = 0; + let is_leaf = buf[p] != 0; + p += 1; + let parent = self.hver.read_offset(buf, p); + p += self.hver.offset_size(); + let _ = p; + let parent = if parent == 0 { None } else { Some(NodePos(parent)) }; + if false { + trace!("is_leaf: {} parent: {:?}", is_leaf, parent); + } + let recs = (0..self.m) + .into_iter() + .filter_map(|i| { + const NANO_MAX: u32 = 999999999; + let off2 = off1 + i * rlen; + let ts1a = readu32(buf, off2 + 0); + let ts1b = readu32(buf, off2 + 4); + let ts2a = readu32(buf, off2 + 8); + let ts2b = readu32(buf, off2 + 12); + let ts1b = ts1b.min(NANO_MAX); + let ts2b = ts2b.min(NANO_MAX); + let ts1 = ts1a as u64 * SEC + ts1b as u64 + EPICS_EPOCH_OFFSET; + let ts2 = ts2a as u64 * SEC + ts2b as u64 + EPICS_EPOCH_OFFSET; + let target = self.hver.read_offset(buf, off2 + 16); + //trace!("NODE {} {} {} {} {}", ts1a, ts1b, ts2a, ts2b, child_or_id); + if target != 0 && ts2 != 0 { + let target = if is_leaf { + RecordTarget::Dataref(DatarefPos(target)) + } else { + RecordTarget::Child(NodePos(target)) + }; + let rec = RtreeRecord { + beg: Nanos { ns: ts1 }, + end: Nanos { ns: ts2 }, + target, + }; + Some(rec) + } else { + None + } + }) + .collect(); + let node = RtreeNode { + pos, + parent, + records: recs, + is_leaf, + rtree_m: self.m, + }; + Ok(node) + } + + pub async fn iter_range(&mut self, range: NanoRange, stats: &StatsChannel) -> Result, Error> { + // TODO RecordIter needs to know when to stop after range. 
+        let ts1 = Instant::now();
+        let mut stack = VecDeque::new();
+        let mut node = self.read_node_at(self.root.clone(), stats).await?;
+        let mut node_reads = 1;
+        'outer: loop {
+            let nr = node.records.len();
+            for (i, rec) in node.records.iter().enumerate() {
+                if rec.beg.ns > range.beg {
+                    match &rec.target {
+                        RecordTarget::Child(child) => {
+                            trace!("found non-leaf match at {} / {}", i, nr);
+                            let child = child.clone();
+                            let nr = RtreeNodeAtRecord { node, rix: i };
+                            node = self.read_node_at(child, stats).await?;
+                            node_reads += 1;
+                            stack.push_back(nr);
+                            continue 'outer;
+                        }
+                        RecordTarget::Dataref(_dataref) => {
+                            trace!("found leaf match at {} / {}", i, nr);
+                            let nr = RtreeNodeAtRecord { node, rix: i };
+                            stack.push_back(nr);
+                            let ret = RecordIter {
+                                tree: self.reopen(stats).await?,
+                                stack,
+                                stats: stats.clone(),
+                            };
+                            let stats = TreeSearchStats::new(ts1, node_reads);
+                            trace!("iter_range done stats: {:?}", stats);
+                            return Ok(Some(ret));
+                        }
+                    }
+                }
+            }
+            let stats = TreeSearchStats::new(ts1, node_reads);
+            trace!("iter_range done stats: {:?}", stats);
+            return Ok(None);
+        }
+    }
+
+    pub async fn reopen(&self, stats: &StatsChannel) -> Result<Self, Error> {
+        let file = open_read(self.path.clone(), stats).await?;
+        let ret = Self {
+            path: self.path.clone(),
+            file: file,
+            m: self.m,
+            root: self.root.clone(),
+            hver: self.hver.duplicate(),
+        };
+        Ok(ret)
+    }
+}
+
+#[derive(Debug)]
+pub struct RecordIter {
+    tree: Rtree,
+    stack: VecDeque<RtreeNodeAtRecord>,
+    stats: StatsChannel,
+}
+
+impl RecordIter {
+    pub async fn next(&mut self) -> Result<Option<RtreeRecord>, Error> {
+        match self.stack.back_mut() {
+            Some(nr) => {
+                assert_eq!(nr.node.is_leaf, true);
+                if let Some(ret) = nr.rec() {
+                    let ret = ret.clone();
+                    if nr.advance()? {
+                        //trace!("still more records here {} / {}", nr.rix, nr.node.records.len());
+                    } else {
+                        loop {
+                            if self.stack.pop_back().is_none() {
+                                panic!();
+                            }
+                            if let Some(n2) = self.stack.back_mut() {
+                                if n2.advance()? {
+                                    //trace!("advanced some parent {} / {}", n2.rix, n2.node.records.len());
+                                    break;
+                                }
+                            } else {
+                                // The stack is empty. We can not advance, but want to return our current `ret`
+                                break;
+                            }
+                        }
+                        loop {
+                            if let Some(n2) = self.stack.back() {
+                                if let Some(r2) = n2.rec() {
+                                    match &r2.target {
+                                        RecordTarget::Child(child) => {
+                                            trace!("Read next lower {:?}", child);
+                                            let n3 = self.tree.read_node_at(child.clone(), &self.stats).await?;
+                                            trace!("n3: {:?}", n3);
+                                            let nr3 = RtreeNodeAtRecord { node: n3, rix: 0 };
+                                            self.stack.push_back(nr3);
+                                        }
+                                        RecordTarget::Dataref(_) => {
+                                            info!("loop B is-leaf");
+                                            // done, we've positioned the next result.
+                                            break;
+                                        }
+                                    }
+                                } else {
+                                    // Should not get here, the above loop should not break in this case.
+                                    panic!();
+                                }
+                            } else {
+                                // Happens when we exhaust the iterator.
+                                break;
+                            }
+                        }
+                    }
+                    Ok(Some(ret))
+                } else {
+                    Ok(None)
+                }
+            }
+            None => Ok(None),
+        }
+    }
+}
+
+pub async fn read_rtree_entrypoint(
+    file: &mut File,
+    pos: u64,
+    _basics: &IndexFileBasics,
+    stats: &StatsChannel,
+) -> Result<RTreeNode, Error> {
+    seek(file, SeekFrom::Start(pos), stats).await?;
+    let mut rb = RingBuf::new();
+    // TODO remove, this is anyway still using a hardcoded offset size.
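+    // The entry point is a (root node offset, M) pair: an 8 byte offset
+    // followed by the 4 byte branching factor, hence the 8 + 4 below.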
+    rb.fill_min(file, 8 + 4, stats).await?;
+    if rb.len() < 8 + 4 {
+        return Err(Error::with_msg_no_trace("could not read enough"));
+    }
+    let b = rb.data();
+    let node_offset = readu64(b, 0);
+    let rtree_m = readu32(b, 8);
+    //info!("node_offset: {} rtree_m: {}", node_offset, rtree_m);
+    let pos = FilePos { pos: node_offset };
+    let node = read_rtree_node(file, pos, rtree_m as usize, stats).await?;
+    //info!("read_rtree_entrypoint READ ROOT NODE: {:?}", node);
+    Ok(node)
+}
+
+#[derive(Debug)]
+pub struct TreeSearchStats {
+    duration: Duration,
+    node_reads: usize,
+}
+
+impl TreeSearchStats {
+    fn new(ts1: Instant, node_reads: usize) -> Self {
+        Self {
+            duration: Instant::now().duration_since(ts1),
+            node_reads,
+        }
+    }
+}
+
+pub async fn search_record(
+    file: &mut File,
+    rtree_m: usize,
+    start_node_pos: FilePos,
+    beg: Nanos,
+    stats: &StatsChannel,
+) -> Result<(Option<RTreeNodeAtRecord>, TreeSearchStats), Error> {
+    let ts1 = Instant::now();
+    let mut node = read_rtree_node(file, start_node_pos, rtree_m, stats).await?;
+    let mut node_reads = 1;
+    'outer: loop {
+        let nr = node.records.len();
+        for (i, rec) in node.records.iter().enumerate() {
+            if rec.ts2.ns > beg.ns {
+                if node.is_leaf {
+                    trace!("found leaf match at {} / {}", i, nr);
+                    let ret = RTreeNodeAtRecord { node, rix: i };
+                    let stats = TreeSearchStats::new(ts1, node_reads);
+                    return Ok((Some(ret), stats));
+                } else {
+                    trace!("found non-leaf match at {} / {}", i, nr);
+                    let pos = FilePos { pos: rec.child_or_id };
+                    node = read_rtree_node(file, pos, rtree_m, stats).await?;
+                    node_reads += 1;
+                    continue 'outer;
+                }
+            }
+        }
+        {
+            let stats = TreeSearchStats::new(ts1, node_reads);
+            return Ok((None, stats));
+        }
+    }
+}
+
+pub async fn search_record_expand_try(
+    file: &mut File,
+    rtree_m: usize,
+    start_node_pos: FilePos,
+    beg: Nanos,
+    stats: &StatsChannel,
+) -> Result<(Option<RTreeNodeAtRecord>, TreeSearchStats), Error> {
+    let ts1 = Instant::now();
+    let mut node = read_rtree_node(file, start_node_pos, rtree_m, stats).await?;
+    let mut node_reads = 1;
+    'outer: loop {
+        let nr = node.records.len();
+        for (i, rec) in node.records.iter().enumerate().rev() {
+            if rec.ts1.ns <= beg.ns {
+                if node.is_leaf {
+                    trace!("found leaf match at {} / {}", i, nr);
+                    let ret = RTreeNodeAtRecord { node, rix: i };
+                    let stats = TreeSearchStats::new(ts1, node_reads);
+                    return Ok((Some(ret), stats));
+                } else {
+                    // TODO
+                    // We rely on channel archiver engine that there is at least one event
+                    // in the referenced range. It should according to docs, but who knows.
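+                    // Records are scanned right-to-left for the last one which
+                    // begins at or before `beg`, so the descent can also reach
+                    // the event immediately preceding the range ("expand"
+                    // semantics).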
+ trace!("found non-leaf match at {} / {}", i, nr); + let pos = FilePos { pos: rec.child_or_id }; + node = read_rtree_node(file, pos, rtree_m, stats).await?; + node_reads += 1; + continue 'outer; + } + } + } + { + let stats = TreeSearchStats::new(ts1, node_reads); + return Ok((None, stats)); + } + } +} + +pub async fn search_record_expand( + file: &mut File, + rtree_m: usize, + start_node_pos: FilePos, + beg: Nanos, + stats: &StatsChannel, +) -> Result<(Option, TreeSearchStats), Error> { + let ts1 = Instant::now(); + let res = search_record_expand_try(file, rtree_m, start_node_pos, beg, stats).await?; + match res { + (Some(res), stats) => { + let ts2 = Instant::now(); + info!("search_record_expand took {:?}", ts2.duration_since(ts1)); + Ok((Some(res), stats)) + } + _ => { + let res = search_record(file, rtree_m, start_node_pos, beg, stats).await; + let ts2 = Instant::now(); + info!("search_record_expand took {:?}", ts2.duration_since(ts1)); + res + } + } +} + +#[derive(Debug)] +pub struct ChannelInfoBasics { + pub channel_name: String, + pub rtree_m: usize, + pub rtree_start_pos: FilePos, + pub basics: IndexFileBasics, +} + +const HVER2: HeaderVersion2 = HeaderVersion2; +const HVER3: HeaderVersion3 = HeaderVersion3; + +impl ChannelInfoBasics { + pub fn hver(&self) -> &dyn HeaderVersion { + if self.basics.version == 2 { + &HVER2 + } else if self.basics.version == 3 { + &HVER3 + } else { + panic!() + } + } +} + +// TODO retire this function. +pub async fn read_channel( + path: impl Into, + index_file: File, + channel_name: &str, + stats: &StatsChannel, +) -> Result, Error> { + let path = path.into(); + let mut basics = read_file_basics(path.clone(), index_file, stats).await?; + let chn_hash = name_hash(channel_name, basics.name_hash_anchor_len as u32); + let epos = &basics.name_hash_entries[chn_hash as usize]; + let mut entries = vec![]; + let mut rb = RingBuf::new(); + let mut pos = epos.named_hash_channel_entry_pos; + loop { + rb.reset(); + seek(&mut basics.file, SeekFrom::Start(pos), stats).await?; + let fill_min = if basics.hver.offset_size() == 8 { 20 } else { 12 }; + rb.fill_min(&mut basics.file, fill_min, stats).await?; + if rb.len() < fill_min { + warn!("not enough data to continue reading channel list from name hash list"); + break; + } + let buf = rb.data(); + let e = parse_name_hash_channel_entry(buf, basics.hver.as_ref())?; + let next = e.next; + entries.push(e); + if next == 0 { + break; + } else { + pos = next; + } + } + let mut file2 = open_read(path, stats).await?; + for e in &entries { + if e.channel_name == channel_name { + let ep = read_rtree_entrypoint(&mut file2, e.id_rtree_pos, &basics, stats).await?; + let ret = ChannelInfoBasics { + channel_name: channel_name.into(), + rtree_m: ep.rtree_m, + rtree_start_pos: ep.pos, + basics, + }; + return Ok(Some(ret)); + } + } + Ok(None) +} + +fn parse_name_hash_channel_entry(buf: &[u8], hver: &dyn HeaderVersion) -> Result { + let mut p1 = 0; + let next = hver.read_offset(&buf, p1); + p1 += hver.offset_size(); + let id = hver.read_offset(&buf, p1); + p1 += hver.offset_size(); + let name_len = readu16(&buf, p1); + p1 += 2; + let id_txt_len = readu16(&buf, p1); + p1 += 2; + if next > 1024 * 1024 * 1024 * 1024 || id > 1024 * 1024 * 1024 * 1024 || name_len > 128 || id_txt_len > 128 { + error!( + "something bad: parse_name_hash_channel_entry next {} id {} name_len {} id_txt_len {}", + next, id, name_len, id_txt_len + ); + return Err(Error::with_msg_no_trace("bad hash table entry")); + } + let n1 = name_len as usize; + let n2 = id_txt_len 
+    let channel_name_found = String::from_utf8(buf[p1..p1 + n1].to_vec())?;
+    p1 += n1;
+    let id_txt = String::from_utf8(buf[p1..p1 + n2].to_vec())?;
+    p1 += n2;
+    let _ = p1;
+    let e = NamedHashChannelEntry {
+        next,
+        id_rtree_pos: id,
+        channel_name: channel_name_found,
+        id_txt,
+    };
+    Ok(e)
+}
+
+#[derive(Debug)]
+pub struct Dataref {
+    next: DatarefPos,
+    data_header_pos: DataheaderPos,
+    fname: String,
+}
+
+impl Dataref {
+    pub fn file_name(&self) -> &str {
+        &self.fname
+    }
+
+    pub fn data_header_pos(&self) -> DataheaderPos {
+        self.data_header_pos.clone()
+    }
+
+    pub fn next(&self) -> DatarefPos {
+        self.next.clone()
+    }
+}
+
+pub async fn read_datablockref(
+    file: &mut File,
+    pos: FilePos,
+    hver: &dyn HeaderVersion,
+    stats: &StatsChannel,
+) -> Result<Dataref, Error> {
+    seek(file, SeekFrom::Start(pos.pos), stats).await?;
+    let mut rb = RingBuf::new();
+    let min0 = hver.offset_size() * 2 + 2;
+    rb.fill_min(file, min0, stats).await?;
+    let buf = rb.data();
+    let mut p = 0;
+    let next = hver.read_offset(buf, p);
+    p += hver.offset_size();
+    let data = hver.read_offset(buf, p);
+    p += hver.offset_size();
+    let len = readu16(buf, p) as usize;
+    p += 2;
+    let _ = p;
+    rb.fill_min(file, min0 + len, stats).await?;
+    let buf = rb.data();
+    let fname = String::from_utf8(buf[min0..min0 + len].to_vec())?;
+    let next = DatarefPos(next);
+    let data_header_pos = DataheaderPos(data);
+    let ret = Dataref {
+        next,
+        data_header_pos,
+        fname,
+    };
+    Ok(ret)
+}
+
+async fn channel_list_from_index_name_hash_list(
+    file: &mut File,
+    pos: FilePos,
+    hver: &dyn HeaderVersion,
+    stats: &StatsChannel,
+) -> Result<Vec<NamedHashChannelEntry>, Error> {
+    let mut pos = pos;
+    let mut ret = vec![];
+    let mut rb = RingBuf::new();
+    loop {
+        rb.reset();
+        seek(file, SeekFrom::Start(pos.pos), stats).await?;
+        let fill_min = if hver.offset_size() == 8 { 20 } else { 12 };
+        rb.fill_min(file, fill_min, stats).await?;
+        if rb.len() < fill_min {
+            warn!("not enough data to continue reading channel list from name hash list");
+            break;
+        }
+        let e = parse_name_hash_channel_entry(rb.data(), hver)?;
+        let next = e.next;
+        ret.push(e);
+        if next == 0 {
+            break;
+        } else {
+            pos.pos = next;
+        }
+    }
+    Ok(ret)
+}
+
+// TODO retire this function
+pub async fn channel_list(index_path: PathBuf, stats: &StatsChannel) -> Result<Vec<String>, Error> {
+    let mut ret = vec![];
+    let file = open_read(index_path.clone(), stats).await?;
+    let mut basics = read_file_basics(index_path.clone(), file, stats).await?;
+    let hver2 = HeaderVersion2;
+    let hver3 = HeaderVersion3;
+    let hver: &dyn HeaderVersion = if basics.version == 2 {
+        &hver2
+    } else if basics.version == 3 {
+        &hver3
+    } else {
+        return Err(Error::with_msg_no_trace(format!(
+            "unexpected version {}",
+            basics.version
+        )));
+    };
+    for (_i, name_hash_entry) in basics.name_hash_entries.iter().enumerate() {
+        if name_hash_entry.named_hash_channel_entry_pos != 0 {
+            let pos = FilePos {
+                pos: name_hash_entry.named_hash_channel_entry_pos,
+            };
+            let list = channel_list_from_index_name_hash_list(&mut basics.file, pos, hver, stats).await?;
+            for e in list {
+                ret.push(e.channel_name);
+            }
+        }
+    }
+    Ok(ret)
+}
+
+#[cfg(test)]
+mod test {
+    use crate::archeng::indextree::{
+        read_channel, read_datablockref, read_file_basics, search_record, IndexFileBasics,
+    };
+    use crate::archeng::{open_read, StatsChannel, EPICS_EPOCH_OFFSET};
+    use err::Error;
+    #[allow(unused)]
+    use netpod::log::*;
+    #[allow(unused)]
+    use netpod::timeunits::*;
+    use netpod::{FilePos, NanoRange, Nanos};
+    use std::path::PathBuf;
+
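+    // NOTE: the tests below read a master index from a local test-data
+    // checkout (CHN_0_MASTER_INDEX) and exercise only index parsing and tree
+    // traversal; they do not require the referenced event data files.
+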
+    /*
+    Root node: leftmost record ts1 965081099942616289, rightmost record ts2 1002441959876114632
+    */
+    const CHN_0_MASTER_INDEX: &str = "/data/daqbuffer-testdata/sls/gfa03/bl_arch/archive_X05DA_SH/index";
+
+    #[test]
+    fn read_file_basic_info() -> Result<(), Error> {
+        let fut = async {
+            let stats = &StatsChannel::dummy();
+            let file = open_read(CHN_0_MASTER_INDEX.into(), stats).await?;
+            let res = read_file_basics(CHN_0_MASTER_INDEX.into(), file, stats).await?;
+            assert_eq!(res.version, 3);
+            assert_eq!(res.name_hash_anchor_beg, 88);
+            assert_eq!(res.name_hash_anchor_len, 1009);
+            // TODO makes no sense:
+            assert_eq!(res.fa_used_list_beg, 2611131);
+            assert_eq!(res.fa_used_list_end, 64);
+            assert_eq!(res.fa_used_list_len, 2136670);
+            assert_eq!(res.fa_header_next, 8160);
+            assert_eq!(res.fa_header_prev, 0);
+            assert_eq!(res.fa_header_len, 8072);
+            Ok(())
+        };
+        Ok(taskrun::run(fut).unwrap())
+    }
+
+    #[test]
+    fn rtree_init() -> Result<(), Error> {
+        let fut = async {
+            let stats = &StatsChannel::dummy();
+            let channel_name = "X05DA-FE-WI1:TC1";
+            let basics = IndexFileBasics::from_path(CHN_0_MASTER_INDEX, stats).await?;
+            let tree = basics.rtree_for_channel(channel_name, stats).await?;
+            let tree = tree.ok_or_else(|| Error::with_msg("no tree found for channel"))?;
+            assert_eq!(tree.m, 50);
+            assert_eq!(tree.root.0, 329750);
+            Ok(())
+        };
+        Ok(taskrun::run(fut).unwrap())
+    }
+
+    #[test]
+    fn rtree_count_all_records() -> Result<(), Error> {
+        let fut = async {
+            let stats = &StatsChannel::dummy();
+            let channel_name = "X05DA-FE-WI1:TC1";
+            let range = NanoRange { beg: 0, end: u64::MAX };
+            let basics = IndexFileBasics::from_path(CHN_0_MASTER_INDEX, stats).await?;
+            let mut tree = basics
+                .rtree_for_channel(channel_name, stats)
+                .await?
+                .ok_or_else(|| Error::with_msg("no tree found for channel"))?;
+            let mut iter = tree
+                .iter_range(range, stats)
+                .await?
+                .ok_or_else(|| Error::with_msg("could not position iterator"))?;
+            let mut i1 = 0;
+            let mut ts_max = Nanos::from_ns(0);
+            while let Some(rec) = iter.next().await? {
+                if rec.beg <= ts_max {
+                    return Err(Error::with_msg_no_trace("BAD ORDER"));
+                }
+                ts_max = rec.beg;
+                i1 += 1;
+                if i1 > 200000 {
+                    break;
+                }
+            }
+            assert_eq!(i1, 177);
+            Ok(())
+        };
+        Ok(taskrun::run(fut).unwrap())
+    }
+
+    #[test]
+    fn rtree_search() -> Result<(), Error> {
+        let fut = async {
+            let stats = &StatsChannel::dummy();
+            let channel_name = "X05DA-FE-WI1:TC1";
+            let range = NanoRange {
+                beg: 1601503499684884156,
+                end: 1601569919634086480,
+            };
+            let basics = IndexFileBasics::from_path(CHN_0_MASTER_INDEX, stats).await?;
+            let mut tree = basics
+                .rtree_for_channel(channel_name, stats)
+                .await?
+                .ok_or_else(|| Error::with_msg("no tree found for channel"))?;
+            let mut iter = tree
+                .iter_range(range, stats)
+                .await?
+                .ok_or_else(|| Error::with_msg("could not position iterator"))?;
+            let mut i1 = 0;
+            let mut ts_max = Nanos::from_ns(0);
+            while let Some(rec) = iter.next().await? {
+                info!("GOT RECORD: {:?} {:?}", rec.beg, rec.target);
+                if rec.beg <= ts_max {
+                    return Err(Error::with_msg_no_trace("BAD ORDER"));
+                }
+                ts_max = rec.beg;
+                i1 += 1;
+                if i1 > 20000000 {
+                    break;
+                }
+            }
+            /*
+            assert_eq!(res.node.is_leaf, true);
+            assert_eq!(res.node.pos.pos, 8216);
+            assert_eq!(res.rix, 17);
+            let rec = &res.node.records[res.rix];
+            assert_eq!(rec.ts1.ns, 970351499684884156 + EPICS_EPOCH_OFFSET);
+            assert_eq!(rec.ts2.ns, 970417919634086480 + EPICS_EPOCH_OFFSET);
+            assert_eq!(rec.child_or_id, 185074);
+            let pos = FilePos { pos: rec.child_or_id };
+            let datablock = read_datablockref(&mut index_file, pos, cib.hver(), stats).await?;
+            assert_eq!(datablock.data_header_pos().0, 52787);
+            assert_eq!(datablock.file_name(), "20201001/20201001");
+            */
+            Ok(())
+        };
+        Ok(taskrun::run(fut).unwrap())
+    }
+
+    #[test]
+    fn read_for_channel() -> Result<(), Error> {
+        let fut = async {
+            let stats = &StatsChannel::dummy();
+            let index_file = open_read(CHN_0_MASTER_INDEX.into(), stats).await?;
+            let channel_name = "X05DA-FE-WI1:TC1";
+            let res = read_channel(CHN_0_MASTER_INDEX, index_file, channel_name, stats).await?;
+            assert_eq!(res.is_some(), true);
+            let res = res.unwrap();
+            assert_eq!(res.channel_name, channel_name);
+            assert_eq!(res.rtree_m, 50);
+            assert_eq!(res.rtree_start_pos.pos, 329750);
+            Ok(())
+        };
+        Ok(taskrun::run(fut).unwrap())
+    }
+
+    #[test]
+    fn search_record_middle() -> Result<(), Error> {
+        /*
+        These times are still without EPICS_EPOCH_OFFSET.
+        RTreeNodeRecord { ts1: 969729779686636130, ts2: 970351442331056677, child_or_id: 130731 },
+        RTreeNodeRecord { ts1: 970351499684884156, ts2: 970417919634086480, child_or_id: 185074 },
+        RTreeNodeRecord { ts1: 970417979635219603, ts2: 970429859806669835, child_or_id: 185015 },
+        */
+        let fut = async {
+            let stats = &StatsChannel::dummy();
+            let index_path: PathBuf = CHN_0_MASTER_INDEX.into();
+            let index_file = open_read(index_path.clone(), stats).await?;
+            let mut file2 = open_read(index_path.clone(), stats).await?;
+            let channel_name = "X05DA-FE-WI1:TC1";
+            const T0: u64 = 970351442331056677 + 1 + EPICS_EPOCH_OFFSET;
+            let beg = Nanos { ns: T0 };
+            let res = read_channel(CHN_0_MASTER_INDEX, index_file, channel_name, stats).await?;
+            let cib = res.unwrap();
+            let (res, _stats) = search_record(&mut file2, cib.rtree_m, cib.rtree_start_pos, beg, stats).await?;
+            assert_eq!(res.is_some(), true);
+            let res = res.unwrap();
+            assert_eq!(res.node.is_leaf, true);
+            assert_eq!(res.node.pos.pos, 8216);
+            assert_eq!(res.rix, 17);
+            let rec = &res.node.records[res.rix];
+            assert_eq!(rec.ts1.ns, 970351499684884156 + EPICS_EPOCH_OFFSET);
+            assert_eq!(rec.ts2.ns, 970417919634086480 + EPICS_EPOCH_OFFSET);
+            assert_eq!(rec.child_or_id, 185074);
+            let pos = FilePos { pos: rec.child_or_id };
+            let datablock = read_datablockref(&mut file2, pos, cib.hver(), stats).await?;
+            assert_eq!(datablock.data_header_pos().0, 52787);
+            assert_eq!(datablock.file_name(), "20201001/20201001");
+            // The actual datafile for that time was not retained any longer.
+            // But the index still points to that.
+            Ok(())
+        };
+        Ok(taskrun::run(fut).unwrap())
+    }
+
+    // Note: this tests only the index tree; it does not look for any actual event in some file.
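+    // A search starting exactly on the root's leftmost timestamp is expected
+    // to land on the very first leaf record (rix 0).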
+    #[test]
+    fn search_record_at_beg() -> Result<(), Error> {
+        let fut = async {
+            let stats = &StatsChannel::dummy();
+            let index_file = open_read(CHN_0_MASTER_INDEX.into(), stats).await?;
+            let mut file2 = open_read(CHN_0_MASTER_INDEX.into(), stats).await?;
+            let channel_name = "X05DA-FE-WI1:TC1";
+            const T0: u64 = 965081099942616289 + EPICS_EPOCH_OFFSET;
+            let beg = Nanos { ns: T0 };
+            let res = read_channel(CHN_0_MASTER_INDEX, index_file, channel_name, stats).await?;
+            let cib = res.unwrap();
+            let (res, _stats) = search_record(&mut file2, cib.rtree_m, cib.rtree_start_pos, beg, stats).await?;
+            assert_eq!(res.is_some(), true);
+            let res = res.unwrap();
+            assert_eq!(res.node.is_leaf, true);
+            assert_eq!(res.node.pos.pos, 8216);
+            assert_eq!(res.rix, 0);
+            Ok(())
+        };
+        Ok(taskrun::run(fut).unwrap())
+    }
+
+    // Note: this tests only the index tree; it does not look for any actual event in some file.
+    #[test]
+    fn search_record_at_end() -> Result<(), Error> {
+        let fut = async {
+            let stats = &StatsChannel::dummy();
+            let index_file = open_read(CHN_0_MASTER_INDEX.into(), stats).await?;
+            let mut file2 = open_read(CHN_0_MASTER_INDEX.into(), stats).await?;
+            let channel_name = "X05DA-FE-WI1:TC1";
+            const T0: u64 = 1002441959876114632 - 1 + EPICS_EPOCH_OFFSET;
+            let beg = Nanos { ns: T0 };
+            let res = read_channel(CHN_0_MASTER_INDEX, index_file, channel_name, stats).await?;
+            let cib = res.unwrap();
+            let (res, _stats) = search_record(&mut file2, cib.rtree_m, cib.rtree_start_pos, beg, stats).await?;
+            assert_eq!(res.is_some(), true);
+            let res = res.unwrap();
+            assert_eq!(res.node.pos.pos, 1861178);
+            assert_eq!(res.rix, 46);
+            Ok(())
+        };
+        Ok(taskrun::run(fut).unwrap())
+    }
+
+    // Note: this tests only the index tree; it does not look for any actual event in some file.
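+    // One nanosecond before the rightmost timestamp must still yield a match;
+    // the next test checks that starting exactly on it yields none, since
+    // search_record only matches records with ts2 strictly greater than beg.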
+    #[test]
+    fn search_record_beyond_end() -> Result<(), Error> {
+        let fut = async {
+            let stats = &StatsChannel::dummy();
+            let index_file = open_read(CHN_0_MASTER_INDEX.into(), stats).await?;
+            let mut file2 = open_read(CHN_0_MASTER_INDEX.into(), stats).await?;
+            let channel_name = "X05DA-FE-WI1:TC1";
+            const T0: u64 = 1002441959876114632 - 0 + EPICS_EPOCH_OFFSET;
+            let beg = Nanos { ns: T0 };
+            let res = read_channel(CHN_0_MASTER_INDEX, index_file, channel_name, stats).await?;
+            let cib = res.unwrap();
+            let (res, _stats) = search_record(&mut file2, cib.rtree_m, cib.rtree_start_pos, beg, stats).await?;
+            assert_eq!(res.is_none(), true);
+            Ok(())
+        };
+        Ok(taskrun::run(fut).unwrap())
+    }
+}
diff --git a/httpret/src/channelarchiver.rs b/httpret/src/channelarchiver.rs
index d335550..08f5ed6 100644
--- a/httpret/src/channelarchiver.rs
+++ b/httpret/src/channelarchiver.rs
@@ -55,7 +55,6 @@ impl ListIndexFilesHttpFunction {
         ))?;
         let s = archapp_wrap::archapp::archeng::indexfiles::list_index_files(conf);
         let s = futures_util::stream::unfold(s, |mut st| async move {
-            use futures_util::StreamExt;
             let x = st.next().await;
             match x {
                 Some(x) => match x {
@@ -190,7 +189,6 @@ impl ListChannelsHttpFunction {
 
         let s = archapp_wrap::archapp::archeng::list_all_channels(conf);
         let s = futures_util::stream::unfold(s, |mut st| async move {
-            use futures_util::StreamExt;
             let x = st.next().await;
             match x {
                 Some(x) => match x {
diff --git a/netpod/src/lib.rs b/netpod/src/lib.rs
index c4737ad..0d73add 100644
--- a/netpod/src/lib.rs
+++ b/netpod/src/lib.rs
@@ -278,15 +278,6 @@ impl From<FilePos> for u64 {
     }
 }
 
-#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
-pub struct DataHeaderPos(pub u64);
-
-impl PartialEq for DataHeaderPos {
-    fn eq(&self, other: &Self) -> bool {
-        self.0 == other.0
-    }
-}
-
 #[derive(Clone, Debug, Serialize, Deserialize)]
 pub enum TimeRange {
     Time { beg: DateTime<Utc>, end: DateTime<Utc> },
@@ -294,7 +285,7 @@ pub enum TimeRange {
     Nano { beg: u64, end: u64 },
 }
 
-#[derive(Clone, Copy, Serialize, Deserialize)]
+#[derive(Clone, Copy, PartialEq, PartialOrd, Serialize, Deserialize)]
 pub struct Nanos {
     pub ns: u64,
 }
diff --git a/taskrun/src/lib.rs b/taskrun/src/lib.rs
index 09cb241..4e22632 100644
--- a/taskrun/src/lib.rs
+++ b/taskrun/src/lib.rs
@@ -94,6 +94,7 @@ pub fn tracing_init() {
         "info",
         "archapp::archeng=info",
         "archapp::archeng::datablockstream=info",
+        "archapp::archeng::indextree=trace",
         "archapp::storagemerge=info",
         "daqbuffer::test=trace",
     ]