Read data header in referenced datablock
This commit is contained in:
+149
-8
@@ -210,21 +210,26 @@ fn format_hex_block(buf: &[u8], max: usize) -> String {
|
|||||||
ret
|
ret
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn readoffset(buf: &[u8], pos: usize) -> Offset {
|
||||||
|
u64::from_be_bytes(buf.as_ref()[pos..pos + OFFSET_SIZE].try_into().unwrap())
|
||||||
|
}
|
||||||
|
|
||||||
fn readu64(buf: &[u8], pos: usize) -> u64 {
|
fn readu64(buf: &[u8], pos: usize) -> u64 {
|
||||||
let pos = pos as usize;
|
|
||||||
u64::from_be_bytes(buf.as_ref()[pos..pos + 8].try_into().unwrap())
|
u64::from_be_bytes(buf.as_ref()[pos..pos + 8].try_into().unwrap())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn readu32(buf: &[u8], pos: usize) -> u32 {
|
fn readu32(buf: &[u8], pos: usize) -> u32 {
|
||||||
let pos = pos as usize;
|
|
||||||
u32::from_be_bytes(buf.as_ref()[pos..pos + 4].try_into().unwrap())
|
u32::from_be_bytes(buf.as_ref()[pos..pos + 4].try_into().unwrap())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn readu16(buf: &[u8], pos: usize) -> u16 {
|
fn readu16(buf: &[u8], pos: usize) -> u16 {
|
||||||
let pos = pos as usize;
|
|
||||||
u16::from_be_bytes(buf.as_ref()[pos..pos + 2].try_into().unwrap())
|
u16::from_be_bytes(buf.as_ref()[pos..pos + 2].try_into().unwrap())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn readf64(buf: &[u8], pos: usize) -> f64 {
|
||||||
|
f64::from_be_bytes(buf.as_ref()[pos..pos + 8].try_into().unwrap())
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn read_file_basics(f1: &mut File) -> Result<IndexFileBasics, Error> {
|
pub async fn read_file_basics(f1: &mut File) -> Result<IndexFileBasics, Error> {
|
||||||
let mut buf = vec![0; 0x58];
|
let mut buf = vec![0; 0x58];
|
||||||
read_exact(f1, &mut buf).await?;
|
read_exact(f1, &mut buf).await?;
|
||||||
@@ -516,6 +521,97 @@ pub fn datarange_stream(channel_name: &str) -> Result<Receiver<Datarange>, Error
|
|||||||
Ok(rx)
|
Ok(rx)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct Datablock {
|
||||||
|
next: Offset,
|
||||||
|
data: Offset,
|
||||||
|
fname: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn read_index_datablockref(file: &mut File, pos: FilePos) -> Result<Datablock, Error> {
|
||||||
|
seek(file, SeekFrom::Start(pos.into())).await?;
|
||||||
|
let mut rb = RingBuf::new();
|
||||||
|
rb.fill_min(file, 18).await?;
|
||||||
|
let buf = rb.data();
|
||||||
|
let next = readoffset(buf, 0);
|
||||||
|
let data = readoffset(buf, 8);
|
||||||
|
let len = readu16(buf, 16) as usize;
|
||||||
|
rb.fill_min(file, 18 + len).await?;
|
||||||
|
let buf = rb.data();
|
||||||
|
let fname = String::from_utf8(buf[18..18 + len].to_vec())?;
|
||||||
|
let ret = Datablock { next, data, fname };
|
||||||
|
Ok(ret)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct DatafileHeader {
|
||||||
|
dir_offset: u32,
|
||||||
|
next_offset: u32,
|
||||||
|
prev_offset: u32,
|
||||||
|
curr_offset: u32,
|
||||||
|
num_samples: u32,
|
||||||
|
ctrl_info_offset: u32,
|
||||||
|
buf_size: u32,
|
||||||
|
buf_free: u32,
|
||||||
|
dbr_type: u16,
|
||||||
|
dbr_count: u16,
|
||||||
|
period: f64,
|
||||||
|
ts1: u64,
|
||||||
|
ts2: u64,
|
||||||
|
ts3: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn read_datafile_header(file: &mut File, pos: FilePos) -> Result<DatafileHeader, Error> {
|
||||||
|
seek(file, SeekFrom::Start(pos.into())).await?;
|
||||||
|
let mut rb = RingBuf::new();
|
||||||
|
rb.fill_min(file, 88).await?;
|
||||||
|
let buf = rb.data();
|
||||||
|
let dir_offset = readu32(buf, 0);
|
||||||
|
let next_offset = readu32(buf, 4);
|
||||||
|
let prev_offset = readu32(buf, 8);
|
||||||
|
let curr_offset = readu32(buf, 12);
|
||||||
|
let num_samples = readu32(buf, 16);
|
||||||
|
let ctrl_info_offset = readu32(buf, 20);
|
||||||
|
let buf_size = readu32(buf, 24);
|
||||||
|
let buf_free = readu32(buf, 28);
|
||||||
|
let dbr_type = readu16(buf, 32);
|
||||||
|
let dbr_count = readu16(buf, 34);
|
||||||
|
let _unused = readu32(buf, 36);
|
||||||
|
let period = readf64(buf, 40);
|
||||||
|
let ts1a = readu32(buf, 48);
|
||||||
|
let ts1b = readu32(buf, 52);
|
||||||
|
let ts2a = readu32(buf, 56);
|
||||||
|
let ts2b = readu32(buf, 60);
|
||||||
|
let ts3a = readu32(buf, 64);
|
||||||
|
let ts3b = readu32(buf, 68);
|
||||||
|
let ts1 = ts1a as u64 * SEC + ts1b as u64;
|
||||||
|
let ts2 = ts2a as u64 * SEC + ts2b as u64;
|
||||||
|
let ts3 = ts3a as u64 * SEC + ts3b as u64;
|
||||||
|
let ret = DatafileHeader {
|
||||||
|
dir_offset,
|
||||||
|
next_offset,
|
||||||
|
prev_offset,
|
||||||
|
curr_offset,
|
||||||
|
num_samples,
|
||||||
|
ctrl_info_offset,
|
||||||
|
buf_size,
|
||||||
|
buf_free,
|
||||||
|
dbr_type,
|
||||||
|
dbr_count,
|
||||||
|
period,
|
||||||
|
ts1,
|
||||||
|
ts2,
|
||||||
|
ts3,
|
||||||
|
};
|
||||||
|
Ok(ret)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn read_data_1(file: &mut File, pos: FilePos) -> Result<(), Error> {
|
||||||
|
let datafile_header = read_datafile_header(file, pos).await?;
|
||||||
|
info!("datafile_header {:?}", datafile_header);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod test {
|
mod test {
|
||||||
// TODO move RangeFilter to a different crate (items?)
|
// TODO move RangeFilter to a different crate (items?)
|
||||||
@@ -524,12 +620,12 @@ mod test {
|
|||||||
//use disk::rangefilter::RangeFilter;
|
//use disk::rangefilter::RangeFilter;
|
||||||
//use disk::{eventblobs::EventChunkerMultifile, eventchunker::EventChunkerConf};
|
//use disk::{eventblobs::EventChunkerMultifile, eventchunker::EventChunkerConf};
|
||||||
|
|
||||||
use crate::archeng::{open_read, read_channel, read_file_basics};
|
use crate::archeng::{open_read, read_channel, read_data_1, read_file_basics, read_index_datablockref};
|
||||||
use err::Error;
|
use err::Error;
|
||||||
use futures_util::StreamExt;
|
use futures_util::StreamExt;
|
||||||
use items::{RangeCompletableItem, StreamItem};
|
use items::{RangeCompletableItem, StreamItem};
|
||||||
use netpod::log::*;
|
use netpod::timeunits::{DAY, MS, SEC};
|
||||||
use netpod::timeunits::{DAY, MS};
|
use netpod::{log::*, FilePos};
|
||||||
use netpod::{ByteSize, ChannelConfig, FileIoBufferSize, Nanos};
|
use netpod::{ByteSize, ChannelConfig, FileIoBufferSize, Nanos};
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
|
|
||||||
@@ -578,20 +674,64 @@ mod test {
|
|||||||
RTreeNodeRecord { ts1: 970417979635219603, ts2: 970429859806669835, child_or_id: 185015 },
|
RTreeNodeRecord { ts1: 970417979635219603, ts2: 970429859806669835, child_or_id: 185015 },
|
||||||
*/
|
*/
|
||||||
let fut = async {
|
let fut = async {
|
||||||
let mut index_file = open_read(CHN_0_MASTER_INDEX.into()).await?;
|
let index_path: PathBuf = CHN_0_MASTER_INDEX.into();
|
||||||
|
let mut index_file = open_read(index_path.clone()).await?;
|
||||||
let channel_name = "X05DA-FE-WI1:TC1";
|
let channel_name = "X05DA-FE-WI1:TC1";
|
||||||
const T0: u64 = 970351442331056677 + 1;
|
const T0: u64 = 970351442331056677 + 1;
|
||||||
let beg = Nanos { ns: T0 };
|
let beg = Nanos { ns: T0 };
|
||||||
let res = read_channel(&mut index_file, channel_name).await?;
|
let res = read_channel(&mut index_file, channel_name).await?;
|
||||||
let cib = res.unwrap();
|
let cib = res.unwrap();
|
||||||
let (res, stats) = search_record(&mut index_file, cib.rtree_m, cib.rtree_start_pos, beg).await?;
|
let (res, _stats) = search_record(&mut index_file, cib.rtree_m, cib.rtree_start_pos, beg).await?;
|
||||||
assert_eq!(res.is_some(), true);
|
assert_eq!(res.is_some(), true);
|
||||||
let res = res.unwrap();
|
let res = res.unwrap();
|
||||||
|
assert_eq!(res.node.is_leaf, true);
|
||||||
assert_eq!(res.node.pos.pos, 8216);
|
assert_eq!(res.node.pos.pos, 8216);
|
||||||
assert_eq!(res.rix, 17);
|
assert_eq!(res.rix, 17);
|
||||||
let rec = &res.node.records[res.rix];
|
let rec = &res.node.records[res.rix];
|
||||||
assert_eq!(rec.ts1, 970351499684884156);
|
assert_eq!(rec.ts1, 970351499684884156);
|
||||||
assert_eq!(rec.ts2, 970417919634086480);
|
assert_eq!(rec.ts2, 970417919634086480);
|
||||||
|
info!("leaf match id {}", rec.child_or_id);
|
||||||
|
let pos = FilePos { pos: rec.child_or_id };
|
||||||
|
let datablock = read_index_datablockref(&mut index_file, pos).await?;
|
||||||
|
info!("datablock {:?}", datablock);
|
||||||
|
let data_path = index_path.parent().unwrap().join(datablock.fname);
|
||||||
|
info!("try to open {:?}", data_path);
|
||||||
|
let mut data_file = open_read(data_path).await?;
|
||||||
|
let data_pos = FilePos { pos: datablock.data };
|
||||||
|
read_data_1(&mut data_file, data_pos).await?;
|
||||||
|
Ok(())
|
||||||
|
};
|
||||||
|
Ok(taskrun::run(fut).unwrap())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn search_record_data() -> Result<(), Error> {
|
||||||
|
let fut = async {
|
||||||
|
let index_path: PathBuf = CHN_0_MASTER_INDEX.into();
|
||||||
|
let mut index_file = open_read(index_path.clone()).await?;
|
||||||
|
let channel_name = "X05DA-FE-WI1:TC1";
|
||||||
|
const T0: u64 = 1002000000 * SEC;
|
||||||
|
let beg = Nanos { ns: T0 };
|
||||||
|
let res = read_channel(&mut index_file, channel_name).await?;
|
||||||
|
let cib = res.unwrap();
|
||||||
|
let (res, _stats) = search_record(&mut index_file, cib.rtree_m, cib.rtree_start_pos, beg).await?;
|
||||||
|
assert_eq!(res.is_some(), true);
|
||||||
|
let res = res.unwrap();
|
||||||
|
assert_eq!(res.node.is_leaf, true);
|
||||||
|
assert_eq!(res.node.pos.pos, 1861178);
|
||||||
|
assert_eq!(res.rix, 41);
|
||||||
|
let rec = &res.node.records[res.rix];
|
||||||
|
assert_eq!(rec.ts1, 1001993759871202919);
|
||||||
|
assert_eq!(rec.ts2, 1002009299596362122);
|
||||||
|
info!("leaf match id {}", rec.child_or_id);
|
||||||
|
let pos = FilePos { pos: rec.child_or_id };
|
||||||
|
let datablock = read_index_datablockref(&mut index_file, pos).await?;
|
||||||
|
info!("datablock {:?}", datablock);
|
||||||
|
let data_path = index_path.parent().unwrap().join(datablock.fname);
|
||||||
|
info!("try to open {:?}", data_path);
|
||||||
|
let mut data_file = open_read(data_path).await?;
|
||||||
|
let data_pos = FilePos { pos: datablock.data };
|
||||||
|
read_data_1(&mut data_file, data_pos).await?;
|
||||||
Ok(())
|
Ok(())
|
||||||
};
|
};
|
||||||
Ok(taskrun::run(fut).unwrap())
|
Ok(taskrun::run(fut).unwrap())
|
||||||
@@ -610,6 +750,7 @@ mod test {
|
|||||||
let (res, stats) = search_record(&mut index_file, cib.rtree_m, cib.rtree_start_pos, beg).await?;
|
let (res, stats) = search_record(&mut index_file, cib.rtree_m, cib.rtree_start_pos, beg).await?;
|
||||||
assert_eq!(res.is_some(), true);
|
assert_eq!(res.is_some(), true);
|
||||||
let res = res.unwrap();
|
let res = res.unwrap();
|
||||||
|
assert_eq!(res.node.is_leaf, true);
|
||||||
assert_eq!(res.node.pos.pos, 8216);
|
assert_eq!(res.node.pos.pos, 8216);
|
||||||
assert_eq!(res.rix, 0);
|
assert_eq!(res.rix, 0);
|
||||||
Ok(())
|
Ok(())
|
||||||
|
|||||||
Reference in New Issue
Block a user