From 2449a20775666753c862045b251013de37575c26 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Wed, 8 Dec 2021 21:54:03 +0100 Subject: [PATCH] Linear read --- archapp/src/err.rs | 48 ++++++++++++---- archapp/src/events.rs | 112 ++++++++++++++++++++++++------------ archapp/src/parse.rs | 42 ++++++++------ archapp/src/parse/multi.rs | 7 ++- archapp_wrap/src/lib.rs | 5 +- daqbuffer/src/cli.rs | 4 +- dq/Cargo.toml | 10 +++- dq/src/bin/dq.rs | 113 +++++++++++++++++++++++++++++++++---- 8 files changed, 258 insertions(+), 83 deletions(-) diff --git a/archapp/src/err.rs b/archapp/src/err.rs index 266c701..490934e 100644 --- a/archapp/src/err.rs +++ b/archapp/src/err.rs @@ -1,48 +1,74 @@ use std::fmt; -pub struct Error(err::Error); +pub struct ArchError(::err::Error); -impl Error { +impl ArchError { pub fn with_msg>(s: S) -> Self { - Self(err::Error::with_msg(s)) + Self(::err::Error::with_msg(s)) } pub fn with_msg_no_trace>(s: S) -> Self { - Self(err::Error::with_msg_no_trace(s)) + Self(::err::Error::with_msg_no_trace(s)) + } + + pub fn msg(&self) -> &str { + self.0.msg() + } + + pub fn reason(&self) -> Option<::err::Reason> { + self.0.reason() + } + + pub fn public_msg(&self) -> Option<&Vec> { + self.0.public_msg() } } -impl fmt::Debug for Error { +impl fmt::Debug for ArchError { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { self.0.fmt(fmt) } } -impl From for err::Error { - fn from(x: Error) -> Self { +impl fmt::Display for ArchError { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt::Debug::fmt(self, fmt) + } +} + +impl std::error::Error for ArchError {} + +impl From<::err::Error> for ArchError { + fn from(x: ::err::Error) -> Self { + Self(x) + } +} + +impl From for ::err::Error { + fn from(x: ArchError) -> Self { x.0 } } -impl From for Error { +impl From for ArchError { fn from(k: std::string::FromUtf8Error) -> Self { Self::with_msg(k.to_string()) } } -impl From for Error { +impl From for ArchError { fn from(k: std::io::Error) -> Self { Self::with_msg(k.to_string()) } } -impl From> for Error { +impl From> for ArchError { fn from(k: async_channel::SendError) -> Self { Self::with_msg(k.to_string()) } } -impl From for Error { +impl From for ArchError { fn from(k: serde_json::Error) -> Self { Self::with_msg(k.to_string()) } diff --git a/archapp/src/events.rs b/archapp/src/events.rs index 22f3c06..add295f 100644 --- a/archapp/src/events.rs +++ b/archapp/src/events.rs @@ -1,3 +1,4 @@ +use crate::err::ArchError; use crate::generated::EPICSEvent::PayloadType; use crate::parse::multi::parse_all_ts; use crate::parse::PbFileReader; @@ -26,9 +27,10 @@ use std::pin::Pin; use tokio::fs::{read_dir, File}; use tokio::io::{AsyncReadExt, AsyncSeekExt}; +#[derive(Debug)] pub struct DataFilename { - year: u32, - month: u32, + pub year: u32, + pub month: u32, } pub fn parse_data_filename(s: &str) -> Result { @@ -331,7 +333,7 @@ pub async fn make_single_event_pipe( // TODO must apply the proper x-binning depending on the requested AggKind. debug!("make_single_event_pipe {:?}", evq); let evq = evq.clone(); - let DirAndPrefix { dir, prefix } = directory_for_channel_files(&evq.channel, base_path)?; + let DirAndPrefix { dir, prefix } = directory_for_channel_files(&evq.channel, &base_path)?; //let dtbeg = Utc.timestamp((evq.range.beg / 1000000000) as i64, (evq.range.beg % 1000000000) as u32); let (tx, rx) = async_channel::bounded(16); let block1 = async move { @@ -365,7 +367,7 @@ pub async fn make_single_event_pipe( info!("opened {:?}", de.path()); let z = position_file_for_evq(f1, evq.clone(), df.year).await?; - let mut f1 = if let PositionState::Positioned = z.state { + let mut f1 = if let PositionState::Positioned(_pos) = z.state { z.file } else { continue; @@ -437,18 +439,19 @@ pub async fn make_single_event_pipe( pub enum PositionState { NothingFound, - Positioned, + Positioned(u64), } pub struct PositionResult { - file: File, - state: PositionState, + pub file: File, + pub state: PositionState, } -async fn position_file_for_evq(mut file: File, evq: RawEventsQuery, year: u32) -> Result { +pub async fn position_file_for_evq(mut file: File, evq: RawEventsQuery, year: u32) -> Result { + info!("-------------- position_file_for_evq"); let flen = file.seek(SeekFrom::End(0)).await?; file.seek(SeekFrom::Start(0)).await?; - if flen < 1024 * 512 { + if true || flen < 1024 * 512 { position_file_for_evq_linear(file, evq, year).await } else { position_file_for_evq_binary(file, evq, year).await @@ -457,27 +460,47 @@ async fn position_file_for_evq(mut file: File, evq: RawEventsQuery, year: u32) - async fn position_file_for_evq_linear(file: File, evq: RawEventsQuery, _year: u32) -> Result { let mut pbr = PbFileReader::new(file).await; + let mut curpos; pbr.read_header().await?; loop { + // TODO + // Issue is that I always read more than the actual packet. + // Is protobuf length-framed? + // Otherwise: read_header must return the number of bytes that were read. + curpos = pbr.abspos(); + info!("position_file_for_evq_linear save curpos {}", curpos); let res = pbr.read_msg().await?; - let res = if let Some(k) = res { - k - } else { - let ret = PositionResult { - file: pbr.into_file(), - state: PositionState::NothingFound, - }; - return Ok(ret); - }; - if res.item.len() < 1 { - return Err(Error::with_msg_no_trace("no event read from file")); - } - if res.item.ts(res.item.len() - 1) >= evq.range.beg { - let ret = PositionResult { - file: pbr.into_file(), - state: PositionState::Positioned, - }; - return Ok(ret); + match res { + Some(res) => { + info!( + "position_file_for_evq_linear read_msg pos {} len {}", + res.pos, + res.item.len() + ); + if res.item.len() < 1 { + return Err(Error::with_msg_no_trace("no event read from file")); + } + let tslast = res.item.ts(res.item.len() - 1); + let diff = tslast as i64 - evq.range.beg as i64; + info!("position_file_for_evq_linear tslast {} diff {}", tslast, diff); + if tslast >= evq.range.beg { + info!("FOUND curpos {}", curpos); + pbr.file().seek(SeekFrom::Start(curpos)).await?; + let ret = PositionResult { + file: pbr.into_file(), + state: PositionState::Positioned(curpos), + }; + return Ok(ret); + } + } + None => { + info!("position_file_for_evq_linear NothingFound"); + let ret = PositionResult { + file: pbr.into_file(), + state: PositionState::NothingFound, + }; + return Ok(ret); + } } } } @@ -524,8 +547,8 @@ async fn position_file_for_evq_binary(mut file: File, evq: RawEventsQuery, year: let evs2 = parse_all_ts(p2, &buf2, payload_type.clone(), year)?; info!("..............................................................."); - info!("evs1: {:?}", evs1); - info!("evs2: {:?}", evs2); + info!("evs1.len() {:?}", evs1.len()); + info!("evs2.len() {:?}", evs2.len()); info!("p1: {}", p1); info!("p2: {}", p2); @@ -536,7 +559,7 @@ async fn position_file_for_evq_binary(mut file: File, evq: RawEventsQuery, year: if ev.ts >= tgt { file.seek(SeekFrom::Start(ev.pos)).await?; let ret = PositionResult { - state: PositionState::Positioned, + state: PositionState::Positioned(ev.pos), file, }; return Ok(ret); @@ -598,7 +621,7 @@ async fn linear_search_2( file.seek(SeekFrom::Start(ev.pos)).await?; let ret = PositionResult { file, - state: PositionState::Positioned, + state: PositionState::Positioned(ev.pos), }; return Ok(ret); } @@ -639,30 +662,45 @@ fn events_item_to_framable(ei: EventsItem) -> Result, E } } -struct DirAndPrefix { +#[derive(Debug)] +pub struct DirAndPrefix { dir: PathBuf, prefix: String, } -fn directory_for_channel_files(channel: &Channel, base_path: PathBuf) -> Result { +pub fn directory_for_channel_files(channel: &Channel, base_path: &PathBuf) -> Result { // SARUN11/CVME/DBLM546/IOC_CPU_LOAD // SARUN11-CVME-DBLM546:IOC_CPU_LOAD let a: Vec<_> = channel.name.split("-").map(|s| s.split(":")).flatten().collect(); let path = base_path; - let path = a.iter().take(a.len() - 1).fold(path, |a, &x| a.join(x)); + let path = a.iter().take(a.len() - 1).fold(path.clone(), |a, &x| a.join(x)); let ret = DirAndPrefix { dir: path, prefix: a .last() - .ok_or_else(|| Error::with_msg_no_trace("no prefix in file"))? + .ok_or_else(|| ArchError::with_msg_no_trace("no prefix in file"))? .to_string(), }; Ok(ret) } +// The same channel-name in different data directories like "lts", "mts", .. are considered different channels. +pub async fn find_files_for_channel(base_path: &PathBuf, channel: &Channel) -> Result, ArchError> { + let mut ret = vec![]; + let chandir = directory_for_channel_files(channel, base_path)?; + let mut rd = read_dir(&chandir.dir).await?; + while let Some(en) = rd.next_entry().await? { + let fns = en.file_name().to_string_lossy().into_owned(); + if fns.starts_with(&format!("{}:20", chandir.prefix)) && fns.ends_with(".pb") { + ret.push(en.path()); + } + } + ret.sort_unstable(); + Ok(ret) +} + pub async fn channel_info(channel: &Channel, aa: &ArchiverAppliance) -> Result { - let DirAndPrefix { dir, prefix } = - directory_for_channel_files(channel, aa.data_base_paths.last().unwrap().clone())?; + let DirAndPrefix { dir, prefix } = directory_for_channel_files(channel, aa.data_base_paths.last().unwrap())?; let mut msgs = vec![]; msgs.push(format!("path: {}", dir.to_string_lossy())); let mut scalar_type = None; diff --git a/archapp/src/parse.rs b/archapp/src/parse.rs index a6f5269..b28e5d7 100644 --- a/archapp/src/parse.rs +++ b/archapp/src/parse.rs @@ -12,7 +12,7 @@ use items::eventvalues::EventValues; use items::plainevents::{PlainEvents, ScalarPlainEvents, WavePlainEvents}; use items::waveevents::WaveEvents; use netpod::log::*; -use netpod::{ArchiverAppliance, ChannelConfigQuery, ChannelConfigResponse, NodeConfigCached}; +use netpod::{ArchiverAppliance, ChannelConfigQuery, ChannelConfigResponse}; use protobuf::Message; use serde::Serialize; use serde_json::Value as JsonValue; @@ -55,7 +55,7 @@ fn parse_scalar_byte(m: &[u8], year: u32) -> Result { macro_rules! scalar_parse { ($m:expr, $year:expr, $pbt:ident, $eit:ident, $evty:ident) => {{ let msg = crate::generated::EPICSEvent::$pbt::parse_from_bytes($m) - .map_err(|_| Error::with_msg(format!("can not parse pb-type {}", stringify!($pbt))))?; + .map_err(|e| Error::with_msg(format!("can not parse pb-type {} {:?}", stringify!($pbt), e)))?; let mut t = EventValues::<$evty> { tss: vec![], values: vec![], @@ -103,6 +103,9 @@ impl PbFileReader { escbuf: vec![], wp: 0, rp: 0, + // TODO check usage of `off`. + // It should represent the absolute position where the 1st byte of `buf` is located + // in the file, independent of `wp` or `rp`. off: 0, channel_name: String::new(), payload_type: PayloadType::V4_GENERIC_BYTES, @@ -119,6 +122,9 @@ impl PbFileReader { } pub fn reset_io(&mut self, off: u64) { + // TODO + // Should do the seek in here, or? + // Why do I need this anyway? self.wp = 0; self.rp = 0; self.off = off; @@ -127,6 +133,7 @@ impl PbFileReader { pub async fn read_header(&mut self) -> Result<(), Error> { self.fill_buf().await?; let k = self.find_next_nl()?; + info!("read_header abspos {} packet len {}", self.abspos(), k + 1 - self.rp); let buf = &mut self.buf; let m = unescape_archapp_msg(&buf[self.rp..k], mem::replace(&mut self.escbuf, vec![]))?; self.escbuf = m; @@ -135,7 +142,6 @@ impl PbFileReader { self.channel_name = payload_info.get_pvname().into(); self.payload_type = payload_info.get_field_type(); self.year = payload_info.get_year() as u32; - self.off += k as u64 + 1 - self.rp as u64; self.rp = k + 1; Ok(()) } @@ -145,18 +151,19 @@ impl PbFileReader { let k = if let Ok(k) = self.find_next_nl() { k } else { + warn!("Can not find a next NL"); return Ok(None); }; + //info!("read_msg abspos {} packet len {}", self.abspos(), k + 1 - self.rp); let buf = &mut self.buf; let m = mem::replace(&mut self.escbuf, vec![]); let m = unescape_archapp_msg(&buf[self.rp..k], m)?; self.escbuf = m; let ei = Self::parse_buffer(&self.escbuf, self.payload_type.clone(), self.year)?; let ret = ReadMessageResult { - pos: self.off, + pos: self.off + self.rp as u64, item: ei, }; - self.off += k as u64 + 1 - self.rp as u64; self.rp = k + 1; Ok(Some(ret)) } @@ -205,6 +212,10 @@ impl PbFileReader { Ok(ei) } + pub fn abspos(&self) -> u64 { + self.off + self.rp as u64 + } + async fn fill_buf(&mut self) -> Result<(), Error> { if self.wp - self.rp >= MIN_BUF_FILL { return Ok(()); @@ -212,6 +223,7 @@ impl PbFileReader { if self.rp + MIN_BUF_FILL >= self.buf.len() { let n = self.wp - self.rp; self.buf.copy_within(self.rp..self.rp + n, 0); + self.off += self.rp as u64; self.rp = 0; self.wp = n; } @@ -400,7 +412,7 @@ impl LruCache { pub async fn scan_files_inner( pairs: BTreeMap, - node_config: NodeConfigCached, + data_base_paths: Vec, ) -> Result>, Error> { let _ = pairs; let (tx, rx) = bounded(16); @@ -408,18 +420,14 @@ pub async fn scan_files_inner( let tx2 = tx.clone(); let block1 = async move { let mut lru = LruCache::new(); - let aa = if let Some(aa) = &node_config.node.archiver_appliance { - aa.clone() - } else { - return Err(Error::with_msg("no archiver appliance config")); - }; - let dbc = dbconn::create_connection(&node_config.node_config.cluster.database).await?; - let ndi = dbconn::scan::get_node_disk_ident(&node_config, &dbc).await?; + // TODO insert channels as a consumer of this stream: + //let dbc = dbconn::create_connection(&node_config.node_config.cluster.database).await?; + //let ndi = dbconn::scan::get_node_disk_ident(&node_config, &dbc).await?; struct PE { path: PathBuf, fty: FileType, } - let proot = aa.data_base_paths.last().unwrap().clone(); + let proot = data_base_paths.last().unwrap().clone(); let proots = proot.to_str().unwrap().to_string(); let meta = tokio::fs::metadata(&proot).await?; let mut paths = VecDeque::new(); @@ -475,9 +483,9 @@ pub async fn scan_files_inner( .await .errstr()?; } else { - if false { - dbconn::insert_channel(channel_path.into(), ndi.facility, &dbc).await?; - } + // TODO as a consumer of this stream: + //dbconn::insert_channel(channel_path.into(), ndi.facility, &dbc).await?; + if let Ok(Some(msg)) = pbr.read_msg().await { let msg = msg.item; lru.insert(channel_path); diff --git a/archapp/src/parse/multi.rs b/archapp/src/parse/multi.rs index a18ebe4..85e3dbc 100644 --- a/archapp/src/parse/multi.rs +++ b/archapp/src/parse/multi.rs @@ -2,6 +2,7 @@ use crate::generated::EPICSEvent::PayloadType; use crate::parse::PbFileReader; use err::Error; use items::{WithLen, WithTimestamps}; +use netpod::log::*; #[derive(Debug)] pub struct PosTs { @@ -22,6 +23,7 @@ pub fn parse_all_ts(off: u64, buf: &[u8], payload_type: PayloadType, year: u32) i2 = i1; } else { // Have a chunk from i2..i1 + info!("call parse_buffer i2 {} i1 {}", i2, i1); match PbFileReader::parse_buffer(&buf[i2 + 1..i1], payload_type.clone(), year) { Ok(k) => { if k.len() != 1 { @@ -37,8 +39,9 @@ pub fn parse_all_ts(off: u64, buf: &[u8], payload_type: PayloadType, year: u32) ret.push(h); } } - Err(_e) => { - // TODO ignore except if it's the last chunk. + Err(e) => { + error!("parse_all_ts: {:?}", e); + return Err(e); } } i2 = i1; diff --git a/archapp_wrap/src/lib.rs b/archapp_wrap/src/lib.rs index 7d8822e..036754d 100644 --- a/archapp_wrap/src/lib.rs +++ b/archapp_wrap/src/lib.rs @@ -15,7 +15,10 @@ pub fn scan_files( pairs: BTreeMap, node_config: NodeConfigCached, ) -> Pin>, Error>> + Send>> { - Box::pin(archapp::parse::scan_files_inner(pairs, node_config)) + Box::pin(archapp::parse::scan_files_inner( + pairs, + node_config.node.archiver_appliance.unwrap().data_base_paths, + )) } pub async fn make_event_pipe( diff --git a/daqbuffer/src/cli.rs b/daqbuffer/src/cli.rs index da1c77e..98ad7b1 100644 --- a/daqbuffer/src/cli.rs +++ b/daqbuffer/src/cli.rs @@ -1,7 +1,7 @@ -use clap::{crate_version, Parser}; +use clap::{crate_authors, crate_version, Parser}; #[derive(Debug, Parser)] -#[clap(name="daqbuffer", author="Dominik Werder ", version=crate_version!())] +#[clap(name="daqbuffer", author=crate_authors!(), version=crate_version!())] pub struct Opts { #[clap(short, long, parse(from_occurrences))] pub verbose: i32, diff --git a/dq/Cargo.toml b/dq/Cargo.toml index 4c1b202..c5ec97c 100644 --- a/dq/Cargo.toml +++ b/dq/Cargo.toml @@ -10,7 +10,11 @@ path = "src/dq.rs" [dependencies] #serde = { version = "1.0", features = ["derive"] } #serde_json = "1.0" -#tokio = { version = "1.14.77", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] } -err = { path = "../err" } -#taskrun = { path = "../taskrun" } +tokio = { version = "1.14.77", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] } clap = "3.0.0-beta.5" +chrono = "0.4.19" +err = { path = "../err" } +taskrun = { path = "../taskrun" } +netpod = { path = "../netpod" } +items = { path = "../items" } +archapp = { path = "../archapp" } diff --git a/dq/src/bin/dq.rs b/dq/src/bin/dq.rs index a010985..75af86f 100644 --- a/dq/src/bin/dq.rs +++ b/dq/src/bin/dq.rs @@ -2,20 +2,29 @@ // Refactor that... // Crate `taskrun` also depends on `err`... -use std::path::PathBuf; - +use archapp::events::PositionState; +use archapp::parse::PbFileReader; +use chrono::{TimeZone, Utc}; +use clap::{crate_version, Parser}; use err::Error; +use netpod::query::RawEventsQuery; +use netpod::timeunits::*; +use netpod::AggKind; +use netpod::Channel; +use netpod::NanoRange; +use std::io::SeekFrom; +use std::path::PathBuf; +use tokio::fs::File; +use tokio::io::AsyncSeekExt; #[derive(Debug)] pub struct Error2; -use clap::{crate_version, Parser}; - #[derive(Debug, Parser)] -#[clap(name="DAQ tools", author="Dominik Werder ", version=crate_version!())] +#[clap(name="DAQ buffer tools", version=crate_version!())] pub struct Opts { #[clap(short, long, parse(from_occurrences))] - pub verbose: i32, + pub verbose: u32, #[clap(subcommand)] pub subcmd: SubCmd, } @@ -35,8 +44,92 @@ pub struct ConvertArchiverApplianceChannel { } pub fn main() -> Result<(), Error> { - //taskrun::run(async { Ok(()) }) - let opts = Opts::parse(); - eprintln!("Opts: {:?}", opts); - Err(Error::with_msg_no_trace(format!("123"))) + taskrun::run(async { + if false { + return Err(Error::with_msg_no_trace(format!("unknown command"))); + } + let opts = Opts::parse(); + eprintln!("Opts: {:?}", opts); + match opts.subcmd { + SubCmd::ConvertArchiverApplianceChannel(sub) => { + // + let channel = Channel { + backend: String::new(), + name: sub.name.into(), + }; + let chandir = archapp::events::directory_for_channel_files(&channel, &sub.input_dir)?; + eprintln!("channel path: {:?}", chandir); + let files = archapp::events::find_files_for_channel(&sub.input_dir, &channel).await?; + eprintln!("files: {:?}", files); + let mut evstot = 0; + for file in files { + eprintln!("try to open {:?}", file); + let fni = archapp::events::parse_data_filename(file.to_str().unwrap())?; + eprintln!("fni: {:?}", fni); + let ts0 = Utc.ymd(fni.year as i32, fni.month, 1).and_hms(0, 0, 0); + let ts1 = ts0.timestamp() as u64 * SEC + ts0.timestamp_subsec_nanos() as u64; + let _ = ts1; + let mut f1 = File::open(&file).await?; + let flen = f1.seek(SeekFrom::End(0)).await?; + eprintln!("flen: {}", flen); + f1.seek(SeekFrom::Start(0)).await?; + let mut pbr = PbFileReader::new(f1).await; + pbr.read_header().await?; + eprintln!("✓ read header payload_type {:?}", pbr.payload_type()); + let evq = RawEventsQuery { + channel: channel.clone(), + range: NanoRange { + beg: u64::MIN, + end: u64::MAX, + }, + agg_kind: AggKind::Plain, + disk_io_buffer_size: 1024 * 4, + do_decompress: true, + }; + let f1 = pbr.into_file(); + // TODO can the positioning-logic maybe re-use the pbr? + let z = archapp::events::position_file_for_evq(f1, evq.clone(), fni.year).await?; + if let PositionState::Positioned(pos) = z.state { + let mut f1 = z.file; + f1.seek(SeekFrom::Start(0)).await?; + let mut pbr = PbFileReader::new(f1).await; + // TODO incorporate the read_header into the init. must not be forgotten. + pbr.read_header().await?; + pbr.file().seek(SeekFrom::Start(pos)).await?; + pbr.reset_io(pos); + eprintln!("POSITIONED 1 at {}", pbr.abspos()); + let mut i1 = 0; + loop { + match pbr.read_msg().await { + Ok(Some(ei)) => { + use items::{WithLen, WithTimestamps}; + let ei = ei.item; + let tslast = if ei.len() > 0 { Some(ei.ts(ei.len() - 1)) } else { None }; + i1 += 1; + if i1 < 20 { + eprintln!("read msg from file {} len {} tslast {:?}", i1, ei.len(), tslast); + } + //let ei2 = ei.x_aggregate(&evq.agg_kind); + } + Ok(None) => { + eprintln!("reached end of file"); + break; + } + Err(e) => { + eprintln!("error while reading msg {:?}", e); + break; + } + } + } + eprintln!("read total {} events from file", i1); + evstot += i1; + } else { + eprintln!("Position fail."); + } + } + eprintln!("evstot {}", evstot); + Ok(()) + } + } + }) }