Linear read

This commit is contained in:
Dominik Werder
2021-12-08 21:54:03 +01:00
parent 3c64eafd14
commit 2449a20775
8 changed files with 258 additions and 83 deletions

View File

@@ -1,48 +1,74 @@
use std::fmt;
pub struct Error(err::Error);
pub struct ArchError(::err::Error);
impl Error {
impl ArchError {
pub fn with_msg<S: Into<String>>(s: S) -> Self {
Self(err::Error::with_msg(s))
Self(::err::Error::with_msg(s))
}
pub fn with_msg_no_trace<S: Into<String>>(s: S) -> Self {
Self(err::Error::with_msg_no_trace(s))
Self(::err::Error::with_msg_no_trace(s))
}
pub fn msg(&self) -> &str {
self.0.msg()
}
pub fn reason(&self) -> Option<::err::Reason> {
self.0.reason()
}
pub fn public_msg(&self) -> Option<&Vec<String>> {
self.0.public_msg()
}
}
impl fmt::Debug for Error {
/// Debug formatting is handed to the wrapped error stored in field `0`.
impl fmt::Debug for ArchError {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        self.0.fmt(f)
    }
}
impl From<Error> for err::Error {
fn from(x: Error) -> Self {
/// Display formatting for [`ArchError`].
///
/// Forwards to this type's own `Debug` implementation with the same formatter,
/// so `{}` and `{:?}` render the same text.
impl fmt::Display for ArchError {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        <Self as fmt::Debug>::fmt(self, f)
    }
}
impl std::error::Error for ArchError {}
impl From<::err::Error> for ArchError {
fn from(x: ::err::Error) -> Self {
Self(x)
}
}
impl From<ArchError> for ::err::Error {
fn from(x: ArchError) -> Self {
x.0
}
}
impl From<std::string::FromUtf8Error> for Error {
impl From<std::string::FromUtf8Error> for ArchError {
fn from(k: std::string::FromUtf8Error) -> Self {
Self::with_msg(k.to_string())
}
}
impl From<std::io::Error> for Error {
impl From<std::io::Error> for ArchError {
fn from(k: std::io::Error) -> Self {
Self::with_msg(k.to_string())
}
}
impl<T> From<async_channel::SendError<T>> for Error {
impl<T> From<async_channel::SendError<T>> for ArchError {
fn from(k: async_channel::SendError<T>) -> Self {
Self::with_msg(k.to_string())
}
}
impl From<serde_json::Error> for Error {
impl From<serde_json::Error> for ArchError {
fn from(k: serde_json::Error) -> Self {
Self::with_msg(k.to_string())
}

View File

@@ -1,3 +1,4 @@
use crate::err::ArchError;
use crate::generated::EPICSEvent::PayloadType;
use crate::parse::multi::parse_all_ts;
use crate::parse::PbFileReader;
@@ -26,9 +27,10 @@ use std::pin::Pin;
use tokio::fs::{read_dir, File};
use tokio::io::{AsyncReadExt, AsyncSeekExt};
#[derive(Debug)]
pub struct DataFilename {
year: u32,
month: u32,
pub year: u32,
pub month: u32,
}
pub fn parse_data_filename(s: &str) -> Result<DataFilename, Error> {
@@ -331,7 +333,7 @@ pub async fn make_single_event_pipe(
// TODO must apply the proper x-binning depending on the requested AggKind.
debug!("make_single_event_pipe {:?}", evq);
let evq = evq.clone();
let DirAndPrefix { dir, prefix } = directory_for_channel_files(&evq.channel, base_path)?;
let DirAndPrefix { dir, prefix } = directory_for_channel_files(&evq.channel, &base_path)?;
//let dtbeg = Utc.timestamp((evq.range.beg / 1000000000) as i64, (evq.range.beg % 1000000000) as u32);
let (tx, rx) = async_channel::bounded(16);
let block1 = async move {
@@ -365,7 +367,7 @@ pub async fn make_single_event_pipe(
info!("opened {:?}", de.path());
let z = position_file_for_evq(f1, evq.clone(), df.year).await?;
let mut f1 = if let PositionState::Positioned = z.state {
let mut f1 = if let PositionState::Positioned(_pos) = z.state {
z.file
} else {
continue;
@@ -437,18 +439,19 @@ pub async fn make_single_event_pipe(
pub enum PositionState {
NothingFound,
Positioned,
Positioned(u64),
}
pub struct PositionResult {
file: File,
state: PositionState,
pub file: File,
pub state: PositionState,
}
async fn position_file_for_evq(mut file: File, evq: RawEventsQuery, year: u32) -> Result<PositionResult, Error> {
pub async fn position_file_for_evq(mut file: File, evq: RawEventsQuery, year: u32) -> Result<PositionResult, Error> {
info!("-------------- position_file_for_evq");
let flen = file.seek(SeekFrom::End(0)).await?;
file.seek(SeekFrom::Start(0)).await?;
if flen < 1024 * 512 {
if true || flen < 1024 * 512 {
position_file_for_evq_linear(file, evq, year).await
} else {
position_file_for_evq_binary(file, evq, year).await
@@ -457,27 +460,47 @@ async fn position_file_for_evq(mut file: File, evq: RawEventsQuery, year: u32) -
async fn position_file_for_evq_linear(file: File, evq: RawEventsQuery, _year: u32) -> Result<PositionResult, Error> {
let mut pbr = PbFileReader::new(file).await;
let mut curpos;
pbr.read_header().await?;
loop {
// TODO
// Issue is that I always read more than the actual packet.
// Is protobuf length-framed?
// Otherwise: read_header must return the number of bytes that were read.
curpos = pbr.abspos();
info!("position_file_for_evq_linear save curpos {}", curpos);
let res = pbr.read_msg().await?;
let res = if let Some(k) = res {
k
} else {
let ret = PositionResult {
file: pbr.into_file(),
state: PositionState::NothingFound,
};
return Ok(ret);
};
if res.item.len() < 1 {
return Err(Error::with_msg_no_trace("no event read from file"));
}
if res.item.ts(res.item.len() - 1) >= evq.range.beg {
let ret = PositionResult {
file: pbr.into_file(),
state: PositionState::Positioned,
};
return Ok(ret);
match res {
Some(res) => {
info!(
"position_file_for_evq_linear read_msg pos {} len {}",
res.pos,
res.item.len()
);
if res.item.len() < 1 {
return Err(Error::with_msg_no_trace("no event read from file"));
}
let tslast = res.item.ts(res.item.len() - 1);
let diff = tslast as i64 - evq.range.beg as i64;
info!("position_file_for_evq_linear tslast {} diff {}", tslast, diff);
if tslast >= evq.range.beg {
info!("FOUND curpos {}", curpos);
pbr.file().seek(SeekFrom::Start(curpos)).await?;
let ret = PositionResult {
file: pbr.into_file(),
state: PositionState::Positioned(curpos),
};
return Ok(ret);
}
}
None => {
info!("position_file_for_evq_linear NothingFound");
let ret = PositionResult {
file: pbr.into_file(),
state: PositionState::NothingFound,
};
return Ok(ret);
}
}
}
}
@@ -524,8 +547,8 @@ async fn position_file_for_evq_binary(mut file: File, evq: RawEventsQuery, year:
let evs2 = parse_all_ts(p2, &buf2, payload_type.clone(), year)?;
info!("...............................................................");
info!("evs1: {:?}", evs1);
info!("evs2: {:?}", evs2);
info!("evs1.len() {:?}", evs1.len());
info!("evs2.len() {:?}", evs2.len());
info!("p1: {}", p1);
info!("p2: {}", p2);
@@ -536,7 +559,7 @@ async fn position_file_for_evq_binary(mut file: File, evq: RawEventsQuery, year:
if ev.ts >= tgt {
file.seek(SeekFrom::Start(ev.pos)).await?;
let ret = PositionResult {
state: PositionState::Positioned,
state: PositionState::Positioned(ev.pos),
file,
};
return Ok(ret);
@@ -598,7 +621,7 @@ async fn linear_search_2(
file.seek(SeekFrom::Start(ev.pos)).await?;
let ret = PositionResult {
file,
state: PositionState::Positioned,
state: PositionState::Positioned(ev.pos),
};
return Ok(ret);
}
@@ -639,30 +662,45 @@ fn events_item_to_framable(ei: EventsItem) -> Result<Box<dyn Framable + Send>, E
}
}
struct DirAndPrefix {
#[derive(Debug)]
pub struct DirAndPrefix {
dir: PathBuf,
prefix: String,
}
fn directory_for_channel_files(channel: &Channel, base_path: PathBuf) -> Result<DirAndPrefix, Error> {
pub fn directory_for_channel_files(channel: &Channel, base_path: &PathBuf) -> Result<DirAndPrefix, ArchError> {
// SARUN11/CVME/DBLM546/IOC_CPU_LOAD
// SARUN11-CVME-DBLM546:IOC_CPU_LOAD
let a: Vec<_> = channel.name.split("-").map(|s| s.split(":")).flatten().collect();
let path = base_path;
let path = a.iter().take(a.len() - 1).fold(path, |a, &x| a.join(x));
let path = a.iter().take(a.len() - 1).fold(path.clone(), |a, &x| a.join(x));
let ret = DirAndPrefix {
dir: path,
prefix: a
.last()
.ok_or_else(|| Error::with_msg_no_trace("no prefix in file"))?
.ok_or_else(|| ArchError::with_msg_no_trace("no prefix in file"))?
.to_string(),
};
Ok(ret)
}
/// Collects the data files that belong to `channel` below `base_path`.
///
/// The channel directory and the file-name prefix are derived by
/// `directory_for_channel_files`. A directory entry is selected when its
/// (lossily decoded) file name starts with `<prefix>:20` and ends with `.pb`.
/// The selected paths are returned in sorted order.
///
/// The same channel name in different data directories like "lts", "mts", ..
/// is considered a different channel.
///
/// # Errors
///
/// Fails if the channel directory can not be derived from the channel name, or
/// if opening or iterating the directory fails.
pub async fn find_files_for_channel(base_path: &PathBuf, channel: &Channel) -> Result<Vec<PathBuf>, ArchError> {
    let chandir = directory_for_channel_files(channel, base_path)?;
    // Build the expected file-name prefix once instead of once per directory entry.
    let wanted_prefix = format!("{}:20", chandir.prefix);
    let mut ret = Vec::new();
    let mut rd = read_dir(&chandir.dir).await?;
    while let Some(en) = rd.next_entry().await? {
        // Keep the `OsString` alive so the lossy view can borrow from it
        // instead of being copied into a fresh `String`.
        let name = en.file_name();
        let fns = name.to_string_lossy();
        if fns.starts_with(&wanted_prefix) && fns.ends_with(".pb") {
            ret.push(en.path());
        }
    }
    ret.sort_unstable();
    Ok(ret)
}
pub async fn channel_info(channel: &Channel, aa: &ArchiverAppliance) -> Result<ChannelInfo, Error> {
let DirAndPrefix { dir, prefix } =
directory_for_channel_files(channel, aa.data_base_paths.last().unwrap().clone())?;
let DirAndPrefix { dir, prefix } = directory_for_channel_files(channel, aa.data_base_paths.last().unwrap())?;
let mut msgs = vec![];
msgs.push(format!("path: {}", dir.to_string_lossy()));
let mut scalar_type = None;

View File

@@ -12,7 +12,7 @@ use items::eventvalues::EventValues;
use items::plainevents::{PlainEvents, ScalarPlainEvents, WavePlainEvents};
use items::waveevents::WaveEvents;
use netpod::log::*;
use netpod::{ArchiverAppliance, ChannelConfigQuery, ChannelConfigResponse, NodeConfigCached};
use netpod::{ArchiverAppliance, ChannelConfigQuery, ChannelConfigResponse};
use protobuf::Message;
use serde::Serialize;
use serde_json::Value as JsonValue;
@@ -55,7 +55,7 @@ fn parse_scalar_byte(m: &[u8], year: u32) -> Result<EventsItem, Error> {
macro_rules! scalar_parse {
($m:expr, $year:expr, $pbt:ident, $eit:ident, $evty:ident) => {{
let msg = crate::generated::EPICSEvent::$pbt::parse_from_bytes($m)
.map_err(|_| Error::with_msg(format!("can not parse pb-type {}", stringify!($pbt))))?;
.map_err(|e| Error::with_msg(format!("can not parse pb-type {} {:?}", stringify!($pbt), e)))?;
let mut t = EventValues::<$evty> {
tss: vec![],
values: vec![],
@@ -103,6 +103,9 @@ impl PbFileReader {
escbuf: vec![],
wp: 0,
rp: 0,
// TODO check usage of `off`.
// It should represent the absolute position where the 1st byte of `buf` is located
// in the file, independent of `wp` or `rp`.
off: 0,
channel_name: String::new(),
payload_type: PayloadType::V4_GENERIC_BYTES,
@@ -119,6 +122,9 @@ impl PbFileReader {
}
pub fn reset_io(&mut self, off: u64) {
// TODO
// Should do the seek in here, or?
// Why do I need this anyway?
self.wp = 0;
self.rp = 0;
self.off = off;
@@ -127,6 +133,7 @@ impl PbFileReader {
pub async fn read_header(&mut self) -> Result<(), Error> {
self.fill_buf().await?;
let k = self.find_next_nl()?;
info!("read_header abspos {} packet len {}", self.abspos(), k + 1 - self.rp);
let buf = &mut self.buf;
let m = unescape_archapp_msg(&buf[self.rp..k], mem::replace(&mut self.escbuf, vec![]))?;
self.escbuf = m;
@@ -135,7 +142,6 @@ impl PbFileReader {
self.channel_name = payload_info.get_pvname().into();
self.payload_type = payload_info.get_field_type();
self.year = payload_info.get_year() as u32;
self.off += k as u64 + 1 - self.rp as u64;
self.rp = k + 1;
Ok(())
}
@@ -145,18 +151,19 @@ impl PbFileReader {
let k = if let Ok(k) = self.find_next_nl() {
k
} else {
warn!("Can not find a next NL");
return Ok(None);
};
//info!("read_msg abspos {} packet len {}", self.abspos(), k + 1 - self.rp);
let buf = &mut self.buf;
let m = mem::replace(&mut self.escbuf, vec![]);
let m = unescape_archapp_msg(&buf[self.rp..k], m)?;
self.escbuf = m;
let ei = Self::parse_buffer(&self.escbuf, self.payload_type.clone(), self.year)?;
let ret = ReadMessageResult {
pos: self.off,
pos: self.off + self.rp as u64,
item: ei,
};
self.off += k as u64 + 1 - self.rp as u64;
self.rp = k + 1;
Ok(Some(ret))
}
@@ -205,6 +212,10 @@ impl PbFileReader {
Ok(ei)
}
/// Returns `off + rp`: the base offset plus the current read index.
///
/// NOTE(review): presumably this is the absolute file position of the next
/// unread byte, assuming `off` is the file position of the first byte of
/// `buf` — confirm against `fill_buf` and `reset_io`.
pub fn abspos(&self) -> u64 {
    let consumed = self.rp as u64;
    self.off + consumed
}
async fn fill_buf(&mut self) -> Result<(), Error> {
if self.wp - self.rp >= MIN_BUF_FILL {
return Ok(());
@@ -212,6 +223,7 @@ impl PbFileReader {
if self.rp + MIN_BUF_FILL >= self.buf.len() {
let n = self.wp - self.rp;
self.buf.copy_within(self.rp..self.rp + n, 0);
self.off += self.rp as u64;
self.rp = 0;
self.wp = n;
}
@@ -400,7 +412,7 @@ impl LruCache {
pub async fn scan_files_inner(
pairs: BTreeMap<String, String>,
node_config: NodeConfigCached,
data_base_paths: Vec<PathBuf>,
) -> Result<Receiver<Result<ItemSerBox, Error>>, Error> {
let _ = pairs;
let (tx, rx) = bounded(16);
@@ -408,18 +420,14 @@ pub async fn scan_files_inner(
let tx2 = tx.clone();
let block1 = async move {
let mut lru = LruCache::new();
let aa = if let Some(aa) = &node_config.node.archiver_appliance {
aa.clone()
} else {
return Err(Error::with_msg("no archiver appliance config"));
};
let dbc = dbconn::create_connection(&node_config.node_config.cluster.database).await?;
let ndi = dbconn::scan::get_node_disk_ident(&node_config, &dbc).await?;
// TODO insert channels as a consumer of this stream:
//let dbc = dbconn::create_connection(&node_config.node_config.cluster.database).await?;
//let ndi = dbconn::scan::get_node_disk_ident(&node_config, &dbc).await?;
struct PE {
path: PathBuf,
fty: FileType,
}
let proot = aa.data_base_paths.last().unwrap().clone();
let proot = data_base_paths.last().unwrap().clone();
let proots = proot.to_str().unwrap().to_string();
let meta = tokio::fs::metadata(&proot).await?;
let mut paths = VecDeque::new();
@@ -475,9 +483,9 @@ pub async fn scan_files_inner(
.await
.errstr()?;
} else {
if false {
dbconn::insert_channel(channel_path.into(), ndi.facility, &dbc).await?;
}
// TODO as a consumer of this stream:
//dbconn::insert_channel(channel_path.into(), ndi.facility, &dbc).await?;
if let Ok(Some(msg)) = pbr.read_msg().await {
let msg = msg.item;
lru.insert(channel_path);

View File

@@ -2,6 +2,7 @@ use crate::generated::EPICSEvent::PayloadType;
use crate::parse::PbFileReader;
use err::Error;
use items::{WithLen, WithTimestamps};
use netpod::log::*;
#[derive(Debug)]
pub struct PosTs {
@@ -22,6 +23,7 @@ pub fn parse_all_ts(off: u64, buf: &[u8], payload_type: PayloadType, year: u32)
i2 = i1;
} else {
// Have a chunk from i2..i1
info!("call parse_buffer i2 {} i1 {}", i2, i1);
match PbFileReader::parse_buffer(&buf[i2 + 1..i1], payload_type.clone(), year) {
Ok(k) => {
if k.len() != 1 {
@@ -37,8 +39,9 @@ pub fn parse_all_ts(off: u64, buf: &[u8], payload_type: PayloadType, year: u32)
ret.push(h);
}
}
Err(_e) => {
// TODO ignore except if it's the last chunk.
Err(e) => {
error!("parse_all_ts: {:?}", e);
return Err(e);
}
}
i2 = i1;

View File

@@ -15,7 +15,10 @@ pub fn scan_files(
pairs: BTreeMap<String, String>,
node_config: NodeConfigCached,
) -> Pin<Box<dyn Future<Output = Result<Receiver<Result<ItemSerBox, Error>>, Error>> + Send>> {
Box::pin(archapp::parse::scan_files_inner(pairs, node_config))
Box::pin(archapp::parse::scan_files_inner(
pairs,
node_config.node.archiver_appliance.unwrap().data_base_paths,
))
}
pub async fn make_event_pipe(

View File

@@ -1,7 +1,7 @@
use clap::{crate_version, Parser};
use clap::{crate_authors, crate_version, Parser};
#[derive(Debug, Parser)]
#[clap(name="daqbuffer", author="Dominik Werder <dominik.werder@gmail.com>", version=crate_version!())]
#[clap(name="daqbuffer", author=crate_authors!(), version=crate_version!())]
pub struct Opts {
#[clap(short, long, parse(from_occurrences))]
pub verbose: i32,

View File

@@ -10,7 +10,11 @@ path = "src/dq.rs"
[dependencies]
#serde = { version = "1.0", features = ["derive"] }
#serde_json = "1.0"
#tokio = { version = "1.14.77", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] }
err = { path = "../err" }
#taskrun = { path = "../taskrun" }
tokio = { version = "1.14.77", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] }
clap = "3.0.0-beta.5"
chrono = "0.4.19"
err = { path = "../err" }
taskrun = { path = "../taskrun" }
netpod = { path = "../netpod" }
items = { path = "../items" }
archapp = { path = "../archapp" }

View File

@@ -2,20 +2,29 @@
// Refactor that...
// Crate `taskrun` also depends on `err`...
use std::path::PathBuf;
use archapp::events::PositionState;
use archapp::parse::PbFileReader;
use chrono::{TimeZone, Utc};
use clap::{crate_version, Parser};
use err::Error;
use netpod::query::RawEventsQuery;
use netpod::timeunits::*;
use netpod::AggKind;
use netpod::Channel;
use netpod::NanoRange;
use std::io::SeekFrom;
use std::path::PathBuf;
use tokio::fs::File;
use tokio::io::AsyncSeekExt;
#[derive(Debug)]
pub struct Error2;
use clap::{crate_version, Parser};
#[derive(Debug, Parser)]
#[clap(name="DAQ tools", author="Dominik Werder <dominik.werder@psi.ch>", version=crate_version!())]
#[clap(name="DAQ buffer tools", version=crate_version!())]
pub struct Opts {
#[clap(short, long, parse(from_occurrences))]
pub verbose: i32,
pub verbose: u32,
#[clap(subcommand)]
pub subcmd: SubCmd,
}
@@ -35,8 +44,92 @@ pub struct ConvertArchiverApplianceChannel {
}
pub fn main() -> Result<(), Error> {
//taskrun::run(async { Ok(()) })
let opts = Opts::parse();
eprintln!("Opts: {:?}", opts);
Err(Error::with_msg_no_trace(format!("123")))
taskrun::run(async {
if false {
return Err(Error::with_msg_no_trace(format!("unknown command")));
}
let opts = Opts::parse();
eprintln!("Opts: {:?}", opts);
match opts.subcmd {
SubCmd::ConvertArchiverApplianceChannel(sub) => {
//
let channel = Channel {
backend: String::new(),
name: sub.name.into(),
};
let chandir = archapp::events::directory_for_channel_files(&channel, &sub.input_dir)?;
eprintln!("channel path: {:?}", chandir);
let files = archapp::events::find_files_for_channel(&sub.input_dir, &channel).await?;
eprintln!("files: {:?}", files);
let mut evstot = 0;
for file in files {
eprintln!("try to open {:?}", file);
let fni = archapp::events::parse_data_filename(file.to_str().unwrap())?;
eprintln!("fni: {:?}", fni);
let ts0 = Utc.ymd(fni.year as i32, fni.month, 1).and_hms(0, 0, 0);
let ts1 = ts0.timestamp() as u64 * SEC + ts0.timestamp_subsec_nanos() as u64;
let _ = ts1;
let mut f1 = File::open(&file).await?;
let flen = f1.seek(SeekFrom::End(0)).await?;
eprintln!("flen: {}", flen);
f1.seek(SeekFrom::Start(0)).await?;
let mut pbr = PbFileReader::new(f1).await;
pbr.read_header().await?;
eprintln!("✓ read header payload_type {:?}", pbr.payload_type());
let evq = RawEventsQuery {
channel: channel.clone(),
range: NanoRange {
beg: u64::MIN,
end: u64::MAX,
},
agg_kind: AggKind::Plain,
disk_io_buffer_size: 1024 * 4,
do_decompress: true,
};
let f1 = pbr.into_file();
// TODO can the positioning-logic maybe re-use the pbr?
let z = archapp::events::position_file_for_evq(f1, evq.clone(), fni.year).await?;
if let PositionState::Positioned(pos) = z.state {
let mut f1 = z.file;
f1.seek(SeekFrom::Start(0)).await?;
let mut pbr = PbFileReader::new(f1).await;
// TODO incorporate the read_header into the init. must not be forgotten.
pbr.read_header().await?;
pbr.file().seek(SeekFrom::Start(pos)).await?;
pbr.reset_io(pos);
eprintln!("POSITIONED 1 at {}", pbr.abspos());
let mut i1 = 0;
loop {
match pbr.read_msg().await {
Ok(Some(ei)) => {
use items::{WithLen, WithTimestamps};
let ei = ei.item;
let tslast = if ei.len() > 0 { Some(ei.ts(ei.len() - 1)) } else { None };
i1 += 1;
if i1 < 20 {
eprintln!("read msg from file {} len {} tslast {:?}", i1, ei.len(), tslast);
}
//let ei2 = ei.x_aggregate(&evq.agg_kind);
}
Ok(None) => {
eprintln!("reached end of file");
break;
}
Err(e) => {
eprintln!("error while reading msg {:?}", e);
break;
}
}
}
eprintln!("read total {} events from file", i1);
evstot += i1;
} else {
eprintln!("Position fail.");
}
}
eprintln!("evstot {}", evstot);
Ok(())
}
}
})
}