diff --git a/archapp/src/archeng.rs b/archapp/src/archeng.rs index 3ffa995..2372e8d 100644 --- a/archapp/src/archeng.rs +++ b/archapp/src/archeng.rs @@ -210,21 +210,26 @@ fn format_hex_block(buf: &[u8], max: usize) -> String { ret } +fn readoffset(buf: &[u8], pos: usize) -> Offset { + u64::from_be_bytes(buf.as_ref()[pos..pos + OFFSET_SIZE].try_into().unwrap()) +} + fn readu64(buf: &[u8], pos: usize) -> u64 { - let pos = pos as usize; u64::from_be_bytes(buf.as_ref()[pos..pos + 8].try_into().unwrap()) } fn readu32(buf: &[u8], pos: usize) -> u32 { - let pos = pos as usize; u32::from_be_bytes(buf.as_ref()[pos..pos + 4].try_into().unwrap()) } fn readu16(buf: &[u8], pos: usize) -> u16 { - let pos = pos as usize; u16::from_be_bytes(buf.as_ref()[pos..pos + 2].try_into().unwrap()) } +fn readf64(buf: &[u8], pos: usize) -> f64 { + f64::from_be_bytes(buf.as_ref()[pos..pos + 8].try_into().unwrap()) +} + pub async fn read_file_basics(f1: &mut File) -> Result { let mut buf = vec![0; 0x58]; read_exact(f1, &mut buf).await?; @@ -516,6 +521,97 @@ pub fn datarange_stream(channel_name: &str) -> Result, Error Ok(rx) } +#[derive(Debug)] +pub struct Datablock { + next: Offset, + data: Offset, + fname: String, +} + +async fn read_index_datablockref(file: &mut File, pos: FilePos) -> Result { + seek(file, SeekFrom::Start(pos.into())).await?; + let mut rb = RingBuf::new(); + rb.fill_min(file, 18).await?; + let buf = rb.data(); + let next = readoffset(buf, 0); + let data = readoffset(buf, 8); + let len = readu16(buf, 16) as usize; + rb.fill_min(file, 18 + len).await?; + let buf = rb.data(); + let fname = String::from_utf8(buf[18..18 + len].to_vec())?; + let ret = Datablock { next, data, fname }; + Ok(ret) +} + +#[derive(Debug)] +pub struct DatafileHeader { + dir_offset: u32, + next_offset: u32, + prev_offset: u32, + curr_offset: u32, + num_samples: u32, + ctrl_info_offset: u32, + buf_size: u32, + buf_free: u32, + dbr_type: u16, + dbr_count: u16, + period: f64, + ts1: u64, + ts2: u64, + ts3: u64, +} + +async fn read_datafile_header(file: &mut File, pos: FilePos) -> Result { + seek(file, SeekFrom::Start(pos.into())).await?; + let mut rb = RingBuf::new(); + rb.fill_min(file, 88).await?; + let buf = rb.data(); + let dir_offset = readu32(buf, 0); + let next_offset = readu32(buf, 4); + let prev_offset = readu32(buf, 8); + let curr_offset = readu32(buf, 12); + let num_samples = readu32(buf, 16); + let ctrl_info_offset = readu32(buf, 20); + let buf_size = readu32(buf, 24); + let buf_free = readu32(buf, 28); + let dbr_type = readu16(buf, 32); + let dbr_count = readu16(buf, 34); + let _unused = readu32(buf, 36); + let period = readf64(buf, 40); + let ts1a = readu32(buf, 48); + let ts1b = readu32(buf, 52); + let ts2a = readu32(buf, 56); + let ts2b = readu32(buf, 60); + let ts3a = readu32(buf, 64); + let ts3b = readu32(buf, 68); + let ts1 = ts1a as u64 * SEC + ts1b as u64; + let ts2 = ts2a as u64 * SEC + ts2b as u64; + let ts3 = ts3a as u64 * SEC + ts3b as u64; + let ret = DatafileHeader { + dir_offset, + next_offset, + prev_offset, + curr_offset, + num_samples, + ctrl_info_offset, + buf_size, + buf_free, + dbr_type, + dbr_count, + period, + ts1, + ts2, + ts3, + }; + Ok(ret) +} + +async fn read_data_1(file: &mut File, pos: FilePos) -> Result<(), Error> { + let datafile_header = read_datafile_header(file, pos).await?; + info!("datafile_header {:?}", datafile_header); + Ok(()) +} + #[cfg(test)] mod test { // TODO move RangeFilter to a different crate (items?) @@ -524,12 +620,12 @@ mod test { //use disk::rangefilter::RangeFilter; //use disk::{eventblobs::EventChunkerMultifile, eventchunker::EventChunkerConf}; - use crate::archeng::{open_read, read_channel, read_file_basics}; + use crate::archeng::{open_read, read_channel, read_data_1, read_file_basics, read_index_datablockref}; use err::Error; use futures_util::StreamExt; use items::{RangeCompletableItem, StreamItem}; - use netpod::log::*; - use netpod::timeunits::{DAY, MS}; + use netpod::timeunits::{DAY, MS, SEC}; + use netpod::{log::*, FilePos}; use netpod::{ByteSize, ChannelConfig, FileIoBufferSize, Nanos}; use std::path::PathBuf; @@ -578,20 +674,64 @@ mod test { RTreeNodeRecord { ts1: 970417979635219603, ts2: 970429859806669835, child_or_id: 185015 }, */ let fut = async { - let mut index_file = open_read(CHN_0_MASTER_INDEX.into()).await?; + let index_path: PathBuf = CHN_0_MASTER_INDEX.into(); + let mut index_file = open_read(index_path.clone()).await?; let channel_name = "X05DA-FE-WI1:TC1"; const T0: u64 = 970351442331056677 + 1; let beg = Nanos { ns: T0 }; let res = read_channel(&mut index_file, channel_name).await?; let cib = res.unwrap(); - let (res, stats) = search_record(&mut index_file, cib.rtree_m, cib.rtree_start_pos, beg).await?; + let (res, _stats) = search_record(&mut index_file, cib.rtree_m, cib.rtree_start_pos, beg).await?; assert_eq!(res.is_some(), true); let res = res.unwrap(); + assert_eq!(res.node.is_leaf, true); assert_eq!(res.node.pos.pos, 8216); assert_eq!(res.rix, 17); let rec = &res.node.records[res.rix]; assert_eq!(rec.ts1, 970351499684884156); assert_eq!(rec.ts2, 970417919634086480); + info!("leaf match id {}", rec.child_or_id); + let pos = FilePos { pos: rec.child_or_id }; + let datablock = read_index_datablockref(&mut index_file, pos).await?; + info!("datablock {:?}", datablock); + let data_path = index_path.parent().unwrap().join(datablock.fname); + info!("try to open {:?}", data_path); + let mut data_file = open_read(data_path).await?; + let data_pos = FilePos { pos: datablock.data }; + read_data_1(&mut data_file, data_pos).await?; + Ok(()) + }; + Ok(taskrun::run(fut).unwrap()) + } + + #[test] + fn search_record_data() -> Result<(), Error> { + let fut = async { + let index_path: PathBuf = CHN_0_MASTER_INDEX.into(); + let mut index_file = open_read(index_path.clone()).await?; + let channel_name = "X05DA-FE-WI1:TC1"; + const T0: u64 = 1002000000 * SEC; + let beg = Nanos { ns: T0 }; + let res = read_channel(&mut index_file, channel_name).await?; + let cib = res.unwrap(); + let (res, _stats) = search_record(&mut index_file, cib.rtree_m, cib.rtree_start_pos, beg).await?; + assert_eq!(res.is_some(), true); + let res = res.unwrap(); + assert_eq!(res.node.is_leaf, true); + assert_eq!(res.node.pos.pos, 1861178); + assert_eq!(res.rix, 41); + let rec = &res.node.records[res.rix]; + assert_eq!(rec.ts1, 1001993759871202919); + assert_eq!(rec.ts2, 1002009299596362122); + info!("leaf match id {}", rec.child_or_id); + let pos = FilePos { pos: rec.child_or_id }; + let datablock = read_index_datablockref(&mut index_file, pos).await?; + info!("datablock {:?}", datablock); + let data_path = index_path.parent().unwrap().join(datablock.fname); + info!("try to open {:?}", data_path); + let mut data_file = open_read(data_path).await?; + let data_pos = FilePos { pos: datablock.data }; + read_data_1(&mut data_file, data_pos).await?; Ok(()) }; Ok(taskrun::run(fut).unwrap()) @@ -610,6 +750,7 @@ mod test { let (res, stats) = search_record(&mut index_file, cib.rtree_m, cib.rtree_start_pos, beg).await?; assert_eq!(res.is_some(), true); let res = res.unwrap(); + assert_eq!(res.node.is_leaf, true); assert_eq!(res.node.pos.pos, 8216); assert_eq!(res.rix, 0); Ok(())