diff --git a/archapp/src/archeng.rs b/archapp/src/archeng.rs index 72c9c45..7c526fa 100644 --- a/archapp/src/archeng.rs +++ b/archapp/src/archeng.rs @@ -1,10 +1,13 @@ +pub mod backreadbuf; pub mod blockstream; +pub mod bufminread; pub mod datablock; pub mod datablockstream; pub mod diskio; pub mod indexfiles; pub mod indextree; pub mod pipe; +pub mod ringbuf; use self::indexfiles::list_index_files; use self::indextree::channel_list; @@ -196,80 +199,6 @@ pub fn name_hash(s: &str, ht_len: u32) -> u32 { h } -pub struct RingBuf { - buf: Vec, - wp: usize, - rp: usize, -} - -impl RingBuf { - pub fn new() -> Self { - Self { - buf: vec![0; 1024 * 8], - wp: 0, - rp: 0, - } - } - - pub fn reset(&mut self) { - self.rp = 0; - self.wp = 0; - } - - pub fn len(&self) -> usize { - self.wp - self.rp - } - - pub fn adv(&mut self, n: usize) { - self.rp += n; - } - - pub fn data(&self) -> &[u8] { - &self.buf[self.rp..self.wp] - } - - pub async fn fill(&mut self, file: &mut File, stats: &StatsChannel) -> Result { - if self.rp == self.wp { - if self.rp != 0 { - self.wp = 0; - self.rp = 0; - } - } else { - unsafe { - std::ptr::copy::(&self.buf[self.rp], &mut self.buf[0], self.len()); - self.wp -= self.rp; - self.rp = 0; - } - } - let n = read(file, &mut self.buf[self.wp..], stats).await?; - self.wp += n; - return Ok(n); - } - - pub async fn fill_if_low(&mut self, file: &mut File, stats: &StatsChannel) -> Result { - let len = self.len(); - let cap = self.buf.len(); - while self.len() < cap / 6 { - let n = self.fill(file, stats).await?; - if n == 0 { - break; - } - } - return Ok(self.len() - len); - } - - pub async fn fill_min(&mut self, file: &mut File, min: usize, stats: &StatsChannel) -> Result { - let len = self.len(); - while self.len() < min { - let n = self.fill(file, stats).await?; - if n == 0 { - return Err(Error::with_msg_no_trace(format!("fill_min can not read min {}", min))); - } - } - return Ok(self.len() - len); - } -} - fn format_hex_block(buf: &[u8], max: usize) -> String { use std::fmt::Write; const COLS: usize = 16; diff --git a/archapp/src/archeng/backreadbuf.rs b/archapp/src/archeng/backreadbuf.rs new file mode 100644 index 0000000..4c97709 --- /dev/null +++ b/archapp/src/archeng/backreadbuf.rs @@ -0,0 +1,118 @@ +use crate::archeng::{read, seek, StatsChannel}; +use err::Error; +use netpod::log::*; +use std::{borrow::BorrowMut, io::SeekFrom}; +use tokio::fs::File; + +pub struct BackReadBuf { + file: F, + buf: Vec, + abs: u64, + wp: usize, + rp: usize, + stats: StatsChannel, + seek_request: u64, + seek_done: u64, + read_done: u64, +} + +impl BackReadBuf +where + F: BorrowMut, +{ + pub async fn new(file: F, pos: u64, stats: StatsChannel) -> Result { + let mut ret = Self { + file, + buf: vec![0; 1024 * 8], + abs: pos, + wp: 0, + rp: 0, + stats, + seek_request: 0, + seek_done: 0, + read_done: 0, + }; + ret.seek(pos).await?; + Ok(ret) + } + + pub fn into_file(self) -> F { + //self.file + err::todoval() + } + + pub fn len(&self) -> usize { + self.wp - self.rp + } + + pub fn adv(&mut self, n: usize) { + self.rp += n; + } + + pub fn data(&self) -> &[u8] { + &self.buf[self.rp..self.wp] + } + + async fn fill(&mut self) -> Result { + if self.rp != 0 && self.rp == self.wp { + self.wp = 0; + self.rp = 0; + } else { + unsafe { + std::ptr::copy::(&self.buf[self.rp], &mut self.buf[0], self.len()); + self.wp -= self.rp; + self.rp = 0; + } + } + let n = read(&mut self.file.borrow_mut(), &mut self.buf[self.wp..], &self.stats).await?; + //debug!("I/O fill n {}", n); + self.wp += n; + self.read_done += 1; + Ok(n) + } + + pub async fn fill_min(&mut self, min: usize) -> Result { + let len = self.len(); + while self.len() < min { + let n = self.fill().await?; + if n == 0 { + return Err(Error::with_msg_no_trace(format!("fill_min can not read min {}", min))); + } + } + Ok(self.len() - len) + } + + pub async fn seek(&mut self, pos: u64) -> Result { + let dp = pos as i64 - self.abs as i64 - self.rp as i64; + if pos >= self.abs && pos < self.abs + self.buf.len() as u64 - 64 { + self.rp = (pos - self.abs) as usize; + self.seek_request += 1; + Ok(pos) + } else { + //debug!("I/O seek dp {}", dp); + let s0 = pos.min(1024 * 2 - 256); + self.abs = pos - s0; + self.rp = 0; + self.wp = 0; + let ret = seek(self.file.borrow_mut(), SeekFrom::Start(self.abs), &self.stats) + .await + .map_err(|e| Error::from(e))?; + self.fill_min(s0 as usize).await?; + self.rp = s0 as usize; + self.seek_request += 1; + self.seek_done += 1; + Ok(ret) + } + } +} + +impl Drop for BackReadBuf { + fn drop(&mut self) { + info!( + "BackReadBuf Drop {} {}% {}", + self.seek_request, + self.seek_done * 100 / self.seek_request, + self.read_done + ); + } +} diff --git a/archapp/src/archeng/blockstream.rs b/archapp/src/archeng/blockstream.rs index 6f75630..8daebc2 100644 --- a/archapp/src/archeng/blockstream.rs +++ b/archapp/src/archeng/blockstream.rs @@ -1,9 +1,14 @@ -use crate::archeng::datablock::{read_data_1, read_datafile_header}; +use crate::archeng::backreadbuf::BackReadBuf; +use crate::archeng::datablock::{read_data2, read_data_1, read_datafile_header, read_datafile_header2}; use crate::archeng::indexfiles::{database_connect, unfold_stream, UnfoldExec}; -use crate::archeng::indextree::{read_datablockref, IndexFileBasics, RecordIter, RecordTarget}; +use crate::archeng::indextree::{ + read_datablockref, read_datablockref2, DataheaderPos, HeaderVersion, IndexFileBasics, RecordIter, RecordTarget, +}; +use crate::archeng::ringbuf::RingBuf; use crate::archeng::{open_read, seek, StatsChannel}; use err::Error; use futures_core::{Future, Stream}; +use items::WithLen; #[allow(unused)] use netpod::log::*; use netpod::{Channel, ChannelArchiver, FilePos, NanoRange}; @@ -14,8 +19,6 @@ use std::path::PathBuf; use std::pin::Pin; use tokio::fs::File; -use super::indextree::HeaderVersion; - enum Steps { Start, SelectIndexFile, @@ -30,12 +33,15 @@ struct DataBlocks { range: NanoRange, steps: Steps, paths: VecDeque, - file1: Option, - file2: Option, + file1: Option>, + file2: Option>, last_dp: u64, last_dp2: u64, last_f2: String, + last_dfhpos: DataheaderPos, dfnotfound: BTreeMap, + data_bytes_read: u64, + same_dfh_count: u64, } impl DataBlocks { @@ -51,7 +57,10 @@ impl DataBlocks { last_dp: 0, last_dp2: 0, last_f2: String::new(), + last_dfhpos: DataheaderPos(u64::MAX), dfnotfound: BTreeMap::new(), + data_bytes_read: 0, + same_dfh_count: 0, } } @@ -77,14 +86,16 @@ impl DataBlocks { // For simplicity, simply read all storage classes linearly. if let Some(path) = self.paths.pop_front() { // TODO - let basics = IndexFileBasics::from_path(&path, stats).await?; + let mut file = open_read(path.clone().into(), stats).await?; + let basics = IndexFileBasics::from_file(&path, &mut file, stats).await?; let mut tree = basics .rtree_for_channel(self.channel.name(), stats) .await? .ok_or_else(|| Error::with_msg_no_trace("channel not in index files"))?; if let Some(iter) = tree.iter_range(self.range.clone(), stats).await? { + debug!("SetupNextPath {:?}", path); self.steps = ReadBlocks(iter, basics.hver().duplicate(), path.clone().into()); - self.file1 = Some(open_read(path.into(), stats).await?); + self.file1 = Some(BackReadBuf::new(file, 0, stats.clone()).await?); } else { self.steps = SetupNextPath; }; @@ -101,43 +112,68 @@ impl DataBlocks { // TODO the iterator should actually return Dataref. We never expect child nodes here. if let RecordTarget::Dataref(dp) = rec.target { let f1 = self.file1.as_mut().unwrap(); - //seek(f1, SeekFrom::Start(dp.0), stats).await?; - // Read the dataheader... - // TODO the function should take a DatarefPos or? - // TODO the seek is hidden in the function which makes possible optimization not accessible. - let dref = read_datablockref(f1, FilePos { pos: dp.0 }, hver.as_ref(), stats).await?; + let dref = read_datablockref2(f1, dp.clone(), hver.as_ref()).await?; // TODO Remember the index path, need it here for relative path. // TODO open datafile, relative path to index path. // TODO keep open when path does not change. let acc; let num_samples; - if let Some(_) = self.dfnotfound.get(dref.file_name()) { - num_samples = 0; - acc = 1; - } else { - if dref.file_name() == self.last_f2 { - acc = 2; - } else { - let dpath = indexpath.parent().unwrap().join(dref.file_name()); - match open_read(dpath, stats).await { - Ok(f2) => { - acc = 4; - self.file2 = Some(f2); - self.last_f2 = dref.file_name().into(); - } - Err(_) => { - acc = 3; - self.file2 = None; - } - } - }; - if let Some(f2) = self.file2.as_mut() { - let dfheader = read_datafile_header(f2, dref.data_header_pos(), stats).await?; - num_samples = dfheader.num_samples; - } else { - self.dfnotfound.insert(dref.file_name().into(), true); + if true { + if let Some(_) = self.dfnotfound.get(dref.file_name()) { num_samples = 0; - }; + acc = 1; + } else { + if dref.file_name() == self.last_f2 { + acc = 2; + } else { + let dpath = indexpath.parent().unwrap().join(dref.file_name()); + match open_read(dpath, stats).await { + Ok(f2) => { + acc = 4; + self.file2 = Some( + RingBuf::new(f2, dref.data_header_pos().0, StatsChannel::dummy()) + .await?, + ); + self.last_f2 = dref.file_name().into(); + } + Err(_) => { + acc = 3; + self.file2 = None; + } + } + }; + if let Some(f2) = self.file2.as_mut() { + if dref.file_name() == self.last_f2 && dref.data_header_pos() == self.last_dfhpos { + num_samples = 0; + } else { + self.last_dfhpos = dref.data_header_pos(); + let rp1 = f2.rp_abs(); + let dfheader = read_datafile_header2(f2, dref.data_header_pos()).await?; + let data = read_data2(f2, &dfheader, self.range.clone(), false).await?; + let rp2 = f2.rp_abs(); + self.data_bytes_read += rp2 - rp1; + num_samples = dfheader.num_samples; + if data.len() != num_samples as usize { + if (data.len() as i64 - num_samples as i64).abs() < 4 { + // TODO get always one event less than num_samples tells us. + //warn!("small deviation {} vs {}", data.len(), num_samples); + } else { + return Err(Error::with_msg_no_trace(format!( + "event count mismatch {} vs {}", + data.len(), + num_samples + ))); + } + } + } + } else { + self.dfnotfound.insert(dref.file_name().into(), true); + num_samples = 0; + }; + } + } else { + acc = 6; + num_samples = 0; } let item = serde_json::to_value(( dp.0, @@ -156,6 +192,10 @@ impl DataBlocks { panic!(); } } else { + info!( + "data_bytes_read: {} same_dfh_count: {}", + self.data_bytes_read, self.same_dfh_count + ); self.steps = SetupNextPath; JsVal::String(format!("NOMORE")) }; diff --git a/archapp/src/archeng/bufminread.rs b/archapp/src/archeng/bufminread.rs new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/archapp/src/archeng/bufminread.rs @@ -0,0 +1 @@ + diff --git a/archapp/src/archeng/datablock.rs b/archapp/src/archeng/datablock.rs index 6a4b317..4804bb7 100644 --- a/archapp/src/archeng/datablock.rs +++ b/archapp/src/archeng/datablock.rs @@ -1,6 +1,5 @@ -use crate::archeng::{ - read_exact, read_string, readf64, readu16, readu32, seek, RingBuf, StatsChannel, EPICS_EPOCH_OFFSET, -}; +use crate::archeng::ringbuf::RingBuf; +use crate::archeng::{read_exact, read_string, readf64, readu16, readu32, seek, StatsChannel, EPICS_EPOCH_OFFSET}; use crate::eventsitem::EventsItem; use crate::plainevents::{PlainEvents, ScalarPlainEvents}; use err::Error; @@ -76,14 +75,14 @@ pub struct DatafileHeader { const DATA_HEADER_LEN_ON_DISK: usize = 72 + 40 + 40; +// TODO retire this version (better version reads from buffer) pub async fn read_datafile_header( file: &mut File, pos: DataheaderPos, stats: &StatsChannel, ) -> Result { - seek(file, SeekFrom::Start(pos.0), stats).await?; - let mut rb = RingBuf::new(); - rb.fill_min(file, DATA_HEADER_LEN_ON_DISK, stats).await?; + let mut rb = RingBuf::new(file, pos.0, stats.clone()).await?; + rb.fill_min(DATA_HEADER_LEN_ON_DISK).await?; let buf = rb.data(); let dir_offset = readu32(buf, 0); let next_offset = readu32(buf, 4); @@ -142,6 +141,136 @@ pub async fn read_datafile_header( Ok(ret) } +pub async fn read_datafile_header2(rb: &mut RingBuf, pos: DataheaderPos) -> Result { + // TODO avoid the extra seek: make sure that RingBuf catches this. Profile.. + rb.seek(pos.0).await?; + rb.fill_min(DATA_HEADER_LEN_ON_DISK).await?; + let buf = rb.data(); + let dir_offset = readu32(buf, 0); + let next_offset = readu32(buf, 4); + let prev_offset = readu32(buf, 8); + let curr_offset = readu32(buf, 12); + let num_samples = readu32(buf, 16); + let ctrl_info_offset = readu32(buf, 20); + let buf_size = readu32(buf, 24); + let buf_free = readu32(buf, 28); + let dbr_type = DbrType::from_u16(readu16(buf, 32))?; + let dbr_count = readu16(buf, 34); + // 4 bytes padding. + let period = readf64(buf, 40); + let ts1a = readu32(buf, 48); + let ts1b = readu32(buf, 52); + let ts2a = readu32(buf, 56); + let ts2b = readu32(buf, 60); + let ts3a = readu32(buf, 64); + let ts3b = readu32(buf, 68); + let ts_beg = if ts1a != 0 || ts1b != 0 { + ts1a as u64 * SEC + ts1b as u64 + EPICS_EPOCH_OFFSET + } else { + 0 + }; + let ts_end = if ts3a != 0 || ts3b != 0 { + ts3a as u64 * SEC + ts3b as u64 + EPICS_EPOCH_OFFSET + } else { + 0 + }; + let ts_next_file = if ts2a != 0 || ts2b != 0 { + ts2a as u64 * SEC + ts2b as u64 + EPICS_EPOCH_OFFSET + } else { + 0 + }; + let fname_prev = read_string(&buf[72..112])?; + let fname_next = read_string(&buf[112..152])?; + rb.adv(DATA_HEADER_LEN_ON_DISK); + let ret = DatafileHeader { + pos, + dir_offset, + next_offset, + prev_offset, + curr_offset, + num_samples, + ctrl_info_offset, + buf_size, + buf_free, + dbr_type, + dbr_count: dbr_count as usize, + period, + ts_beg: Nanos { ns: ts_beg }, + ts_end: Nanos { ns: ts_end }, + ts_next_file: Nanos { ns: ts_next_file }, + fname_next, + fname_prev, + }; + Ok(ret) +} + +pub async fn read_data2( + rb: &mut RingBuf, + datafile_header: &DatafileHeader, + range: NanoRange, + _expand: bool, +) -> Result { + // TODO handle expand mode + //let dhpos = datafile_header.pos.0 + DATA_HEADER_LEN_ON_DISK as u64; + //seek(file, SeekFrom::Start(dhpos), stats).await?; + let res = match &datafile_header.dbr_type { + DbrType::DbrTimeDouble => { + if datafile_header.dbr_count == 1 { + trace!("~~~~~~~~~~~~~~~~~~~~~ read scalar DbrTimeDouble"); + let mut evs = EventValues { + tss: vec![], + values: vec![], + }; + let n1 = datafile_header.num_samples as usize; + //let n2 = datafile_header.dbr_type.byte_len(); + let n2 = 2 + 2 + 4 + 4 + (4) + 8; + let n3 = n1 * n2; + rb.fill_min(n3).await?; + //let mut buf = vec![0; n3]; + //read_exact(file, &mut buf, stats).await?; + let buf = rb.data(); + let mut p1 = 0; + let mut ntot = 0; + while p1 < n3 - n2 { + let _status = u16::from_be_bytes(buf[p1..p1 + 2].try_into().unwrap()); + p1 += 2; + let _severity = u16::from_be_bytes(buf[p1..p1 + 2].try_into().unwrap()); + p1 += 2; + let ts1a = u32::from_be_bytes(buf[p1..p1 + 4].try_into().unwrap()); + p1 += 4; + let ts1b = u32::from_be_bytes(buf[p1..p1 + 4].try_into().unwrap()); + p1 += 4; + let ts1 = ts1a as u64 * SEC + ts1b as u64 + EPICS_EPOCH_OFFSET; + p1 += 4; + let value = f64::from_be_bytes(buf[p1..p1 + 8].try_into().unwrap()); + p1 += 8; + ntot += 1; + if ts1 >= range.beg && ts1 < range.end { + evs.tss.push(ts1); + evs.values.push(value); + } + } + rb.adv(n3); + info!("parsed block with {} / {} events", ntot, evs.tss.len()); + let evs = ScalarPlainEvents::Double(evs); + let plain = PlainEvents::Scalar(evs); + let item = EventsItem::Plain(plain); + item + } else { + let msg = format!("dbr_count {:?} not yet supported", datafile_header.dbr_count); + error!("{}", msg); + return Err(Error::with_msg_no_trace(msg)); + } + } + _ => { + let msg = format!("Type {:?} not yet supported", datafile_header.dbr_type); + error!("{}", msg); + return Err(Error::with_msg_no_trace(msg)); + } + }; + Ok(res) +} + pub async fn read_data_1( file: &mut File, datafile_header: &DatafileHeader, diff --git a/archapp/src/archeng/indexfiles.rs b/archapp/src/archeng/indexfiles.rs index 43be010..1fbbce5 100644 --- a/archapp/src/archeng/indexfiles.rs +++ b/archapp/src/archeng/indexfiles.rs @@ -333,8 +333,9 @@ impl ScanChannels { .await?; if rows.len() == 1 { let indexfile_rid: i64 = rows[0].try_get(0)?; - let mut basics = super::indextree::IndexFileBasics::from_path(path, stats).await?; - let entries = basics.all_channel_entries(stats).await?; + let mut file = open_read(path.clone().into(), stats).await?; + let mut basics = super::indextree::IndexFileBasics::from_file(path, &mut file, stats).await?; + let entries = basics.all_channel_entries(&mut file, stats).await?; for entry in entries { let rows = dbc .query("select rowid from channels where name = $1", &[&entry.channel_name()]) diff --git a/archapp/src/archeng/indextree.rs b/archapp/src/archeng/indextree.rs index b9868c3..86207f0 100644 --- a/archapp/src/archeng/indextree.rs +++ b/archapp/src/archeng/indextree.rs @@ -1,5 +1,6 @@ +use crate::archeng::ringbuf::RingBuf; use crate::archeng::{ - format_hex_block, name_hash, open_read, readu16, readu32, readu64, seek, RingBuf, StatsChannel, EPICS_EPOCH_OFFSET, + format_hex_block, name_hash, open_read, readu16, readu32, readu64, seek, StatsChannel, EPICS_EPOCH_OFFSET, }; use err::Error; use netpod::{log::*, NanoRange}; @@ -11,6 +12,8 @@ use std::path::{Path, PathBuf}; use std::time::{Duration, Instant}; use tokio::fs::File; +use super::backreadbuf::BackReadBuf; + pub trait HeaderVersion: Send + Sync + fmt::Debug { fn version(&self) -> u8; fn read_offset(&self, buf: &[u8], pos: usize) -> u64; @@ -81,7 +84,6 @@ impl NamedHashChannelEntry { #[derive(Debug)] pub struct IndexFileBasics { - file: File, path: PathBuf, version: u8, name_hash_anchor_beg: u64, @@ -100,21 +102,78 @@ pub struct IndexFileBasics { } impl IndexFileBasics { - pub async fn from_path(path: impl Into, stats: &StatsChannel) -> Result { + pub async fn from_file(path: impl Into, file: &mut File, stats: &StatsChannel) -> Result { let path = path.into(); - let file = open_read(path.clone(), stats).await?; read_file_basics(path, file, stats).await } pub fn hver(&self) -> &Box { &self.hver } + + pub async fn all_channel_entries( + &mut self, + file: &mut File, + stats: &StatsChannel, + ) -> Result, Error> { + let mut entries = vec![]; + let mut rb = RingBuf::new(file, 0, stats.clone()).await?; + for epos in &self.name_hash_entries { + if epos.named_hash_channel_entry_pos != 0 { + let mut pos = epos.named_hash_channel_entry_pos; + while pos != 0 { + rb.seek(pos).await?; + let min0 = 4 + 2 * self.hver.offset_size(); + rb.fill_min(min0).await?; + let buf = rb.data(); + let entry = parse_name_hash_channel_entry(buf, self.hver.as_ref())?; + info!("parsed entry {:?}", entry); + pos = entry.next; + entries.push(entry); + } + } + } + Ok(entries) + } + + pub async fn rtree_for_channel(&self, channel_name: &str, stats: &StatsChannel) -> Result, Error> { + // TODO in the common case, the caller has already a opened file and could reuse that here. + let mut index_file = open_read(self.path.clone(), stats).await?; + let chn_hash = name_hash(channel_name, self.name_hash_anchor_len as u32); + let epos = &self.name_hash_entries[chn_hash as usize]; + let mut pos = epos.named_hash_channel_entry_pos; + if pos == 0 { + warn!("no hash entry for channel {}", channel_name); + } + let mut entries = vec![]; + let mut rb = RingBuf::new(&mut index_file, pos, stats.clone()).await?; + while pos != 0 { + rb.seek(pos).await?; + let min0 = 4 + 2 * self.hver.offset_size(); + rb.fill_min(min0).await?; + let buf = rb.data(); + let e = parse_name_hash_channel_entry(buf, self.hver.as_ref())?; + let next = e.next; + entries.push(e); + pos = next; + } + drop(rb); + for e in &entries { + if e.channel_name == channel_name { + let hver = self.hver.duplicate(); + let pos = RtreePos(e.id_rtree_pos); + // TODO Rtree could reuse the File here: + let tree = Rtree::new(self.path.clone(), index_file, pos, hver, stats).await?; + return Ok(Some(tree)); + } + } + Ok(None) + } } -pub async fn read_file_basics(path: PathBuf, file: File, stats: &StatsChannel) -> Result { - let mut file = file; - let mut rb = RingBuf::new(); - rb.fill_min(&mut file, 4, stats).await?; +pub async fn read_file_basics(path: PathBuf, file: &mut File, stats: &StatsChannel) -> Result { + let mut rb = RingBuf::new(file, 0, stats.clone()).await?; + rb.fill_min(4).await?; let buf = rb.data(); let version = String::from_utf8(buf[3..4].to_vec())?.parse()?; let min0; @@ -125,11 +184,10 @@ pub async fn read_file_basics(path: PathBuf, file: File, stats: &StatsChannel) - } else { panic!(); } - rb.fill_min(&mut file, min0, stats).await?; + rb.fill_min(min0).await?; let buf = rb.data(); let mut ret = if version == 3 { IndexFileBasics { - file, path, version, name_hash_anchor_beg: readu64(buf, 4), @@ -148,7 +206,6 @@ pub async fn read_file_basics(path: PathBuf, file: File, stats: &StatsChannel) - } } else if version == 2 { IndexFileBasics { - file, path, version, name_hash_anchor_beg: readu32(buf, 4) as u64, @@ -178,7 +235,7 @@ pub async fn read_file_basics(path: PathBuf, file: File, stats: &StatsChannel) - { let hver = &ret.hver; for _ in 0..ret.name_hash_anchor_len { - rb.fill_min(&mut ret.file, hver.offset_size(), stats).await?; + rb.fill_min(hver.offset_size()).await?; let buf = rb.data(); let pos = hver.read_offset(buf, 0); rb.adv(hver.offset_size()); @@ -191,63 +248,6 @@ pub async fn read_file_basics(path: PathBuf, file: File, stats: &StatsChannel) - Ok(ret) } -impl IndexFileBasics { - pub async fn all_channel_entries(&mut self, stats: &StatsChannel) -> Result, Error> { - let mut entries = vec![]; - let mut rb = RingBuf::new(); - for epos in &self.name_hash_entries { - if epos.named_hash_channel_entry_pos != 0 { - let mut pos = epos.named_hash_channel_entry_pos; - while pos != 0 { - rb.reset(); - seek(&mut self.file, SeekFrom::Start(pos), stats).await?; - let min0 = 4 + 2 * self.hver.offset_size(); - rb.fill_min(&mut self.file, min0, stats).await?; - let buf = rb.data(); - let entry = parse_name_hash_channel_entry(buf, self.hver.as_ref())?; - info!("parsed entry {:?}", entry); - pos = entry.next; - entries.push(entry); - } - } - } - Ok(entries) - } - - pub async fn rtree_for_channel(&self, channel_name: &str, stats: &StatsChannel) -> Result, Error> { - let mut index_file = open_read(self.path.clone(), stats).await?; - let chn_hash = name_hash(channel_name, self.name_hash_anchor_len as u32); - let epos = &self.name_hash_entries[chn_hash as usize]; - let mut pos = epos.named_hash_channel_entry_pos; - if pos == 0 { - warn!("no hash entry for channel {}", channel_name); - } - let mut entries = vec![]; - let mut rb = RingBuf::new(); - while pos != 0 { - rb.reset(); - seek(&mut index_file, SeekFrom::Start(pos), stats).await?; - let min0 = 4 + 2 * self.hver.offset_size(); - rb.fill_min(&mut index_file, min0, stats).await?; - let buf = rb.data(); - let e = parse_name_hash_channel_entry(buf, self.hver.as_ref())?; - let next = e.next; - entries.push(e); - pos = next; - } - for e in &entries { - if e.channel_name == channel_name { - let hver = self.hver.duplicate(); - let pos = RtreePos(e.id_rtree_pos); - // TODO Rtree could reuse the File here: - let tree = Rtree::new(self.path.clone(), index_file, pos, hver, stats).await?; - return Ok(Some(tree)); - } - } - Ok(None) - } -} - #[derive(Debug)] pub struct RTreeNodeRecord { pub ts1: Nanos, @@ -286,10 +286,10 @@ pub async fn read_rtree_node( const OFF1: usize = 9; const RLEN: usize = 24; const NANO_MAX: u32 = 999999999; - seek(file, SeekFrom::Start(pos.into()), stats).await?; - let mut rb = RingBuf::new(); + // TODO should not be used. + let mut rb = RingBuf::new(file, pos.pos, stats.clone()).await?; // TODO must know how much data I need at least... - rb.fill_min(file, OFF1 + rtree_m * RLEN, stats).await?; + rb.fill_min(OFF1 + rtree_m * RLEN).await?; if false { let s = format_hex_block(rb.data(), 128); info!("RTREE NODE:\n{}", s); @@ -423,7 +423,7 @@ impl RtreeNodeAtRecord { #[derive(Debug)] pub struct Rtree { path: PathBuf, - file: File, + rb: RingBuf, m: usize, root: NodePos, hver: Box, @@ -438,10 +438,10 @@ impl Rtree { stats: &StatsChannel, ) -> Result { let mut file = file; - let (m, root) = Self::read_entry(&mut file, pos, hver.as_ref(), stats).await?; + let (m, root) = Self::read_entry(&mut file, pos.clone(), hver.as_ref(), stats).await?; let ret = Self { path: path.as_ref().into(), - file, + rb: RingBuf::new(file, pos.0, stats.clone()).await?, m, root, hver, @@ -455,11 +455,10 @@ impl Rtree { hver: &dyn HeaderVersion, stats: &StatsChannel, ) -> Result<(usize, NodePos), Error> { - seek(file, SeekFrom::Start(pos.0), stats).await?; - let mut rb = RingBuf::new(); + let mut rb = RingBuf::new(file, pos.0, stats.clone()).await?; // TODO should be able to indicate how much I need at most before I know that I will e.g. seek or abort. let min0 = hver.offset_size() + 4; - rb.fill_min(file, min0, stats).await?; + rb.fill_min(min0).await?; if rb.len() < min0 { return Err(Error::with_msg_no_trace("could not read enough")); } @@ -475,14 +474,13 @@ impl Rtree { Ok(ret) } - pub async fn read_node_at(&mut self, pos: NodePos, stats: &StatsChannel) -> Result { - let file = &mut self.file; - seek(file, SeekFrom::Start(pos.0), stats).await?; - let mut rb = RingBuf::new(); + pub async fn read_node_at(&mut self, pos: NodePos, _stats: &StatsChannel) -> Result { + let rb = &mut self.rb; + rb.seek(pos.0).await?; let off1 = 1 + self.hver.offset_size(); let rlen = 4 * 4 + self.hver.offset_size(); let min0 = off1 + self.m * rlen; - rb.fill_min(file, min0, stats).await?; + rb.fill_min(min0).await?; if false { let s = format_hex_block(rb.data(), min0); trace!("RTREE NODE:\n{}", s); @@ -498,9 +496,10 @@ impl Rtree { if false { trace!("is_leaf: {} parent: {:?}", is_leaf, parent); } + let hver = self.hver.duplicate(); let recs = (0..self.m) .into_iter() - .filter_map(|i| { + .filter_map(move |i| { const NANO_MAX: u32 = 999999999; let off2 = off1 + i * rlen; let ts1a = readu32(buf, off2 + 0); @@ -511,7 +510,7 @@ impl Rtree { let ts2b = ts2b.min(NANO_MAX); let ts1 = ts1a as u64 * SEC + ts1b as u64 + EPICS_EPOCH_OFFSET; let ts2 = ts2a as u64 * SEC + ts2b as u64 + EPICS_EPOCH_OFFSET; - let target = self.hver.read_offset(buf, off2 + 16); + let target = hver.read_offset(buf, off2 + 16); //trace!("NODE {} {} {} {} {}", ts1a, ts1b, ts2a, ts2b, child_or_id); if target != 0 && ts2 != 0 { let target = if is_leaf { @@ -586,7 +585,7 @@ impl Rtree { let file = open_read(self.path.clone(), stats).await?; let ret = Self { path: self.path.clone(), - file: file, + rb: RingBuf::new(file, 0, stats.clone()).await?, m: self.m, root: self.root.clone(), hver: self.hver.duplicate(), @@ -669,10 +668,9 @@ pub async fn read_rtree_entrypoint( _basics: &IndexFileBasics, stats: &StatsChannel, ) -> Result { - seek(file, SeekFrom::Start(pos), stats).await?; - let mut rb = RingBuf::new(); + let mut rb = RingBuf::new(file, pos, stats.clone()).await?; // TODO remove, this is anyway still using a hardcoded offset size. - rb.fill_min(file, 8 + 4, stats).await?; + rb.fill_min(8 + 4).await?; if rb.len() < 8 + 4 { return Err(Error::with_msg_no_trace("could not read enough")); } @@ -681,7 +679,8 @@ pub async fn read_rtree_entrypoint( let rtree_m = readu32(b, 8); //info!("node_offset: {} rtree_m: {}", node_offset, rtree_m); let pos = FilePos { pos: node_offset }; - let node = read_rtree_node(file, pos, rtree_m as usize, stats).await?; + let mut file = rb.into_file(); + let node = read_rtree_node(&mut file, pos, rtree_m as usize, stats).await?; //info!("read_rtree_entrypoint READ ROOT NODE: {:?}", node); Ok(node) } @@ -829,17 +828,17 @@ pub async fn read_channel( stats: &StatsChannel, ) -> Result, Error> { let path = path.into(); - let mut basics = read_file_basics(path.clone(), index_file, stats).await?; + let mut index_file = index_file; + let basics = read_file_basics(path.clone(), &mut index_file, stats).await?; let chn_hash = name_hash(channel_name, basics.name_hash_anchor_len as u32); let epos = &basics.name_hash_entries[chn_hash as usize]; let mut entries = vec![]; - let mut rb = RingBuf::new(); let mut pos = epos.named_hash_channel_entry_pos; + let mut rb = RingBuf::new(index_file, pos, stats.clone()).await?; loop { - rb.reset(); - seek(&mut basics.file, SeekFrom::Start(pos), stats).await?; + rb.seek(pos).await?; let fill_min = if basics.hver.offset_size() == 8 { 20 } else { 12 }; - rb.fill_min(&mut basics.file, fill_min, stats).await?; + rb.fill_min(fill_min).await?; if rb.len() < fill_min { warn!("not enough data to continue reading channel list from name hash list"); break; @@ -930,10 +929,9 @@ pub async fn read_datablockref( hver: &dyn HeaderVersion, stats: &StatsChannel, ) -> Result { - seek(file, SeekFrom::Start(pos.pos), stats).await?; - let mut rb = RingBuf::new(); + let mut rb = RingBuf::new(file, pos.pos, stats.clone()).await?; let min0 = hver.offset_size() * 2 + 2; - rb.fill_min(file, min0, stats).await?; + rb.fill_min(min0).await?; let buf = rb.data(); let mut p = 0; let next = hver.read_offset(buf, p); @@ -943,7 +941,37 @@ pub async fn read_datablockref( let len = readu16(buf, p) as usize; p += 2; let _ = p; - rb.fill_min(file, min0 + len, stats).await?; + rb.fill_min(min0 + len).await?; + let buf = rb.data(); + let fname = String::from_utf8(buf[min0..min0 + len].to_vec())?; + let next = DatarefPos(next); + let data_header_pos = DataheaderPos(data); + let ret = Dataref { + next, + data_header_pos, + fname, + }; + Ok(ret) +} + +pub async fn read_datablockref2( + rb: &mut BackReadBuf, + pos: DatarefPos, + hver: &dyn HeaderVersion, +) -> Result { + rb.seek(pos.0).await?; + let min0 = hver.offset_size() * 2 + 2; + rb.fill_min(min0).await?; + let buf = rb.data(); + let mut p = 0; + let next = hver.read_offset(buf, p); + p += hver.offset_size(); + let data = hver.read_offset(buf, p); + p += hver.offset_size(); + let len = readu16(buf, p) as usize; + p += 2; + let _ = p; + rb.fill_min(min0 + len).await?; let buf = rb.data(); let fname = String::from_utf8(buf[min0..min0 + len].to_vec())?; let next = DatarefPos(next); @@ -964,12 +992,11 @@ async fn channel_list_from_index_name_hash_list( ) -> Result, Error> { let mut pos = pos; let mut ret = vec![]; - let mut rb = RingBuf::new(); + let mut rb = RingBuf::new(file, pos.pos, stats.clone()).await?; loop { - rb.reset(); - seek(file, SeekFrom::Start(pos.pos), stats).await?; + rb.seek(pos.pos).await?; let fill_min = if hver.offset_size() == 8 { 20 } else { 12 }; - rb.fill_min(file, fill_min, stats).await?; + rb.fill_min(fill_min).await?; if rb.len() < fill_min { warn!("not enough data to continue reading channel list from name hash list"); break; @@ -989,8 +1016,8 @@ async fn channel_list_from_index_name_hash_list( // TODO retire this function pub async fn channel_list(index_path: PathBuf, stats: &StatsChannel) -> Result, Error> { let mut ret = vec![]; - let file = open_read(index_path.clone(), stats).await?; - let mut basics = read_file_basics(index_path.clone(), file, stats).await?; + let mut file = open_read(index_path.clone(), stats).await?; + let basics = read_file_basics(index_path.clone(), &mut file, stats).await?; let hver2 = HeaderVersion2; let hver3 = HeaderVersion3; let hver: &dyn HeaderVersion = if basics.version == 2 { @@ -1008,7 +1035,7 @@ pub async fn channel_list(index_path: PathBuf, stats: &StatsChannel) -> Result Result<(), Error> { let fut = async { let stats = &StatsChannel::dummy(); - let file = open_read(CHN_0_MASTER_INDEX.into(), stats).await?; - let res = read_file_basics(CHN_0_MASTER_INDEX.into(), file, stats).await?; + let mut file = open_read(CHN_0_MASTER_INDEX.into(), stats).await?; + let res = read_file_basics(CHN_0_MASTER_INDEX.into(), &mut file, stats).await?; assert_eq!(res.version, 3); assert_eq!(res.name_hash_anchor_beg, 88); assert_eq!(res.name_hash_anchor_len, 1009); @@ -1062,7 +1089,8 @@ mod test { let fut = async { let stats = &StatsChannel::dummy(); let channel_name = "X05DA-FE-WI1:TC1"; - let basics = IndexFileBasics::from_path(CHN_0_MASTER_INDEX, stats).await?; + let mut file = open_read(CHN_0_MASTER_INDEX.into(), stats).await?; + let basics = IndexFileBasics::from_file(CHN_0_MASTER_INDEX, &mut file, stats).await?; let tree = basics.rtree_for_channel(channel_name, stats).await?; let tree = tree.ok_or_else(|| Error::with_msg("no tree found for channel"))?; assert_eq!(tree.m, 50); @@ -1078,7 +1106,8 @@ mod test { let stats = &StatsChannel::dummy(); let channel_name = "X05DA-FE-WI1:TC1"; let range = NanoRange { beg: 0, end: u64::MAX }; - let basics = IndexFileBasics::from_path(CHN_0_MASTER_INDEX, stats).await?; + let mut file = open_read(CHN_0_MASTER_INDEX.into(), stats).await?; + let basics = IndexFileBasics::from_file(CHN_0_MASTER_INDEX, &mut file, stats).await?; let mut tree = basics .rtree_for_channel(channel_name, stats) .await? @@ -1114,7 +1143,8 @@ mod test { beg: 1601503499684884156, end: 1601569919634086480, }; - let basics = IndexFileBasics::from_path(CHN_0_MASTER_INDEX, stats).await?; + let mut file = open_read(CHN_0_MASTER_INDEX.into(), stats).await?; + let basics = IndexFileBasics::from_file(CHN_0_MASTER_INDEX, &mut file, stats).await?; let mut tree = basics .rtree_for_channel(channel_name, stats) .await? diff --git a/archapp/src/archeng/ringbuf.rs b/archapp/src/archeng/ringbuf.rs new file mode 100644 index 0000000..4cd9002 --- /dev/null +++ b/archapp/src/archeng/ringbuf.rs @@ -0,0 +1,135 @@ +use crate::archeng::{read, seek, StatsChannel}; +use err::Error; +use netpod::log::*; +use std::fmt; +use std::mem::ManuallyDrop; +use std::{borrow::BorrowMut, io::SeekFrom}; +use tokio::fs::File; + +pub struct RingBuf { + file: Option, + buf: Vec, + abs: usize, + wp: usize, + rp: usize, + stats: StatsChannel, + seek_request: u64, + seek_done: u64, + read_done: u64, +} + +impl RingBuf +where + F: BorrowMut, +{ + pub async fn new(file: F, pos: u64, stats: StatsChannel) -> Result { + let mut ret = Self { + file: Some(file), + buf: vec![0; 1024 * 1024], + abs: usize::MAX, + wp: 0, + rp: 0, + stats, + seek_request: 0, + seek_done: 0, + read_done: 0, + }; + ret.seek(pos).await?; + Ok(ret) + } + + pub fn into_file(mut self) -> F { + self.file.take().unwrap() + } + + pub fn len(&self) -> usize { + self.wp - self.rp + } + + pub fn adv(&mut self, n: usize) { + self.rp += n; + } + + pub fn data(&self) -> &[u8] { + &self.buf[self.rp..self.wp] + } + + async fn fill(&mut self) -> Result { + if self.rp == self.wp { + if self.rp != 0 { + self.wp = 0; + self.rp = 0; + } + } else { + unsafe { + std::ptr::copy::(&self.buf[self.rp], &mut self.buf[0], self.len()); + self.wp -= self.rp; + self.rp = 0; + } + } + let n = read( + self.file.as_mut().unwrap().borrow_mut(), + &mut self.buf[self.wp..], + &self.stats, + ) + .await?; + self.wp += n; + self.read_done += 1; + Ok(n) + } + + pub async fn fill_min(&mut self, min: usize) -> Result { + let len = self.len(); + while self.len() < min { + let n = self.fill().await?; + if n == 0 { + return Err(Error::with_msg_no_trace(format!("fill_min can not read min {}", min))); + } + } + Ok(self.len() - len) + } + + pub async fn seek(&mut self, pos: u64) -> Result { + let dp = pos as i64 - self.abs as i64 - self.rp as i64; + if dp < 0 && dp > -2048 { + debug!("small NEG seek {}", dp); + } else if dp == 0 { + debug!("zero seek"); + } else if dp > 0 && dp < 2048 { + debug!("small POS seek {}", dp); + } + self.abs = pos as usize; + self.rp = 0; + self.wp = 0; + let ret = seek( + self.file.as_mut().unwrap().borrow_mut(), + SeekFrom::Start(pos), + &self.stats, + ) + .await + .map_err(|e| Error::from(e))?; + self.seek_request += 1; + self.seek_done += 1; + Ok(ret) + } + + pub fn rp_abs(&self) -> u64 { + self.abs as u64 + self.rp as u64 + } +} + +impl fmt::Debug for RingBuf { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("RingBuf") + .field("abs", &self.abs) + .field("wp", &self.wp) + .field("rp", &self.rp) + .finish() + } +} + +impl Drop for RingBuf { + fn drop(&mut self) { + info!("RingBuf Drop {} {}", self.seek_request, self.read_done); + } +} diff --git a/taskrun/src/lib.rs b/taskrun/src/lib.rs index 0c4e7ea..9638254 100644 --- a/taskrun/src/lib.rs +++ b/taskrun/src/lib.rs @@ -96,6 +96,8 @@ pub fn tracing_init() { "archapp::archeng::datablockstream=info", "archapp::archeng::indextree=info", "archapp::archeng::blockstream=trace", + "archapp::archeng::ringbuf=trace", + "archapp::archeng::backreadbuf=trace", "archapp::storagemerge=info", "daqbuffer::test=trace", ]