Eventstream

This commit is contained in:
Dominik Werder
2021-11-02 07:31:13 +01:00
parent 15c6e1f6eb
commit f48e4b8ea4
9 changed files with 617 additions and 232 deletions

View File

@@ -1,10 +1,13 @@
pub mod backreadbuf;
pub mod blockstream;
pub mod bufminread;
pub mod datablock;
pub mod datablockstream;
pub mod diskio;
pub mod indexfiles;
pub mod indextree;
pub mod pipe;
pub mod ringbuf;
use self::indexfiles::list_index_files;
use self::indextree::channel_list;
@@ -196,80 +199,6 @@ pub fn name_hash(s: &str, ht_len: u32) -> u32 {
h
}
// Fixed-capacity read buffer over a file. Despite the name it is a
// compacting linear buffer, not a true ring: `fill` moves unconsumed
// bytes back to the front of `buf` before reading more.
// Valid, unconsumed data is the slice `buf[rp..wp]`.
pub struct RingBuf {
    // Backing storage, fixed at 8 KiB (allocated in `new`).
    buf: Vec<u8>,
    // Write position: end of valid data in `buf`.
    wp: usize,
    // Read position: start of unconsumed data in `buf`.
    rp: usize,
}
impl RingBuf {
    /// Creates an empty buffer with a fixed 8 KiB backing store.
    pub fn new() -> Self {
        Self {
            buf: vec![0; 1024 * 8],
            wp: 0,
            rp: 0,
        }
    }

    /// Discards all buffered data and rewinds both positions.
    pub fn reset(&mut self) {
        self.rp = 0;
        self.wp = 0;
    }

    /// Number of buffered bytes that have not been consumed yet.
    pub fn len(&self) -> usize {
        self.wp - self.rp
    }

    /// Marks `n` bytes as consumed. The caller must not advance past
    /// `len()`, otherwise `len()` underflows and panics.
    pub fn adv(&mut self, n: usize) {
        self.rp += n;
    }

    /// The buffered, not-yet-consumed bytes.
    pub fn data(&self) -> &[u8] {
        &self.buf[self.rp..self.wp]
    }

    /// Performs a single read from `file` into the free tail of the buffer.
    ///
    /// Unconsumed bytes are first compacted to the front so the free tail
    /// is maximal. Returns the number of bytes read; 0 indicates EOF or
    /// that the buffer is already full.
    pub async fn fill(&mut self, file: &mut File, stats: &StatsChannel) -> Result<usize, Error> {
        if self.rp == self.wp {
            if self.rp != 0 {
                self.wp = 0;
                self.rp = 0;
            }
        } else {
            // Safe, possibly-overlapping move of the unconsumed bytes to the
            // front of the buffer (replaces the previous unsafe `ptr::copy`,
            // which also aliased a `&` and a `&mut` borrow of `buf`).
            self.buf.copy_within(self.rp..self.wp, 0);
            self.wp -= self.rp;
            self.rp = 0;
        }
        let n = read(file, &mut self.buf[self.wp..], stats).await?;
        self.wp += n;
        Ok(n)
    }

    /// Tops the buffer up while it holds less than 1/6 of its capacity.
    /// Returns the number of bytes added; stops early on EOF.
    pub async fn fill_if_low(&mut self, file: &mut File, stats: &StatsChannel) -> Result<usize, Error> {
        let len = self.len();
        let cap = self.buf.len();
        while self.len() < cap / 6 {
            let n = self.fill(file, stats).await?;
            if n == 0 {
                break;
            }
        }
        Ok(self.len() - len)
    }

    /// Reads until at least `min` bytes are buffered.
    ///
    /// Returns the number of bytes added, or an error if EOF (or a full
    /// buffer) is reached before `min` bytes are available.
    pub async fn fill_min(&mut self, file: &mut File, min: usize, stats: &StatsChannel) -> Result<usize, Error> {
        let len = self.len();
        while self.len() < min {
            let n = self.fill(file, stats).await?;
            if n == 0 {
                return Err(Error::with_msg_no_trace(format!("fill_min can not read min {}", min)));
            }
        }
        Ok(self.len() - len)
    }
}
fn format_hex_block(buf: &[u8], max: usize) -> String {
use std::fmt::Write;
const COLS: usize = 16;

View File

@@ -0,0 +1,118 @@
use crate::archeng::{read, seek, StatsChannel};
use err::Error;
use netpod::log::*;
use std::{borrow::BorrowMut, io::SeekFrom};
use tokio::fs::File;
// Buffered reader tuned for access patterns that step backwards through a
// file: `seek` positions the underlying file a bit *before* the requested
// offset so that subsequent short negative seeks can be served from memory.
// Valid, unconsumed data is `buf[rp..wp]`, starting at file offset `abs`.
pub struct BackReadBuf<F> {
    file: F,
    // Backing storage, fixed at 8 KiB (allocated in `new`).
    buf: Vec<u8>,
    // Absolute file offset corresponding to `buf[0]` (set by `seek`).
    abs: u64,
    // Write position: end of valid data in `buf`.
    wp: usize,
    // Read position: start of unconsumed data in `buf`.
    rp: usize,
    stats: StatsChannel,
    // Usage counters, reported by the `Drop` impl.
    seek_request: u64,
    seek_done: u64,
    read_done: u64,
}
impl<F> BackReadBuf<F>
where
F: BorrowMut<File>,
{
pub async fn new(file: F, pos: u64, stats: StatsChannel) -> Result<Self, Error> {
let mut ret = Self {
file,
buf: vec![0; 1024 * 8],
abs: pos,
wp: 0,
rp: 0,
stats,
seek_request: 0,
seek_done: 0,
read_done: 0,
};
ret.seek(pos).await?;
Ok(ret)
}
pub fn into_file(self) -> F {
//self.file
err::todoval()
}
pub fn len(&self) -> usize {
self.wp - self.rp
}
pub fn adv(&mut self, n: usize) {
self.rp += n;
}
pub fn data(&self) -> &[u8] {
&self.buf[self.rp..self.wp]
}
async fn fill(&mut self) -> Result<usize, Error> {
if self.rp != 0 && self.rp == self.wp {
self.wp = 0;
self.rp = 0;
} else {
unsafe {
std::ptr::copy::<u8>(&self.buf[self.rp], &mut self.buf[0], self.len());
self.wp -= self.rp;
self.rp = 0;
}
}
let n = read(&mut self.file.borrow_mut(), &mut self.buf[self.wp..], &self.stats).await?;
//debug!("I/O fill n {}", n);
self.wp += n;
self.read_done += 1;
Ok(n)
}
pub async fn fill_min(&mut self, min: usize) -> Result<usize, Error> {
let len = self.len();
while self.len() < min {
let n = self.fill().await?;
if n == 0 {
return Err(Error::with_msg_no_trace(format!("fill_min can not read min {}", min)));
}
}
Ok(self.len() - len)
}
pub async fn seek(&mut self, pos: u64) -> Result<u64, Error> {
let dp = pos as i64 - self.abs as i64 - self.rp as i64;
if pos >= self.abs && pos < self.abs + self.buf.len() as u64 - 64 {
self.rp = (pos - self.abs) as usize;
self.seek_request += 1;
Ok(pos)
} else {
//debug!("I/O seek dp {}", dp);
let s0 = pos.min(1024 * 2 - 256);
self.abs = pos - s0;
self.rp = 0;
self.wp = 0;
let ret = seek(self.file.borrow_mut(), SeekFrom::Start(self.abs), &self.stats)
.await
.map_err(|e| Error::from(e))?;
self.fill_min(s0 as usize).await?;
self.rp = s0 as usize;
self.seek_request += 1;
self.seek_done += 1;
Ok(ret)
}
}
}
impl<F> Drop for BackReadBuf<F> {
    /// Logs seek/read statistics when the buffer is discarded.
    fn drop(&mut self) {
        // Guard the percentage against division by zero: `seek_request` can
        // still be 0 when construction failed before the first seek
        // completed. A panic here during unwinding would abort the process.
        let percent = if self.seek_request == 0 {
            0
        } else {
            self.seek_done * 100 / self.seek_request
        };
        info!(
            "BackReadBuf Drop {} {}% {}",
            self.seek_request, percent, self.read_done
        );
    }
}

View File

@@ -1,9 +1,14 @@
use crate::archeng::datablock::{read_data_1, read_datafile_header};
use crate::archeng::backreadbuf::BackReadBuf;
use crate::archeng::datablock::{read_data2, read_data_1, read_datafile_header, read_datafile_header2};
use crate::archeng::indexfiles::{database_connect, unfold_stream, UnfoldExec};
use crate::archeng::indextree::{read_datablockref, IndexFileBasics, RecordIter, RecordTarget};
use crate::archeng::indextree::{
read_datablockref, read_datablockref2, DataheaderPos, HeaderVersion, IndexFileBasics, RecordIter, RecordTarget,
};
use crate::archeng::ringbuf::RingBuf;
use crate::archeng::{open_read, seek, StatsChannel};
use err::Error;
use futures_core::{Future, Stream};
use items::WithLen;
#[allow(unused)]
use netpod::log::*;
use netpod::{Channel, ChannelArchiver, FilePos, NanoRange};
@@ -14,8 +19,6 @@ use std::path::PathBuf;
use std::pin::Pin;
use tokio::fs::File;
use super::indextree::HeaderVersion;
enum Steps {
Start,
SelectIndexFile,
@@ -30,12 +33,15 @@ struct DataBlocks {
range: NanoRange,
steps: Steps,
paths: VecDeque<String>,
file1: Option<File>,
file2: Option<File>,
file1: Option<BackReadBuf<File>>,
file2: Option<RingBuf<File>>,
last_dp: u64,
last_dp2: u64,
last_f2: String,
last_dfhpos: DataheaderPos,
dfnotfound: BTreeMap<String, bool>,
data_bytes_read: u64,
same_dfh_count: u64,
}
impl DataBlocks {
@@ -51,7 +57,10 @@ impl DataBlocks {
last_dp: 0,
last_dp2: 0,
last_f2: String::new(),
last_dfhpos: DataheaderPos(u64::MAX),
dfnotfound: BTreeMap::new(),
data_bytes_read: 0,
same_dfh_count: 0,
}
}
@@ -77,14 +86,16 @@ impl DataBlocks {
// For simplicity, simply read all storage classes linearly.
if let Some(path) = self.paths.pop_front() {
// TODO
let basics = IndexFileBasics::from_path(&path, stats).await?;
let mut file = open_read(path.clone().into(), stats).await?;
let basics = IndexFileBasics::from_file(&path, &mut file, stats).await?;
let mut tree = basics
.rtree_for_channel(self.channel.name(), stats)
.await?
.ok_or_else(|| Error::with_msg_no_trace("channel not in index files"))?;
if let Some(iter) = tree.iter_range(self.range.clone(), stats).await? {
debug!("SetupNextPath {:?}", path);
self.steps = ReadBlocks(iter, basics.hver().duplicate(), path.clone().into());
self.file1 = Some(open_read(path.into(), stats).await?);
self.file1 = Some(BackReadBuf::new(file, 0, stats.clone()).await?);
} else {
self.steps = SetupNextPath;
};
@@ -101,43 +112,68 @@ impl DataBlocks {
// TODO the iterator should actually return Dataref. We never expect child nodes here.
if let RecordTarget::Dataref(dp) = rec.target {
let f1 = self.file1.as_mut().unwrap();
//seek(f1, SeekFrom::Start(dp.0), stats).await?;
// Read the dataheader...
// TODO the function should take a DatarefPos or?
// TODO the seek is hidden in the function which makes possible optimization not accessible.
let dref = read_datablockref(f1, FilePos { pos: dp.0 }, hver.as_ref(), stats).await?;
let dref = read_datablockref2(f1, dp.clone(), hver.as_ref()).await?;
// TODO Remember the index path, need it here for relative path.
// TODO open datafile, relative path to index path.
// TODO keep open when path does not change.
let acc;
let num_samples;
if let Some(_) = self.dfnotfound.get(dref.file_name()) {
num_samples = 0;
acc = 1;
} else {
if dref.file_name() == self.last_f2 {
acc = 2;
} else {
let dpath = indexpath.parent().unwrap().join(dref.file_name());
match open_read(dpath, stats).await {
Ok(f2) => {
acc = 4;
self.file2 = Some(f2);
self.last_f2 = dref.file_name().into();
}
Err(_) => {
acc = 3;
self.file2 = None;
}
}
};
if let Some(f2) = self.file2.as_mut() {
let dfheader = read_datafile_header(f2, dref.data_header_pos(), stats).await?;
num_samples = dfheader.num_samples;
} else {
self.dfnotfound.insert(dref.file_name().into(), true);
if true {
if let Some(_) = self.dfnotfound.get(dref.file_name()) {
num_samples = 0;
};
acc = 1;
} else {
if dref.file_name() == self.last_f2 {
acc = 2;
} else {
let dpath = indexpath.parent().unwrap().join(dref.file_name());
match open_read(dpath, stats).await {
Ok(f2) => {
acc = 4;
self.file2 = Some(
RingBuf::new(f2, dref.data_header_pos().0, StatsChannel::dummy())
.await?,
);
self.last_f2 = dref.file_name().into();
}
Err(_) => {
acc = 3;
self.file2 = None;
}
}
};
if let Some(f2) = self.file2.as_mut() {
if dref.file_name() == self.last_f2 && dref.data_header_pos() == self.last_dfhpos {
num_samples = 0;
} else {
self.last_dfhpos = dref.data_header_pos();
let rp1 = f2.rp_abs();
let dfheader = read_datafile_header2(f2, dref.data_header_pos()).await?;
let data = read_data2(f2, &dfheader, self.range.clone(), false).await?;
let rp2 = f2.rp_abs();
self.data_bytes_read += rp2 - rp1;
num_samples = dfheader.num_samples;
if data.len() != num_samples as usize {
if (data.len() as i64 - num_samples as i64).abs() < 4 {
// TODO get always one event less than num_samples tells us.
//warn!("small deviation {} vs {}", data.len(), num_samples);
} else {
return Err(Error::with_msg_no_trace(format!(
"event count mismatch {} vs {}",
data.len(),
num_samples
)));
}
}
}
} else {
self.dfnotfound.insert(dref.file_name().into(), true);
num_samples = 0;
};
}
} else {
acc = 6;
num_samples = 0;
}
let item = serde_json::to_value((
dp.0,
@@ -156,6 +192,10 @@ impl DataBlocks {
panic!();
}
} else {
info!(
"data_bytes_read: {} same_dfh_count: {}",
self.data_bytes_read, self.same_dfh_count
);
self.steps = SetupNextPath;
JsVal::String(format!("NOMORE"))
};

View File

@@ -0,0 +1 @@

View File

@@ -1,6 +1,5 @@
use crate::archeng::{
read_exact, read_string, readf64, readu16, readu32, seek, RingBuf, StatsChannel, EPICS_EPOCH_OFFSET,
};
use crate::archeng::ringbuf::RingBuf;
use crate::archeng::{read_exact, read_string, readf64, readu16, readu32, seek, StatsChannel, EPICS_EPOCH_OFFSET};
use crate::eventsitem::EventsItem;
use crate::plainevents::{PlainEvents, ScalarPlainEvents};
use err::Error;
@@ -76,14 +75,14 @@ pub struct DatafileHeader {
const DATA_HEADER_LEN_ON_DISK: usize = 72 + 40 + 40;
// TODO retire this version (better version reads from buffer)
pub async fn read_datafile_header(
file: &mut File,
pos: DataheaderPos,
stats: &StatsChannel,
) -> Result<DatafileHeader, Error> {
seek(file, SeekFrom::Start(pos.0), stats).await?;
let mut rb = RingBuf::new();
rb.fill_min(file, DATA_HEADER_LEN_ON_DISK, stats).await?;
let mut rb = RingBuf::new(file, pos.0, stats.clone()).await?;
rb.fill_min(DATA_HEADER_LEN_ON_DISK).await?;
let buf = rb.data();
let dir_offset = readu32(buf, 0);
let next_offset = readu32(buf, 4);
@@ -142,6 +141,136 @@ pub async fn read_datafile_header(
Ok(ret)
}
/// Reads and parses a `DatafileHeader` at `pos` from the buffered data file.
///
/// Buffered counterpart of `read_datafile_header`: all byte access goes
/// through the `RingBuf` instead of seeking and reading the file directly.
/// Consumes exactly `DATA_HEADER_LEN_ON_DISK` bytes from `rb`.
pub async fn read_datafile_header2(rb: &mut RingBuf<File>, pos: DataheaderPos) -> Result<DatafileHeader, Error> {
    // TODO avoid the extra seek: make sure that RingBuf catches this. Profile..
    rb.seek(pos.0).await?;
    rb.fill_min(DATA_HEADER_LEN_ON_DISK).await?;
    let buf = rb.data();
    // Fixed-offset header fields.
    let dir_offset = readu32(buf, 0);
    let next_offset = readu32(buf, 4);
    let prev_offset = readu32(buf, 8);
    let curr_offset = readu32(buf, 12);
    let num_samples = readu32(buf, 16);
    let ctrl_info_offset = readu32(buf, 20);
    let buf_size = readu32(buf, 24);
    let buf_free = readu32(buf, 28);
    let dbr_type = DbrType::from_u16(readu16(buf, 32))?;
    let dbr_count = readu16(buf, 34);
    // 4 bytes padding.
    let period = readf64(buf, 40);
    // Three (seconds, nanoseconds) timestamp pairs follow:
    // ts1 -> begin, ts2 -> next file, ts3 -> end.
    let ts1a = readu32(buf, 48);
    let ts1b = readu32(buf, 52);
    let ts2a = readu32(buf, 56);
    let ts2b = readu32(buf, 60);
    let ts3a = readu32(buf, 64);
    let ts3b = readu32(buf, 68);
    // An all-zero pair means "not set"; otherwise convert from the EPICS
    // epoch to absolute nanoseconds.
    let ts_beg = if ts1a != 0 || ts1b != 0 {
        ts1a as u64 * SEC + ts1b as u64 + EPICS_EPOCH_OFFSET
    } else {
        0
    };
    let ts_end = if ts3a != 0 || ts3b != 0 {
        ts3a as u64 * SEC + ts3b as u64 + EPICS_EPOCH_OFFSET
    } else {
        0
    };
    let ts_next_file = if ts2a != 0 || ts2b != 0 {
        ts2a as u64 * SEC + ts2b as u64 + EPICS_EPOCH_OFFSET
    } else {
        0
    };
    // Two 40-byte file name fields (NUL-padded on disk).
    let fname_prev = read_string(&buf[72..112])?;
    let fname_next = read_string(&buf[112..152])?;
    // Mark the whole header as consumed.
    rb.adv(DATA_HEADER_LEN_ON_DISK);
    let ret = DatafileHeader {
        pos,
        dir_offset,
        next_offset,
        prev_offset,
        curr_offset,
        num_samples,
        ctrl_info_offset,
        buf_size,
        buf_free,
        dbr_type,
        dbr_count: dbr_count as usize,
        period,
        ts_beg: Nanos { ns: ts_beg },
        ts_end: Nanos { ns: ts_end },
        ts_next_file: Nanos { ns: ts_next_file },
        fname_next,
        fname_prev,
    };
    Ok(ret)
}
/// Parses the event samples that follow a `DatafileHeader` from `rb`,
/// keeping only events whose timestamp falls within `range`.
///
/// Currently only scalar `DbrTimeDouble` is supported; other DBR types and
/// waveforms return an error. The `_expand` flag is accepted but not yet
/// honored.
pub async fn read_data2(
    rb: &mut RingBuf<File>,
    datafile_header: &DatafileHeader,
    range: NanoRange,
    _expand: bool,
) -> Result<EventsItem, Error> {
    // TODO handle expand mode
    let res = match &datafile_header.dbr_type {
        DbrType::DbrTimeDouble => {
            if datafile_header.dbr_count == 1 {
                trace!("~~~~~~~~~~~~~~~~~~~~~ read scalar DbrTimeDouble");
                let mut evs = EventValues {
                    tss: vec![],
                    values: vec![],
                };
                let n1 = datafile_header.num_samples as usize;
                // On-disk record layout: status u16, severity u16,
                // seconds u32, nanoseconds u32, 4 bytes padding, value f64.
                let n2 = 2 + 2 + 4 + 4 + (4) + 8;
                let n3 = n1 * n2;
                rb.fill_min(n3).await?;
                let buf = rb.data();
                let mut p1 = 0;
                let mut ntot = 0;
                // Parse all `n1` complete records. The previous condition
                // `p1 < n3 - n2` stopped one record early (the "always one
                // event less than num_samples" TODO) and underflowed usize
                // for an empty block (`n1 == 0`).
                while p1 + n2 <= n3 {
                    let _status = u16::from_be_bytes(buf[p1..p1 + 2].try_into().unwrap());
                    p1 += 2;
                    let _severity = u16::from_be_bytes(buf[p1..p1 + 2].try_into().unwrap());
                    p1 += 2;
                    let ts1a = u32::from_be_bytes(buf[p1..p1 + 4].try_into().unwrap());
                    p1 += 4;
                    let ts1b = u32::from_be_bytes(buf[p1..p1 + 4].try_into().unwrap());
                    p1 += 4;
                    let ts1 = ts1a as u64 * SEC + ts1b as u64 + EPICS_EPOCH_OFFSET;
                    // Skip 4 bytes of padding before the value.
                    p1 += 4;
                    let value = f64::from_be_bytes(buf[p1..p1 + 8].try_into().unwrap());
                    p1 += 8;
                    ntot += 1;
                    // Range filter: keep only events inside [beg, end).
                    if ts1 >= range.beg && ts1 < range.end {
                        evs.tss.push(ts1);
                        evs.values.push(value);
                    }
                }
                // Consume the whole block regardless of how many events
                // passed the range filter.
                rb.adv(n3);
                info!("parsed block with {} / {} events", ntot, evs.tss.len());
                let evs = ScalarPlainEvents::Double(evs);
                let plain = PlainEvents::Scalar(evs);
                let item = EventsItem::Plain(plain);
                item
            } else {
                let msg = format!("dbr_count {:?} not yet supported", datafile_header.dbr_count);
                error!("{}", msg);
                return Err(Error::with_msg_no_trace(msg));
            }
        }
        _ => {
            let msg = format!("Type {:?} not yet supported", datafile_header.dbr_type);
            error!("{}", msg);
            return Err(Error::with_msg_no_trace(msg));
        }
    };
    Ok(res)
}
pub async fn read_data_1(
file: &mut File,
datafile_header: &DatafileHeader,

View File

@@ -333,8 +333,9 @@ impl ScanChannels {
.await?;
if rows.len() == 1 {
let indexfile_rid: i64 = rows[0].try_get(0)?;
let mut basics = super::indextree::IndexFileBasics::from_path(path, stats).await?;
let entries = basics.all_channel_entries(stats).await?;
let mut file = open_read(path.clone().into(), stats).await?;
let mut basics = super::indextree::IndexFileBasics::from_file(path, &mut file, stats).await?;
let entries = basics.all_channel_entries(&mut file, stats).await?;
for entry in entries {
let rows = dbc
.query("select rowid from channels where name = $1", &[&entry.channel_name()])

View File

@@ -1,5 +1,6 @@
use crate::archeng::ringbuf::RingBuf;
use crate::archeng::{
format_hex_block, name_hash, open_read, readu16, readu32, readu64, seek, RingBuf, StatsChannel, EPICS_EPOCH_OFFSET,
format_hex_block, name_hash, open_read, readu16, readu32, readu64, seek, StatsChannel, EPICS_EPOCH_OFFSET,
};
use err::Error;
use netpod::{log::*, NanoRange};
@@ -11,6 +12,8 @@ use std::path::{Path, PathBuf};
use std::time::{Duration, Instant};
use tokio::fs::File;
use super::backreadbuf::BackReadBuf;
pub trait HeaderVersion: Send + Sync + fmt::Debug {
fn version(&self) -> u8;
fn read_offset(&self, buf: &[u8], pos: usize) -> u64;
@@ -81,7 +84,6 @@ impl NamedHashChannelEntry {
#[derive(Debug)]
pub struct IndexFileBasics {
file: File,
path: PathBuf,
version: u8,
name_hash_anchor_beg: u64,
@@ -100,21 +102,78 @@ pub struct IndexFileBasics {
}
impl IndexFileBasics {
pub async fn from_path(path: impl Into<PathBuf>, stats: &StatsChannel) -> Result<Self, Error> {
pub async fn from_file(path: impl Into<PathBuf>, file: &mut File, stats: &StatsChannel) -> Result<Self, Error> {
let path = path.into();
let file = open_read(path.clone(), stats).await?;
read_file_basics(path, file, stats).await
}
pub fn hver(&self) -> &Box<dyn HeaderVersion> {
&self.hver
}
pub async fn all_channel_entries(
&mut self,
file: &mut File,
stats: &StatsChannel,
) -> Result<Vec<NamedHashChannelEntry>, Error> {
let mut entries = vec![];
let mut rb = RingBuf::new(file, 0, stats.clone()).await?;
for epos in &self.name_hash_entries {
if epos.named_hash_channel_entry_pos != 0 {
let mut pos = epos.named_hash_channel_entry_pos;
while pos != 0 {
rb.seek(pos).await?;
let min0 = 4 + 2 * self.hver.offset_size();
rb.fill_min(min0).await?;
let buf = rb.data();
let entry = parse_name_hash_channel_entry(buf, self.hver.as_ref())?;
info!("parsed entry {:?}", entry);
pos = entry.next;
entries.push(entry);
}
}
}
Ok(entries)
}
pub async fn rtree_for_channel(&self, channel_name: &str, stats: &StatsChannel) -> Result<Option<Rtree>, Error> {
// TODO in the common case, the caller has already a opened file and could reuse that here.
let mut index_file = open_read(self.path.clone(), stats).await?;
let chn_hash = name_hash(channel_name, self.name_hash_anchor_len as u32);
let epos = &self.name_hash_entries[chn_hash as usize];
let mut pos = epos.named_hash_channel_entry_pos;
if pos == 0 {
warn!("no hash entry for channel {}", channel_name);
}
let mut entries = vec![];
let mut rb = RingBuf::new(&mut index_file, pos, stats.clone()).await?;
while pos != 0 {
rb.seek(pos).await?;
let min0 = 4 + 2 * self.hver.offset_size();
rb.fill_min(min0).await?;
let buf = rb.data();
let e = parse_name_hash_channel_entry(buf, self.hver.as_ref())?;
let next = e.next;
entries.push(e);
pos = next;
}
drop(rb);
for e in &entries {
if e.channel_name == channel_name {
let hver = self.hver.duplicate();
let pos = RtreePos(e.id_rtree_pos);
// TODO Rtree could reuse the File here:
let tree = Rtree::new(self.path.clone(), index_file, pos, hver, stats).await?;
return Ok(Some(tree));
}
}
Ok(None)
}
}
pub async fn read_file_basics(path: PathBuf, file: File, stats: &StatsChannel) -> Result<IndexFileBasics, Error> {
let mut file = file;
let mut rb = RingBuf::new();
rb.fill_min(&mut file, 4, stats).await?;
pub async fn read_file_basics(path: PathBuf, file: &mut File, stats: &StatsChannel) -> Result<IndexFileBasics, Error> {
let mut rb = RingBuf::new(file, 0, stats.clone()).await?;
rb.fill_min(4).await?;
let buf = rb.data();
let version = String::from_utf8(buf[3..4].to_vec())?.parse()?;
let min0;
@@ -125,11 +184,10 @@ pub async fn read_file_basics(path: PathBuf, file: File, stats: &StatsChannel) -
} else {
panic!();
}
rb.fill_min(&mut file, min0, stats).await?;
rb.fill_min(min0).await?;
let buf = rb.data();
let mut ret = if version == 3 {
IndexFileBasics {
file,
path,
version,
name_hash_anchor_beg: readu64(buf, 4),
@@ -148,7 +206,6 @@ pub async fn read_file_basics(path: PathBuf, file: File, stats: &StatsChannel) -
}
} else if version == 2 {
IndexFileBasics {
file,
path,
version,
name_hash_anchor_beg: readu32(buf, 4) as u64,
@@ -178,7 +235,7 @@ pub async fn read_file_basics(path: PathBuf, file: File, stats: &StatsChannel) -
{
let hver = &ret.hver;
for _ in 0..ret.name_hash_anchor_len {
rb.fill_min(&mut ret.file, hver.offset_size(), stats).await?;
rb.fill_min(hver.offset_size()).await?;
let buf = rb.data();
let pos = hver.read_offset(buf, 0);
rb.adv(hver.offset_size());
@@ -191,63 +248,6 @@ pub async fn read_file_basics(path: PathBuf, file: File, stats: &StatsChannel) -
Ok(ret)
}
impl IndexFileBasics {
    /// Walks every name-hash bucket and collects all channel entries from
    /// the per-bucket linked lists (an entry's `next` pointer of 0
    /// terminates its list; a bucket position of 0 means an empty bucket).
    pub async fn all_channel_entries(&mut self, stats: &StatsChannel) -> Result<Vec<NamedHashChannelEntry>, Error> {
        let mut entries = vec![];
        let mut rb = RingBuf::new();
        for epos in &self.name_hash_entries {
            if epos.named_hash_channel_entry_pos != 0 {
                let mut pos = epos.named_hash_channel_entry_pos;
                while pos != 0 {
                    rb.reset();
                    seek(&mut self.file, SeekFrom::Start(pos), stats).await?;
                    // Minimum bytes needed to parse one entry: 4 bytes plus
                    // two offsets, whose width depends on the header version.
                    let min0 = 4 + 2 * self.hver.offset_size();
                    rb.fill_min(&mut self.file, min0, stats).await?;
                    let buf = rb.data();
                    let entry = parse_name_hash_channel_entry(buf, self.hver.as_ref())?;
                    info!("parsed entry {:?}", entry);
                    pos = entry.next;
                    entries.push(entry);
                }
            }
        }
        Ok(entries)
    }

    /// Looks up `channel_name` via the name-hash table and, if found,
    /// opens an `Rtree` rooted at the entry's rtree position.
    pub async fn rtree_for_channel(&self, channel_name: &str, stats: &StatsChannel) -> Result<Option<Rtree>, Error> {
        // A fresh file handle, since `self` is only borrowed immutably here.
        let mut index_file = open_read(self.path.clone(), stats).await?;
        let chn_hash = name_hash(channel_name, self.name_hash_anchor_len as u32);
        let epos = &self.name_hash_entries[chn_hash as usize];
        let mut pos = epos.named_hash_channel_entry_pos;
        if pos == 0 {
            warn!("no hash entry for channel {}", channel_name);
        }
        // Collect the whole hash-bucket chain, then search it by name.
        let mut entries = vec![];
        let mut rb = RingBuf::new();
        while pos != 0 {
            rb.reset();
            seek(&mut index_file, SeekFrom::Start(pos), stats).await?;
            let min0 = 4 + 2 * self.hver.offset_size();
            rb.fill_min(&mut index_file, min0, stats).await?;
            let buf = rb.data();
            let e = parse_name_hash_channel_entry(buf, self.hver.as_ref())?;
            let next = e.next;
            entries.push(e);
            pos = next;
        }
        for e in &entries {
            if e.channel_name == channel_name {
                let hver = self.hver.duplicate();
                let pos = RtreePos(e.id_rtree_pos);
                // TODO Rtree could reuse the File here:
                let tree = Rtree::new(self.path.clone(), index_file, pos, hver, stats).await?;
                return Ok(Some(tree));
            }
        }
        Ok(None)
    }
}
#[derive(Debug)]
pub struct RTreeNodeRecord {
pub ts1: Nanos,
@@ -286,10 +286,10 @@ pub async fn read_rtree_node(
const OFF1: usize = 9;
const RLEN: usize = 24;
const NANO_MAX: u32 = 999999999;
seek(file, SeekFrom::Start(pos.into()), stats).await?;
let mut rb = RingBuf::new();
// TODO should not be used.
let mut rb = RingBuf::new(file, pos.pos, stats.clone()).await?;
// TODO must know how much data I need at least...
rb.fill_min(file, OFF1 + rtree_m * RLEN, stats).await?;
rb.fill_min(OFF1 + rtree_m * RLEN).await?;
if false {
let s = format_hex_block(rb.data(), 128);
info!("RTREE NODE:\n{}", s);
@@ -423,7 +423,7 @@ impl RtreeNodeAtRecord {
#[derive(Debug)]
pub struct Rtree {
path: PathBuf,
file: File,
rb: RingBuf<File>,
m: usize,
root: NodePos,
hver: Box<dyn HeaderVersion>,
@@ -438,10 +438,10 @@ impl Rtree {
stats: &StatsChannel,
) -> Result<Self, Error> {
let mut file = file;
let (m, root) = Self::read_entry(&mut file, pos, hver.as_ref(), stats).await?;
let (m, root) = Self::read_entry(&mut file, pos.clone(), hver.as_ref(), stats).await?;
let ret = Self {
path: path.as_ref().into(),
file,
rb: RingBuf::new(file, pos.0, stats.clone()).await?,
m,
root,
hver,
@@ -455,11 +455,10 @@ impl Rtree {
hver: &dyn HeaderVersion,
stats: &StatsChannel,
) -> Result<(usize, NodePos), Error> {
seek(file, SeekFrom::Start(pos.0), stats).await?;
let mut rb = RingBuf::new();
let mut rb = RingBuf::new(file, pos.0, stats.clone()).await?;
// TODO should be able to indicate how much I need at most before I know that I will e.g. seek or abort.
let min0 = hver.offset_size() + 4;
rb.fill_min(file, min0, stats).await?;
rb.fill_min(min0).await?;
if rb.len() < min0 {
return Err(Error::with_msg_no_trace("could not read enough"));
}
@@ -475,14 +474,13 @@ impl Rtree {
Ok(ret)
}
pub async fn read_node_at(&mut self, pos: NodePos, stats: &StatsChannel) -> Result<RtreeNode, Error> {
let file = &mut self.file;
seek(file, SeekFrom::Start(pos.0), stats).await?;
let mut rb = RingBuf::new();
pub async fn read_node_at(&mut self, pos: NodePos, _stats: &StatsChannel) -> Result<RtreeNode, Error> {
let rb = &mut self.rb;
rb.seek(pos.0).await?;
let off1 = 1 + self.hver.offset_size();
let rlen = 4 * 4 + self.hver.offset_size();
let min0 = off1 + self.m * rlen;
rb.fill_min(file, min0, stats).await?;
rb.fill_min(min0).await?;
if false {
let s = format_hex_block(rb.data(), min0);
trace!("RTREE NODE:\n{}", s);
@@ -498,9 +496,10 @@ impl Rtree {
if false {
trace!("is_leaf: {} parent: {:?}", is_leaf, parent);
}
let hver = self.hver.duplicate();
let recs = (0..self.m)
.into_iter()
.filter_map(|i| {
.filter_map(move |i| {
const NANO_MAX: u32 = 999999999;
let off2 = off1 + i * rlen;
let ts1a = readu32(buf, off2 + 0);
@@ -511,7 +510,7 @@ impl Rtree {
let ts2b = ts2b.min(NANO_MAX);
let ts1 = ts1a as u64 * SEC + ts1b as u64 + EPICS_EPOCH_OFFSET;
let ts2 = ts2a as u64 * SEC + ts2b as u64 + EPICS_EPOCH_OFFSET;
let target = self.hver.read_offset(buf, off2 + 16);
let target = hver.read_offset(buf, off2 + 16);
//trace!("NODE {} {} {} {} {}", ts1a, ts1b, ts2a, ts2b, child_or_id);
if target != 0 && ts2 != 0 {
let target = if is_leaf {
@@ -586,7 +585,7 @@ impl Rtree {
let file = open_read(self.path.clone(), stats).await?;
let ret = Self {
path: self.path.clone(),
file: file,
rb: RingBuf::new(file, 0, stats.clone()).await?,
m: self.m,
root: self.root.clone(),
hver: self.hver.duplicate(),
@@ -669,10 +668,9 @@ pub async fn read_rtree_entrypoint(
_basics: &IndexFileBasics,
stats: &StatsChannel,
) -> Result<RTreeNode, Error> {
seek(file, SeekFrom::Start(pos), stats).await?;
let mut rb = RingBuf::new();
let mut rb = RingBuf::new(file, pos, stats.clone()).await?;
// TODO remove, this is anyway still using a hardcoded offset size.
rb.fill_min(file, 8 + 4, stats).await?;
rb.fill_min(8 + 4).await?;
if rb.len() < 8 + 4 {
return Err(Error::with_msg_no_trace("could not read enough"));
}
@@ -681,7 +679,8 @@ pub async fn read_rtree_entrypoint(
let rtree_m = readu32(b, 8);
//info!("node_offset: {} rtree_m: {}", node_offset, rtree_m);
let pos = FilePos { pos: node_offset };
let node = read_rtree_node(file, pos, rtree_m as usize, stats).await?;
let mut file = rb.into_file();
let node = read_rtree_node(&mut file, pos, rtree_m as usize, stats).await?;
//info!("read_rtree_entrypoint READ ROOT NODE: {:?}", node);
Ok(node)
}
@@ -829,17 +828,17 @@ pub async fn read_channel(
stats: &StatsChannel,
) -> Result<Option<ChannelInfoBasics>, Error> {
let path = path.into();
let mut basics = read_file_basics(path.clone(), index_file, stats).await?;
let mut index_file = index_file;
let basics = read_file_basics(path.clone(), &mut index_file, stats).await?;
let chn_hash = name_hash(channel_name, basics.name_hash_anchor_len as u32);
let epos = &basics.name_hash_entries[chn_hash as usize];
let mut entries = vec![];
let mut rb = RingBuf::new();
let mut pos = epos.named_hash_channel_entry_pos;
let mut rb = RingBuf::new(index_file, pos, stats.clone()).await?;
loop {
rb.reset();
seek(&mut basics.file, SeekFrom::Start(pos), stats).await?;
rb.seek(pos).await?;
let fill_min = if basics.hver.offset_size() == 8 { 20 } else { 12 };
rb.fill_min(&mut basics.file, fill_min, stats).await?;
rb.fill_min(fill_min).await?;
if rb.len() < fill_min {
warn!("not enough data to continue reading channel list from name hash list");
break;
@@ -930,10 +929,9 @@ pub async fn read_datablockref(
hver: &dyn HeaderVersion,
stats: &StatsChannel,
) -> Result<Dataref, Error> {
seek(file, SeekFrom::Start(pos.pos), stats).await?;
let mut rb = RingBuf::new();
let mut rb = RingBuf::new(file, pos.pos, stats.clone()).await?;
let min0 = hver.offset_size() * 2 + 2;
rb.fill_min(file, min0, stats).await?;
rb.fill_min(min0).await?;
let buf = rb.data();
let mut p = 0;
let next = hver.read_offset(buf, p);
@@ -943,7 +941,37 @@ pub async fn read_datablockref(
let len = readu16(buf, p) as usize;
p += 2;
let _ = p;
rb.fill_min(file, min0 + len, stats).await?;
rb.fill_min(min0 + len).await?;
let buf = rb.data();
let fname = String::from_utf8(buf[min0..min0 + len].to_vec())?;
let next = DatarefPos(next);
let data_header_pos = DataheaderPos(data);
let ret = Dataref {
next,
data_header_pos,
fname,
};
Ok(ret)
}
pub async fn read_datablockref2(
rb: &mut BackReadBuf<File>,
pos: DatarefPos,
hver: &dyn HeaderVersion,
) -> Result<Dataref, Error> {
rb.seek(pos.0).await?;
let min0 = hver.offset_size() * 2 + 2;
rb.fill_min(min0).await?;
let buf = rb.data();
let mut p = 0;
let next = hver.read_offset(buf, p);
p += hver.offset_size();
let data = hver.read_offset(buf, p);
p += hver.offset_size();
let len = readu16(buf, p) as usize;
p += 2;
let _ = p;
rb.fill_min(min0 + len).await?;
let buf = rb.data();
let fname = String::from_utf8(buf[min0..min0 + len].to_vec())?;
let next = DatarefPos(next);
@@ -964,12 +992,11 @@ async fn channel_list_from_index_name_hash_list(
) -> Result<Vec<NamedHashChannelEntry>, Error> {
let mut pos = pos;
let mut ret = vec![];
let mut rb = RingBuf::new();
let mut rb = RingBuf::new(file, pos.pos, stats.clone()).await?;
loop {
rb.reset();
seek(file, SeekFrom::Start(pos.pos), stats).await?;
rb.seek(pos.pos).await?;
let fill_min = if hver.offset_size() == 8 { 20 } else { 12 };
rb.fill_min(file, fill_min, stats).await?;
rb.fill_min(fill_min).await?;
if rb.len() < fill_min {
warn!("not enough data to continue reading channel list from name hash list");
break;
@@ -989,8 +1016,8 @@ async fn channel_list_from_index_name_hash_list(
// TODO retire this function
pub async fn channel_list(index_path: PathBuf, stats: &StatsChannel) -> Result<Vec<String>, Error> {
let mut ret = vec![];
let file = open_read(index_path.clone(), stats).await?;
let mut basics = read_file_basics(index_path.clone(), file, stats).await?;
let mut file = open_read(index_path.clone(), stats).await?;
let basics = read_file_basics(index_path.clone(), &mut file, stats).await?;
let hver2 = HeaderVersion2;
let hver3 = HeaderVersion3;
let hver: &dyn HeaderVersion = if basics.version == 2 {
@@ -1008,7 +1035,7 @@ pub async fn channel_list(index_path: PathBuf, stats: &StatsChannel) -> Result<V
let pos = FilePos {
pos: name_hash_entry.named_hash_channel_entry_pos,
};
let list = channel_list_from_index_name_hash_list(&mut basics.file, pos, hver, stats).await?;
let list = channel_list_from_index_name_hash_list(&mut file, pos, hver, stats).await?;
for e in list {
ret.push(e.channel_name);
}
@@ -1040,8 +1067,8 @@ mod test {
fn read_file_basic_info() -> Result<(), Error> {
let fut = async {
let stats = &StatsChannel::dummy();
let file = open_read(CHN_0_MASTER_INDEX.into(), stats).await?;
let res = read_file_basics(CHN_0_MASTER_INDEX.into(), file, stats).await?;
let mut file = open_read(CHN_0_MASTER_INDEX.into(), stats).await?;
let res = read_file_basics(CHN_0_MASTER_INDEX.into(), &mut file, stats).await?;
assert_eq!(res.version, 3);
assert_eq!(res.name_hash_anchor_beg, 88);
assert_eq!(res.name_hash_anchor_len, 1009);
@@ -1062,7 +1089,8 @@ mod test {
let fut = async {
let stats = &StatsChannel::dummy();
let channel_name = "X05DA-FE-WI1:TC1";
let basics = IndexFileBasics::from_path(CHN_0_MASTER_INDEX, stats).await?;
let mut file = open_read(CHN_0_MASTER_INDEX.into(), stats).await?;
let basics = IndexFileBasics::from_file(CHN_0_MASTER_INDEX, &mut file, stats).await?;
let tree = basics.rtree_for_channel(channel_name, stats).await?;
let tree = tree.ok_or_else(|| Error::with_msg("no tree found for channel"))?;
assert_eq!(tree.m, 50);
@@ -1078,7 +1106,8 @@ mod test {
let stats = &StatsChannel::dummy();
let channel_name = "X05DA-FE-WI1:TC1";
let range = NanoRange { beg: 0, end: u64::MAX };
let basics = IndexFileBasics::from_path(CHN_0_MASTER_INDEX, stats).await?;
let mut file = open_read(CHN_0_MASTER_INDEX.into(), stats).await?;
let basics = IndexFileBasics::from_file(CHN_0_MASTER_INDEX, &mut file, stats).await?;
let mut tree = basics
.rtree_for_channel(channel_name, stats)
.await?
@@ -1114,7 +1143,8 @@ mod test {
beg: 1601503499684884156,
end: 1601569919634086480,
};
let basics = IndexFileBasics::from_path(CHN_0_MASTER_INDEX, stats).await?;
let mut file = open_read(CHN_0_MASTER_INDEX.into(), stats).await?;
let basics = IndexFileBasics::from_file(CHN_0_MASTER_INDEX, &mut file, stats).await?;
let mut tree = basics
.rtree_for_channel(channel_name, stats)
.await?

View File

@@ -0,0 +1,135 @@
use crate::archeng::{read, seek, StatsChannel};
use err::Error;
use netpod::log::*;
use std::fmt;
use std::mem::ManuallyDrop;
use std::{borrow::BorrowMut, io::SeekFrom};
use tokio::fs::File;
// Buffered, seekable reader over a `File`-like handle with a fixed 1 MiB
// buffer. Tracks the absolute file position so `rp_abs` can report how far
// a caller has consumed. `file` lives in an `Option` so `into_file` can
// move it out even though the type has a `Drop` impl.
pub struct RingBuf<F> {
    file: Option<F>,
    // Backing storage, fixed at 1 MiB (allocated in `new`).
    buf: Vec<u8>,
    // Absolute file offset of buf[0], as established by `seek`;
    // `usize::MAX` until the first seek from `new` runs.
    abs: usize,
    // Write position: end of valid data in `buf`.
    wp: usize,
    // Read position: start of unconsumed data in `buf`.
    rp: usize,
    stats: StatsChannel,
    // Usage counters, reported by the `Drop` impl.
    seek_request: u64,
    seek_done: u64,
    read_done: u64,
}
impl<F> RingBuf<F>
where
F: BorrowMut<File>,
{
pub async fn new(file: F, pos: u64, stats: StatsChannel) -> Result<Self, Error> {
let mut ret = Self {
file: Some(file),
buf: vec![0; 1024 * 1024],
abs: usize::MAX,
wp: 0,
rp: 0,
stats,
seek_request: 0,
seek_done: 0,
read_done: 0,
};
ret.seek(pos).await?;
Ok(ret)
}
pub fn into_file(mut self) -> F {
self.file.take().unwrap()
}
pub fn len(&self) -> usize {
self.wp - self.rp
}
pub fn adv(&mut self, n: usize) {
self.rp += n;
}
pub fn data(&self) -> &[u8] {
&self.buf[self.rp..self.wp]
}
async fn fill(&mut self) -> Result<usize, Error> {
if self.rp == self.wp {
if self.rp != 0 {
self.wp = 0;
self.rp = 0;
}
} else {
unsafe {
std::ptr::copy::<u8>(&self.buf[self.rp], &mut self.buf[0], self.len());
self.wp -= self.rp;
self.rp = 0;
}
}
let n = read(
self.file.as_mut().unwrap().borrow_mut(),
&mut self.buf[self.wp..],
&self.stats,
)
.await?;
self.wp += n;
self.read_done += 1;
Ok(n)
}
pub async fn fill_min(&mut self, min: usize) -> Result<usize, Error> {
let len = self.len();
while self.len() < min {
let n = self.fill().await?;
if n == 0 {
return Err(Error::with_msg_no_trace(format!("fill_min can not read min {}", min)));
}
}
Ok(self.len() - len)
}
pub async fn seek(&mut self, pos: u64) -> Result<u64, Error> {
let dp = pos as i64 - self.abs as i64 - self.rp as i64;
if dp < 0 && dp > -2048 {
debug!("small NEG seek {}", dp);
} else if dp == 0 {
debug!("zero seek");
} else if dp > 0 && dp < 2048 {
debug!("small POS seek {}", dp);
}
self.abs = pos as usize;
self.rp = 0;
self.wp = 0;
let ret = seek(
self.file.as_mut().unwrap().borrow_mut(),
SeekFrom::Start(pos),
&self.stats,
)
.await
.map_err(|e| Error::from(e))?;
self.seek_request += 1;
self.seek_done += 1;
Ok(ret)
}
pub fn rp_abs(&self) -> u64 {
self.abs as u64 + self.rp as u64
}
}
impl<F> fmt::Debug for RingBuf<F> {
    /// Debug output shows only the buffer cursors, never the buffered data.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let mut dbg = f.debug_struct("RingBuf");
        dbg.field("abs", &self.abs);
        dbg.field("wp", &self.wp);
        dbg.field("rp", &self.rp);
        dbg.finish()
    }
}
impl<F> Drop for RingBuf<F> {
    /// Reports simple usage counters when the buffer is discarded.
    fn drop(&mut self) {
        let seeks = self.seek_request;
        let reads = self.read_done;
        info!("RingBuf Drop {} {}", seeks, reads);
    }
}

View File

@@ -96,6 +96,8 @@ pub fn tracing_init() {
"archapp::archeng::datablockstream=info",
"archapp::archeng::indextree=info",
"archapp::archeng::blockstream=trace",
"archapp::archeng::ringbuf=trace",
"archapp::archeng::backreadbuf=trace",
"archapp::storagemerge=info",
"daqbuffer::test=trace",
]