diff --git a/.cargo/config.toml b/.cargo/config.toml
index f0e7617..83a9246 100644
--- a/.cargo/config.toml
+++ b/.cargo/config.toml
@@ -1,9 +1,9 @@
 [build]
 rustflags = [
-    "-C", "force-frame-pointers=yes",
-    "-C", "force-unwind-tables=yes",
-    "-C", "embed-bitcode=no",
-    "-C", "relocation-model=pic",
+    #"-C", "force-frame-pointers=yes",
+    #"-C", "force-unwind-tables=yes",
+    #"-C", "embed-bitcode=no",
+    #"-C", "relocation-model=pic",
     #"-C", "inline-threshold=1000",
     #"-Z", "time-passes=yes",
     #"-Z", "time-llvm-passes=yes",
diff --git a/Cargo.toml b/Cargo.toml
index 0392088..402ae5b 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -2,6 +2,16 @@
 members = ["daqbuffer", "h5out", "archapp", "archapp_xc", "archapp_wrap", "items", "nodenet", "httpclient", "fsio", "dq"]
 
 [profile.release]
+opt-level = 3
+debug = 0
+overflow-checks = false
+debug-assertions = false
+lto = "thin"
+codegen-units = 4
+incremental = false
+
+[profile.rel2]
+inherits = "release"
 opt-level = 1
 debug = 0
 overflow-checks = false
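
Note on the profile change: [profile.release] now carries the optimized settings (opt-level 3, thin LTO, 4 codegen units), while the new rel2 profile inherits from release and restores the previous fast-compile settings (opt-level 1). Custom profiles with `inherits` need a toolchain with custom-profile support (stable since Rust 1.57); the fallback profile would presumably be selected with `cargo build --profile rel2`.
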
diff --git a/archapp/src/events.rs b/archapp/src/events.rs
index add295f..1bb519c 100644
--- a/archapp/src/events.rs
+++ b/archapp/src/events.rs
@@ -365,24 +365,13 @@ pub async fn make_single_event_pipe(
             debug!("•••••••••••••••••••••••••• file matches requested range");
             let f1 = File::open(de.path()).await?;
             info!("opened {:?}", de.path());
-
-            let z = position_file_for_evq(f1, evq.clone(), df.year).await?;
-            let mut f1 = if let PositionState::Positioned(_pos) = z.state {
-                z.file
+            let mut z = position_file_for_evq(f1, evq.clone(), df.year).await?;
+            let mut pbr = if let PositionState::Positioned(pos) = z.state {
+                z.pbr.reset_io(pos).await?;
+                z.pbr
             } else {
                 continue;
             };
-
-            // TODO could avoid some seeks if position_file_for_evq would return the position instead of
-            // positioning the file.
-            let pos1 = f1.stream_position().await?;
-            f1.seek(SeekFrom::Start(0)).await?;
-            let mut pbr = PbFileReader::new(f1).await;
-            pbr.read_header().await?;
-            debug!("✓ read header {:?}", pbr.payload_type());
-            pbr.file().seek(SeekFrom::Start(pos1)).await?;
-            pbr.reset_io(pos1);
-
             let mut i1 = 0;
             'evread: loop {
                 match pbr.read_msg().await {
@@ -443,12 +432,12 @@ pub enum PositionState {
 }
 
 pub struct PositionResult {
-    pub file: File,
+    pub pbr: PbFileReader,
     pub state: PositionState,
 }
 
 pub async fn position_file_for_evq(mut file: File, evq: RawEventsQuery, year: u32) -> Result<PositionResult, Error> {
-    info!("-------------- position_file_for_evq");
+    trace!("-------------- position_file_for_evq");
     let flen = file.seek(SeekFrom::End(0)).await?;
     file.seek(SeekFrom::Start(0)).await?;
     if true || flen < 1024 * 512 {
@@ -459,20 +448,20 @@
 }
 
 async fn position_file_for_evq_linear(file: File, evq: RawEventsQuery, _year: u32) -> Result<PositionResult, Error> {
-    let mut pbr = PbFileReader::new(file).await;
+    // TODO make read of header part of init:
+    let mut pbr = PbFileReader::new(file).await?;
     let mut curpos;
-    pbr.read_header().await?;
     loop {
        // TODO
        // Issue is that I always read more than the actual packet.
        // Is protobuf length-framed?
        // Otherwise: read_header must return the number of bytes that were read.
        curpos = pbr.abspos();
-        info!("position_file_for_evq_linear save curpos {}", curpos);
+        trace!("position_file_for_evq_linear save curpos {}", curpos);
        let res = pbr.read_msg().await?;
        match res {
            Some(res) => {
-                info!(
+                trace!(
                    "position_file_for_evq_linear read_msg pos {} len {}",
                    res.pos,
                    res.item.len()
@@ -482,22 +471,23 @@
                }
                let tslast = res.item.ts(res.item.len() - 1);
                let diff = tslast as i64 - evq.range.beg as i64;
-                info!("position_file_for_evq_linear tslast {} diff {}", tslast, diff);
+                trace!("position_file_for_evq_linear tslast {} diff {}", tslast, diff);
                if tslast >= evq.range.beg {
-                    info!("FOUND curpos {}", curpos);
-                    pbr.file().seek(SeekFrom::Start(curpos)).await?;
+                    debug!("position_file_for_evq_linear Positioned curpos {}", curpos);
+                    pbr.reset_io(curpos).await?;
                    let ret = PositionResult {
-                        file: pbr.into_file(),
                        state: PositionState::Positioned(curpos),
+                        pbr,
                    };
                    return Ok(ret);
                }
            }
            None => {
-                info!("position_file_for_evq_linear NothingFound");
+                debug!("position_file_for_evq_linear NothingFound");
+                pbr.reset_io(0).await?;
                let ret = PositionResult {
-                    file: pbr.into_file(),
                    state: PositionState::NothingFound,
+                    pbr,
                };
                return Ok(ret);
            }
@@ -506,14 +496,15 @@
 }
 
 async fn position_file_for_evq_binary(mut file: File, evq: RawEventsQuery, year: u32) -> Result<PositionResult, Error> {
-    info!("position_file_for_evq_binary");
+    debug!("position_file_for_evq_binary");
     let flen = file.seek(SeekFrom::End(0)).await?;
     file.seek(SeekFrom::Start(0)).await?;
-    let mut pbr = PbFileReader::new(file).await;
-    pbr.read_header().await?;
+    // TODO make read of header part of init:
+    let mut pbr = PbFileReader::new(file).await?;
     let payload_type = pbr.payload_type().clone();
     let res = pbr.read_msg().await?;
-    let mut file = pbr.into_file();
+    //let mut file = pbr.into_file();
+    let file = pbr.file();
     let res = if let Some(res) = res {
         res
     } else {
@@ -546,21 +537,21 @@
     let evs1 = parse_all_ts(p1 - 1, &buf1, payload_type.clone(), year)?;
     let evs2 = parse_all_ts(p2, &buf2, payload_type.clone(), year)?;
 
-    info!("...............................................................");
-    info!("evs1.len() {:?}", evs1.len());
-    info!("evs2.len() {:?}", evs2.len());
-    info!("p1: {}", p1);
-    info!("p2: {}", p2);
+    debug!("...............................................................");
+    debug!("evs1.len() {:?}", evs1.len());
+    debug!("evs2.len() {:?}", evs2.len());
+    debug!("p1: {}", p1);
+    debug!("p2: {}", p2);
 
     let tgt = evq.range.beg;
     {
         let ev = evs1.first().unwrap();
         if ev.ts >= tgt {
-            file.seek(SeekFrom::Start(ev.pos)).await?;
+            pbr.reset_io(ev.pos).await?;
             let ret = PositionResult {
                 state: PositionState::Positioned(ev.pos),
-                file,
+                pbr,
             };
             return Ok(ret);
         }
@@ -568,10 +559,10 @@
     {
         let ev = evs2.last().unwrap();
         if ev.ts < tgt {
-            file.seek(SeekFrom::Start(0)).await?;
+            pbr.reset_io(0).await?;
             let ret = PositionResult {
                 state: PositionState::NothingFound,
-                file,
+                pbr,
             };
             return Ok(ret);
         }
@@ -585,7 +576,7 @@
     if p2 - p1 < 1024 * 128 {
         // TODO switch here to linear search...
         info!("switch to linear search in pos {}..{}", p1, p2);
-        return linear_search_2(file, evq, year, p1, p2, payload_type).await;
+        return linear_search_2(pbr, evq, year, p1, p2, payload_type).await;
     }
     let p3 = (p2 + p1) / 2;
     file.seek(SeekFrom::Start(p3)).await?;
@@ -603,25 +594,27 @@
 }
 
 async fn linear_search_2(
-    mut file: File,
+    mut pbr: PbFileReader,
     evq: RawEventsQuery,
     year: u32,
     p1: u64,
     p2: u64,
     payload_type: PayloadType,
 ) -> Result<PositionResult, Error> {
-    eprintln!("linear_search_2");
+    debug!("linear_search_2 begin");
+    // TODO improve: either use an additional file handle, or keep pbr in a consistent state.
+    let file = pbr.file();
     file.seek(SeekFrom::Start(p1 - 1)).await?;
     let mut buf = vec![0; (p2 - p1) as usize];
     file.read_exact(&mut buf).await?;
     let evs1 = parse_all_ts(p1 - 1, &buf, payload_type.clone(), year)?;
     for ev in evs1 {
         if ev.ts >= evq.range.beg {
-            info!("FOUND {:?}", ev);
-            file.seek(SeekFrom::Start(ev.pos)).await?;
+            debug!("linear_search_2 Positioned {:?}", ev);
+            pbr.reset_io(ev.pos).await?;
             let ret = PositionResult {
-                file,
                 state: PositionState::Positioned(ev.pos),
+                pbr,
             };
             return Ok(ret);
         }
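
Both strategies above share one invariant: locate the first event whose timestamp is at or past evq.range.beg, bisecting while the window is large and finishing with a linear scan once it is small. A minimal self-contained sketch of that invariant over already-parsed records (Ev is a stand-in for the (pos, ts) pairs that parse_all_ts yields; none of the crate's own types are used):

    // Sketch of the positioning invariant, over already-parsed records.
    #[derive(Debug, Clone, Copy, PartialEq)]
    struct Ev {
        pos: u64,
        ts: u64,
    }

    // Return the first event with ts >= beg, if any; `evs` must be sorted by ts.
    fn position_for(evs: &[Ev], beg: u64) -> Option<Ev> {
        let (mut lo, mut hi) = (0, evs.len());
        // Invariant: everything before lo has ts < beg,
        // everything from hi onward has ts >= beg.
        while hi - lo > 8 {
            let mid = (lo + hi) / 2;
            if evs[mid].ts < beg {
                lo = mid + 1;
            } else {
                hi = mid;
            }
        }
        // Small window left: finish linearly, like linear_search_2 does.
        evs[lo..].iter().copied().find(|ev| ev.ts >= beg)
    }

    fn main() {
        let evs = [
            Ev { pos: 0, ts: 10 },
            Ev { pos: 64, ts: 20 },
            Ev { pos: 128, ts: 30 },
        ];
        assert_eq!(position_for(&evs, 15), Some(Ev { pos: 64, ts: 20 }));
        assert_eq!(position_for(&evs, 31), None);
    }
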
info!("switch to linear search in pos {}..{}", p1, p2); - return linear_search_2(file, evq, year, p1, p2, payload_type).await; + return linear_search_2(pbr, evq, year, p1, p2, payload_type).await; } let p3 = (p2 + p1) / 2; file.seek(SeekFrom::Start(p3)).await?; @@ -603,25 +594,27 @@ async fn position_file_for_evq_binary(mut file: File, evq: RawEventsQuery, year: } async fn linear_search_2( - mut file: File, + mut pbr: PbFileReader, evq: RawEventsQuery, year: u32, p1: u64, p2: u64, payload_type: PayloadType, ) -> Result { - eprintln!("linear_search_2"); + debug!("linear_search_2 begin"); + // TODO improve.. either use additional file handle, or keep pbr in consistent state. + let file = pbr.file(); file.seek(SeekFrom::Start(p1 - 1)).await?; let mut buf = vec![0; (p2 - p1) as usize]; file.read_exact(&mut buf).await?; let evs1 = parse_all_ts(p1 - 1, &buf, payload_type.clone(), year)?; for ev in evs1 { if ev.ts >= evq.range.beg { - info!("FOUND {:?}", ev); - file.seek(SeekFrom::Start(ev.pos)).await?; + debug!("linear_search_2 Positioned {:?}", ev); + pbr.reset_io(ev.pos).await?; let ret = PositionResult { - file, state: PositionState::Positioned(ev.pos), + pbr, }; return Ok(ret); } @@ -711,8 +704,7 @@ pub async fn channel_info(channel: &Channel, aa: &ArchiverAppliance) -> Result Self { - Self { + pub async fn new(file: File) -> Result { + let mut ret = Self { file, buf: vec![0; MIN_BUF_FILL * 4], escbuf: vec![], @@ -110,7 +111,9 @@ impl PbFileReader { channel_name: String::new(), payload_type: PayloadType::V4_GENERIC_BYTES, year: 0, - } + }; + ret.read_header().await?; + Ok(ret) } pub fn into_file(self) -> File { @@ -121,19 +124,18 @@ impl PbFileReader { &mut self.file } - pub fn reset_io(&mut self, off: u64) { - // TODO - // Should do the seek in here, or? - // Why do I need this anyway? + pub async fn reset_io(&mut self, off: u64) -> Result<(), Error> { + self.file().seek(SeekFrom::Start(off)).await?; self.wp = 0; self.rp = 0; self.off = off; + Ok(()) } pub async fn read_header(&mut self) -> Result<(), Error> { self.fill_buf().await?; let k = self.find_next_nl()?; - info!("read_header abspos {} packet len {}", self.abspos(), k + 1 - self.rp); + trace!("read_header abspos {} packet len {}", self.abspos(), k + 1 - self.rp); let buf = &mut self.buf; let m = unescape_archapp_msg(&buf[self.rp..k], mem::replace(&mut self.escbuf, vec![]))?; self.escbuf = m; @@ -151,7 +153,7 @@ impl PbFileReader { let k = if let Ok(k) = self.find_next_nl() { k } else { - warn!("Can not find a next NL"); + debug!("Can not find a next NL"); return Ok(None); }; //info!("read_msg abspos {} packet len {}", self.abspos(), k + 1 - self.rp); @@ -464,8 +466,7 @@ pub async fn scan_files_inner( //tx.send(Ok(Box::new(serde_json::to_value(fns)?) 
diff --git a/disk/src/eventchunker.rs b/disk/src/eventchunker.rs
index fd4a36f..a4fe8f6 100644
--- a/disk/src/eventchunker.rs
+++ b/disk/src/eventchunker.rs
@@ -2,8 +2,7 @@ use crate::{FileChunkRead, NeedMinBuffer};
 use bitshuffle::bitshuffle_decompress;
 use bytes::{Buf, BytesMut};
 use err::Error;
-use futures_core::Stream;
-use futures_util::StreamExt;
+use futures_util::{Stream, StreamExt};
 use items::{
     Appendable, ByteEstimate, Clearable, PushableIndex, RangeCompletableItem, SitemtyFrameType,
     StatsItem, StreamItem, WithLen, WithTimestamps,
diff --git a/dq/Cargo.toml b/dq/Cargo.toml
index c5ec97c..54b39e2 100644
--- a/dq/Cargo.toml
+++ b/dq/Cargo.toml
@@ -11,10 +11,14 @@ path = "src/dq.rs"
 #serde = { version = "1.0", features = ["derive"] }
 #serde_json = "1.0"
 tokio = { version = "1.14.77", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] }
+futures-util = "0.3.14"
 clap = "3.0.0-beta.5"
 chrono = "0.4.19"
+bytes = "1.0.1"
 err = { path = "../err" }
 taskrun = { path = "../taskrun" }
 netpod = { path = "../netpod" }
 items = { path = "../items" }
+parse = { path = "../parse" }
+disk = { path = "../disk" }
 archapp = { path = "../archapp" }
diff --git a/dq/src/bin/dq.rs b/dq/src/bin/dq.rs
index 75af86f..cd89677 100644
--- a/dq/src/bin/dq.rs
+++ b/dq/src/bin/dq.rs
@@ -1,24 +1,23 @@
-// TODO crate `err` pulls in all other dependencies in order to implement From<...> for Error.
-// Refactor that...
-// Crate `taskrun` also depends on `err`...
-
 use archapp::events::PositionState;
 use archapp::parse::PbFileReader;
+use bytes::BufMut;
 use chrono::{TimeZone, Utc};
 use clap::{crate_version, Parser};
+use disk::eventchunker::EventChunkerConf;
 use err::Error;
+use items::plainevents::{PlainEvents, ScalarPlainEvents, WavePlainEvents};
 use netpod::query::RawEventsQuery;
-use netpod::timeunits::*;
-use netpod::AggKind;
-use netpod::Channel;
-use netpod::NanoRange;
+use netpod::{log::*, ByteOrder, ByteSize, ChannelConfig, HasScalarType, HasShape};
+use netpod::{timeunits::*, Shape};
+use netpod::{AggKind, Channel, NanoRange, Nanos, ScalarType};
+use parse::channelconfig::Config;
 use std::io::SeekFrom;
+use std::mem::take;
 use std::path::PathBuf;
-use tokio::fs::File;
-use tokio::io::AsyncSeekExt;
-
-#[derive(Debug)]
-pub struct Error2;
+use std::sync::atomic::AtomicU64;
+use std::sync::Arc;
+use tokio::fs::{File, OpenOptions};
+use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
 
 #[derive(Debug, Parser)]
 #[clap(name="DAQ buffer tools", version=crate_version!())]
@@ -31,51 +30,340 @@ pub struct Opts {
 
 #[derive(Debug, Parser)]
 pub enum SubCmd {
+    #[clap(about = "Convert a channel from the Archiver Appliance into Databuffer format.")]
     ConvertArchiverApplianceChannel(ConvertArchiverApplianceChannel),
+    ReadDatabufferConfigfile(ReadDatabufferConfigfile),
+    ReadDatabufferDatafile(ReadDatabufferDatafile),
 }
 
 #[derive(Debug, Parser)]
 pub struct ConvertArchiverApplianceChannel {
-    name: String,
-    #[clap(about = "Look for archiver appliance data at given path")]
+    #[clap(
+        long,
+        about = "Prefix for keyspaces, e.g. specify `daq` to get scalar keyspace directory `daq_2`"
+    )]
+    keyspace_prefix: String,
+    #[clap(long, about = "Name of the channel to convert")]
+    channel_name: String,
+    #[clap(long, about = "Look for archiver appliance data at given path")]
     input_dir: PathBuf,
-    #[clap(about = "Generate Databuffer format at given path")]
+    #[clap(long, about = "Generate Databuffer format at given path")]
     output_dir: PathBuf,
 }
 
+#[derive(Debug, Parser)]
+pub struct ReadDatabufferConfigfile {
+    #[clap(long)]
+    configfile: PathBuf,
+}
+
+#[derive(Debug, Parser)]
+pub struct ReadDatabufferDatafile {
+    #[clap(long)]
+    configfile: PathBuf,
+    #[clap(long)]
+    datafile: PathBuf,
+}
+
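With clap's derive conventions, the subcommand and flag names above map to kebab-case, so a conversion run would presumably look like `dq convert-archiver-appliance-channel --keyspace-prefix daq --channel-name <channel> --input-dir <aa-data> --output-dir <out>` (invocation inferred from the derive attributes, not taken from the patch itself).
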
+trait WritableValue {
+    fn put_value(&self, buf: &mut Vec<u8>);
+}
+
+impl WritableValue for f32 {
+    fn put_value(&self, buf: &mut Vec<u8>) {
+        buf.put_f32(*self);
+    }
+}
+
+impl WritableValue for f64 {
+    fn put_value(&self, buf: &mut Vec<u8>) {
+        buf.put_f64(*self);
+    }
+}
+
+struct DataWriter {
+    output_dir: PathBuf,
+    kspre: String,
+    channel: Channel,
+    bs: Nanos,
+    tb: u64,
+    datafile: Option<File>,
+    indexfile: Option<File>,
+    wpos: u64,
+    buf1: Vec<u8>,
+}
+
+impl DataWriter {
+    async fn new(output_dir: PathBuf, kspre: String, channel: Channel, bs: Nanos) -> Result<Self, Error> {
+        let ret = Self {
+            output_dir,
+            kspre,
+            channel,
+            bs,
+            tb: u64::MAX,
+            datafile: None,
+            indexfile: None,
+            wpos: 0,
+            buf1: vec![0; 1024 * 1024],
+        };
+        Ok(ret)
+    }
+
+    async fn write_item(&mut self, item: &PlainEvents) -> Result<(), Error> {
+        match item {
+            PlainEvents::Scalar(item) => match item {
+                ScalarPlainEvents::Float(events) => {
+                    self.write_events(2, ScalarType::F32, &events.tss, &events.values)
+                        .await?;
+                }
+                ScalarPlainEvents::Double(events) => {
+                    self.write_events(2, ScalarType::F64, &events.tss, &events.values)
+                        .await?;
+                }
+                _ => todo!(),
+            },
+            PlainEvents::Wave(item) => match item {
+                WavePlainEvents::Double(_events) => {
+                    todo!()
+                }
+                _ => todo!(),
+            },
+        }
+        Ok(())
+    }
+
+    async fn write_events(
+        &mut self,
+        ks: u32,
+        scalar_type: ScalarType,
+        tss: &Vec<u64>,
+        vals: &Vec<impl WritableValue>,
+    ) -> Result<(), Error> {
+        let split = 0;
+        assert_eq!(tss.len(), vals.len());
+        for i in 0..tss.len() {
+            let ts = tss[i];
+            let tb = ts / self.bs.ns;
+            if tb != self.tb {
+                let tbdate = chrono::Utc.timestamp((tb * (self.bs.ns / SEC)) as i64, 0);
+                eprintln!("Create directory for timebin {}", tbdate);
+                let p1 = self.output_dir.join(format!("{}_{}", self.kspre, ks));
+                let p2 = p1.join(self.channel.name());
+                let p3 = p2.join(format!("{:019}", tb));
+                let p4 = p3.join(format!("{:010}", split));
+                let p5 = p4.join(format!("{:019}_00000_Data", self.bs.ns / MS));
+                let p6 = p4.join(format!("{:019}_00000_Data_Index", self.bs.ns / MS));
+                tokio::fs::create_dir_all(&p4).await.map_err(|e| {
+                    error!("Can not create {:?}", p4);
+                    e
+                })?;
+                let mut file = OpenOptions::new()
+                    .create(true)
+                    .truncate(true)
+                    .write(true)
+                    .open(&p5)
+                    .await
+                    .map_err(|e| {
+                        error!("can not create new file {:?}", p5);
+                        e
+                    })?;
+                file.write_all(&0u16.to_be_bytes()).await?;
+                let chs = self.channel.name().as_bytes();
+                let len1 = (chs.len() + 8) as u32;
+                file.write_all(&len1.to_be_bytes()).await?;
+                file.write_all(chs).await?;
+                file.write_all(&len1.to_be_bytes()).await?;
+                self.wpos = 10 + chs.len() as u64;
+                self.datafile = Some(file);
+                if ks == 3 {
+                    let mut file = OpenOptions::new()
+                        .create(true)
+                        .truncate(true)
+                        .write(true)
+                        .open(&p6)
+                        .await
+                        .map_err(|e| {
+                            error!("can not create new file {:?}", p6);
+                            e
+                        })?;
+                    file.write_all(&0u16.to_be_bytes()).await?;
+                    self.indexfile = Some(file);
+                }
+                self.tb = tb;
+            }
+            let file = self.datafile.as_mut().unwrap();
+            let mut buf = take(&mut self.buf1);
+            buf.clear();
+            buf.put_i32(0);
+            buf.put_u64(0);
+            buf.put_u64(ts);
+            buf.put_u64(0);
+            buf.put_u64(0);
+            // Status, Severity
+            buf.put_u8(0);
+            buf.put_u8(0);
+            buf.put_i32(-1);
+            let flags = 0;
+            buf.put_u8(flags);
+            buf.put_u8(scalar_type.index());
+            vals[i].put_value(&mut buf);
+            buf.put_i32(0);
+            let len1 = buf.len();
+            buf[0..4].as_mut().put_u32(len1 as u32);
+            buf[len1 - 4..len1].as_mut().put_u32(len1 as u32);
+            file.write_all(&buf).await?;
+            self.buf1 = buf;
+            if ks == 3 {
+                let file = self.indexfile.as_mut().unwrap();
+                let mut buf = take(&mut self.buf1);
+                buf.clear();
+                buf.put_u64(ts);
+                buf.put_u64(self.wpos);
+                file.write_all(&buf).await?;
+                self.buf1 = buf;
+            }
+            self.wpos += len1 as u64;
+        }
+        Ok(())
+    }
+
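
For reference, write_events frames each record with its total length as a big-endian u32 at both the head and the tail (the trailing copy allows scanning backwards). A small sketch of a reader-side consistency check for that framing, assuming a buffer that starts exactly at a record boundary (split_frame is illustrative, not part of the codebase):

    use std::convert::TryInto;

    // Check one length-framed record as emitted by write_events above:
    // u32 total length at the front, the same u32 repeated at the tail.
    // Returns (payload, rest-of-buffer) on success.
    fn split_frame(buf: &[u8]) -> Option<(&[u8], &[u8])> {
        if buf.len() < 8 {
            return None;
        }
        let len = u32::from_be_bytes(buf[0..4].try_into().ok()?) as usize;
        if len < 8 || buf.len() < len {
            return None;
        }
        let tail = u32::from_be_bytes(buf[len - 4..len].try_into().ok()?) as usize;
        if tail != len {
            // Head and tail length fields must agree.
            return None;
        }
        // Payload sits between the two length fields; rest is the next record.
        Some((&buf[4..len - 4], &buf[len..]))
    }

    fn main() {
        let mut rec = 12u32.to_be_bytes().to_vec();
        rec.extend_from_slice(&[0xAB, 0xCD, 0xEF, 0x01]);
        rec.extend_from_slice(&12u32.to_be_bytes());
        let (payload, rest) = split_frame(&rec).unwrap();
        assert_eq!(payload, [0xAB, 0xCD, 0xEF, 0x01]);
        assert!(rest.is_empty());
    }
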
+    async fn write_config(&mut self, config: &Config) -> Result<(), Error> {
+        eprintln!("Create directory for channel config");
+        let p1 = self.output_dir.join("config").join(self.channel.name()).join("latest");
+        tokio::fs::create_dir_all(&p1).await.map_err(|e| {
+            error!("Can not create {:?}", p1);
+            e
+        })?;
+        let mut file = OpenOptions::new()
+            .create(true)
+            .truncate(true)
+            .write(true)
+            .open(p1.join("00000_Config"))
+            .await
+            .map_err(|e| {
+                error!("can not create config file in {:?}", p1);
+                e
+            })?;
+        let mut buf = take(&mut self.buf1);
+        {
+            buf.clear();
+            buf.put_u16(0);
+            file.write_all(&buf).await?;
+        }
+        {
+            buf.clear();
+            let chs = self.channel.name().as_bytes();
+            let len1 = (chs.len() + 8) as u32;
+            buf.put_u32(len1);
+            buf.put_slice(chs);
+            buf.put_u32(len1);
+            //let len1 = buf.len();
+            //buf[0..4].as_mut().put_u32(len1 as u32);
+            //buf[len1 - 4..len1].as_mut().put_u32(len1 as u32);
+            file.write_all(&buf).await?;
+        }
+        {
+            let e = &config.entries[0];
+            buf.clear();
+            buf.put_u32(0);
+            buf.put_u64(0);
+            buf.put_u64(0);
+            buf.put_i32(e.ks);
+            buf.put_u64(e.bs.ns / MS);
+            buf.put_i32(e.split_count);
+            buf.put_i32(e.status);
+            buf.put_i8(e.bb);
+            buf.put_i32(e.modulo);
+            buf.put_i32(e.offset);
+            buf.put_i16(e.precision);
+            let dtlen = 0;
+            buf.put_i32(dtlen);
+            let flags = 0;
+            buf.put_u8(flags);
+            buf.put_u8(e.scalar_type.index());
+            if false {
+                // is shaped?
+                buf.put_u8(1);
+                buf.put_u32(16);
+            }
+            buf.put_i32(-1);
+            buf.put_i32(-1);
+            buf.put_i32(-1);
+            buf.put_i32(-1);
+            buf.put_i32(-1);
+            buf.put_u32(0);
+            let len1 = buf.len();
+            buf[0..4].as_mut().put_u32(len1 as u32);
+            buf[len1 - 4..len1].as_mut().put_u32(len1 as u32);
+            file.write_all(&buf).await?;
+        }
+        self.buf1 = buf;
+        Ok(())
+    }
+}
+
+impl Drop for DataWriter {
+    fn drop(&mut self) {
+        let indexfile = self.indexfile.take();
+        let datafile = self.datafile.take();
+        tokio::task::spawn(async move {
+            match indexfile {
+                Some(mut file) => {
+                    let _ = file.flush().await;
+                }
+                None => {}
+            }
+            match datafile {
+                Some(mut file) => {
+                    let _ = file.flush().await;
+                }
+                None => {}
+            }
+        });
+    }
+}
+
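Design note on the Drop impl above: Drop cannot be async, so the destructor spawns a detached task and flushes on a best-effort basis. If the runtime is shutting down, that task may never run, so the output files are only reliably complete once the explicit write_all calls and write_config have finished.
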
 pub fn main() -> Result<(), Error> {
     taskrun::run(async {
         if false {
             return Err(Error::with_msg_no_trace(format!("unknown command")));
         }
         let opts = Opts::parse();
-        eprintln!("Opts: {:?}", opts);
         match opts.subcmd {
             SubCmd::ConvertArchiverApplianceChannel(sub) => {
-                //
+                let _ = tokio::fs::create_dir(&sub.output_dir).await;
+                let meta = tokio::fs::metadata(&sub.output_dir).await?;
+                if !meta.is_dir() {
+                    return Err(Error::from_string(format!(
+                        "Given output path is not a directory: {:?}",
+                        sub.output_dir
+                    )));
+                }
+                let bs = Nanos::from_ns(DAY);
+                let mut channel_config: Option<Config> = None;
                 let channel = Channel {
                     backend: String::new(),
-                    name: sub.name.into(),
+                    name: sub.channel_name.into(),
                 };
+                let mut data_writer =
+                    DataWriter::new(sub.output_dir, sub.keyspace_prefix.into(), channel.clone(), bs.clone()).await?;
                 let chandir = archapp::events::directory_for_channel_files(&channel, &sub.input_dir)?;
-                eprintln!("channel path: {:?}", chandir);
+                eprintln!("Looking for files in: {:?}", chandir);
                 let files = archapp::events::find_files_for_channel(&sub.input_dir, &channel).await?;
-                eprintln!("files: {:?}", files);
                 let mut evstot = 0;
                 for file in files {
-                    eprintln!("try to open {:?}", file);
+                    eprintln!("Try to open {:?}", file);
                     let fni = archapp::events::parse_data_filename(file.to_str().unwrap())?;
-                    eprintln!("fni: {:?}", fni);
+                    debug!("fni: {:?}", fni);
                     let ts0 = Utc.ymd(fni.year as i32, fni.month, 1).and_hms(0, 0, 0);
                     let ts1 = ts0.timestamp() as u64 * SEC + ts0.timestamp_subsec_nanos() as u64;
                     let _ = ts1;
                     let mut f1 = File::open(&file).await?;
-                    let flen = f1.seek(SeekFrom::End(0)).await?;
-                    eprintln!("flen: {}", flen);
+                    let _flen = f1.seek(SeekFrom::End(0)).await?;
                     f1.seek(SeekFrom::Start(0)).await?;
-                    let mut pbr = PbFileReader::new(f1).await;
-                    pbr.read_header().await?;
-                    eprintln!("✓ read header payload_type {:?}", pbr.payload_type());
+                    let pbr = PbFileReader::new(f1).await?;
+                    debug!("channel name in pbr file: {:?}", pbr.channel_name());
+                    debug!("data type in file: {:?}", pbr.payload_type());
                     let evq = RawEventsQuery {
                         channel: channel.clone(),
                         range: NanoRange {
@@ -90,44 +378,230 @@ pub fn main() -> Result<(), Error> {
                     // TODO can the positioning-logic maybe re-use the pbr?
                     let z = archapp::events::position_file_for_evq(f1, evq.clone(), fni.year).await?;
                     if let PositionState::Positioned(pos) = z.state {
-                        let mut f1 = z.file;
-                        f1.seek(SeekFrom::Start(0)).await?;
-                        let mut pbr = PbFileReader::new(f1).await;
-                        // TODO incorporate the read_header into the init. must not be forgotten.
-                        pbr.read_header().await?;
-                        pbr.file().seek(SeekFrom::Start(pos)).await?;
-                        pbr.reset_io(pos);
-                        eprintln!("POSITIONED 1 at {}", pbr.abspos());
+                        let mut pbr = z.pbr;
+                        assert_eq!(pos, pbr.abspos());
                         let mut i1 = 0;
+                        let mut repnext = u64::MAX;
                         loop {
                             match pbr.read_msg().await {
                                 Ok(Some(ei)) => {
                                     use items::{WithLen, WithTimestamps};
                                     let ei = ei.item;
-                                    let tslast = if ei.len() > 0 { Some(ei.ts(ei.len() - 1)) } else { None };
-                                    i1 += 1;
-                                    if i1 < 20 {
-                                        eprintln!("read msg from file {} len {} tslast {:?}", i1, ei.len(), tslast);
+                                    if ei.is_wave() {
+                                        eprintln!("ERROR wave channels are not yet fully supported");
+                                        return Ok(());
+                                    }
+                                    if ei.len() > 0 {
+                                        let scalar_type = ei.scalar_type();
+                                        let shape = match &ei {
+                                            items::eventsitem::EventsItem::Plain(k) => match k.shape() {
+                                                Shape::Scalar => None,
+                                                Shape::Wave(n) => Some(vec![n]),
+                                                Shape::Image(..) => panic!(),
+                                            },
+                                            items::eventsitem::EventsItem::XBinnedEvents(_) => panic!(),
+                                        };
+                                        if let Some(conf) = &channel_config {
+                                            if scalar_type != conf.entries[0].scalar_type {
+                                                let msg = format!(
+                                                    "unexpected type: {:?} vs {:?}",
+                                                    scalar_type, conf.entries[0].scalar_type
+                                                );
+                                                return Err(Error::with_msg_no_trace(msg));
+                                            }
+                                            if shape != conf.entries[0].shape {
+                                                let msg = format!(
+                                                    "unexpected shape: {:?} vs {:?}",
+                                                    shape, conf.entries[0].shape
+                                                );
+                                                return Err(Error::with_msg_no_trace(msg));
+                                            }
+                                        }
+                                        if channel_config.is_none() {
+                                            let ks = if ei.is_wave() { 3 } else { 2 };
+                                            let scalar_type_2 = match &ei {
+                                                items::eventsitem::EventsItem::Plain(k) => match k {
+                                                    PlainEvents::Scalar(k) => match k {
+                                                        ScalarPlainEvents::Byte(_) => ScalarType::I8,
+                                                        ScalarPlainEvents::Short(_) => ScalarType::I16,
+                                                        ScalarPlainEvents::Int(_) => ScalarType::I32,
+                                                        ScalarPlainEvents::Float(_) => ScalarType::F32,
+                                                        ScalarPlainEvents::Double(_) => ScalarType::F64,
+                                                    },
+                                                    PlainEvents::Wave(k) => match k {
+                                                        WavePlainEvents::Byte(_) => ScalarType::I8,
+                                                        WavePlainEvents::Short(_) => ScalarType::I16,
+                                                        WavePlainEvents::Int(_) => ScalarType::I32,
+                                                        WavePlainEvents::Float(_) => ScalarType::F32,
+                                                        WavePlainEvents::Double(_) => ScalarType::F64,
+                                                    },
+                                                },
+                                                items::eventsitem::EventsItem::XBinnedEvents(_) => panic!(),
+                                            };
+                                            if scalar_type_2 != scalar_type {
+                                                let msg = format!(
+                                                    "unexpected type: {:?} vs {:?}",
+                                                    scalar_type_2, scalar_type
+                                                );
+                                                return Err(Error::with_msg_no_trace(msg));
+                                            }
+                                            let e = parse::channelconfig::ConfigEntry {
+                                                ts: 0,
+                                                pulse: 0,
+                                                ks,
+                                                bs: bs.clone(),
+                                                split_count: 1,
+                                                status: 0,
+                                                bb: 0,
+                                                modulo: 0,
+                                                offset: 0,
+                                                precision: 0,
+                                                scalar_type: scalar_type,
+                                                is_compressed: false,
+                                                is_shaped: false,
+                                                is_array: false,
+                                                byte_order: netpod::ByteOrder::LE,
+                                                compression_method: None,
+                                                shape,
+                                                source_name: None,
+                                                unit: None,
+                                                description: None,
+                                                optional_fields: None,
+                                                value_converter: None,
+                                            };
+                                            let k = parse::channelconfig::Config {
+                                                format_version: 0,
+                                                channel_name: channel.name().into(),
+                                                entries: vec![e],
+                                            };
+                                            channel_config = Some(k);
+                                        }
+                                        match &ei {
+                                            items::eventsitem::EventsItem::Plain(item) => {
+                                                data_writer.write_item(item).await?;
+                                            }
+                                            items::eventsitem::EventsItem::XBinnedEvents(_) => {
+                                                panic!()
+                                            }
+                                        }
+                                    }
+                                    let tslast = if ei.len() > 0 { Some(ei.ts(ei.len() - 1)) } else { None };
+                                    if i1 == repnext {
+                                        debug!("read msg from file {} len {} tslast {:?}", i1, ei.len(), tslast);
+                                        repnext = 1 + 4 * repnext / 3;
+                                    }
+                                    i1 += 1;
+                                    if false {
+                                        ei.x_aggregate(&evq.agg_kind);
                                     }
-                                    //let ei2 = ei.x_aggregate(&evq.agg_kind);
                                 }
                                 Ok(None) => {
-                                    eprintln!("reached end of file");
+                                    debug!("reached end of file");
                                     break;
                                 }
                                 Err(e) => {
-                                    eprintln!("error while reading msg {:?}", e);
+                                    error!("error while reading msg {:?}", e);
                                     break;
                                 }
                             }
                         }
-                        eprintln!("read total {} events from file", i1);
+                        debug!("read total {} events from the last file", i1);
                         evstot += i1;
                     } else {
-                        eprintln!("Position fail.");
+                        error!("Position fail.");
+                    }
+                }
+                eprintln!("Total number of events converted: {}", evstot);
+                data_writer.write_config(channel_config.as_ref().unwrap()).await?;
+                Ok(())
+            }
+            SubCmd::ReadDatabufferConfigfile(sub) => {
+                let mut file = File::open(&sub.configfile).await?;
+                let meta = file.metadata().await?;
+                let mut buf = vec![0; meta.len() as usize];
+                file.read_exact(&mut buf).await?;
+                drop(file);
+                let config = match parse::channelconfig::parse_config(&buf) {
+                    Ok(k) => k.1,
+                    Err(e) => return Err(Error::with_msg_no_trace(format!("can not parse: {:?}", e))),
+                };
+                eprintln!("Read config: {:?}", config);
+                eprintln!("Config bs: {}", config.entries[0].bs.ns / MS);
+                Ok(())
+            }
+            SubCmd::ReadDatabufferDatafile(sub) => {
+                let mut file = File::open(&sub.configfile).await?;
+                let meta = file.metadata().await?;
+                let mut buf = vec![0; meta.len() as usize];
+                file.read_exact(&mut buf).await?;
+                drop(file);
+                let config = match parse::channelconfig::parse_config(&buf) {
+                    Ok(k) => k.1,
+                    Err(e) => return Err(Error::with_msg_no_trace(format!("can not parse: {:?}", e))),
+                };
+                let file = File::open(&sub.datafile).await?;
+                let inp = Box::pin(disk::file_content_stream(
+                    file,
+                    netpod::FileIoBufferSize::new(1024 * 16),
+                ));
+                let ce = &config.entries[0];
+                let channel_config = ChannelConfig {
+                    channel: Channel {
+                        backend: String::new(),
+                        name: config.channel_name.clone(),
+                    },
+                    keyspace: ce.ks as u8,
+                    time_bin_size: ce.bs,
+                    scalar_type: ce.scalar_type.clone(),
+                    compression: false,
+                    shape: Shape::Scalar,
+                    array: false,
+                    byte_order: ByteOrder::LE,
+                };
+                let range = NanoRange {
+                    beg: u64::MIN,
+                    end: u64::MAX,
+                };
+                let stats_conf = EventChunkerConf {
+                    disk_stats_every: ByteSize::mb(2),
+                };
+                let max_ts = Arc::new(AtomicU64::new(0));
+                let mut chunks = disk::eventchunker::EventChunker::from_start(
+                    inp,
+                    channel_config.clone(),
+                    range,
+                    stats_conf,
+                    sub.datafile.clone(),
+                    max_ts.clone(),
+                    false,
+                    true,
+                );
+
+                use futures_util::stream::StreamExt;
+                use items::WithLen;
+
+                while let Some(item) = chunks.next().await {
+                    let item = item?;
+                    match item {
+                        items::StreamItem::DataItem(item) => {
+                            match item {
+                                items::RangeCompletableItem::RangeComplete => {
+                                    warn!("RangeComplete");
+                                }
+                                items::RangeCompletableItem::Data(item) => {
+                                    info!("Data len {}", item.len());
+                                    info!("{:?}", item);
+                                }
+                            };
+                        }
+                        items::StreamItem::Log(k) => {
+                            eprintln!("Log item {:?}", k);
+                        }
+                        items::StreamItem::Stats(k) => {
+                            eprintln!("Stats item {:?}", k);
+                        }
+                    }
+                }
-                eprintln!("evstot {}", evstot);
                 Ok(())
             }
         }
diff --git a/err/src/lib.rs b/err/src/lib.rs
index 41e53a4..2c238de 100644
--- a/err/src/lib.rs
+++ b/err/src/lib.rs
@@ -131,9 +131,13 @@ impl fmt::Debug for Error {
         } else if let Some(s) = &self.trace_str {
             s.into()
         } else {
-            "NOTRACE".into()
+            String::new()
         };
-        write!(fmt, "Error {}\nTrace:\n{}", self.msg, trace_str)
+        write!(fmt, "{}", self.msg)?;
+        if !trace_str.is_empty() {
+            write!(fmt, "\nTrace:\n{}", trace_str)?;
+        }
+        Ok(())
     }
 }
file"); break; } Err(e) => { - eprintln!("error while reading msg {:?}", e); + error!("error while reading msg {:?}", e); break; } } } - eprintln!("read total {} events from file", i1); + debug!("read total {} events from the last file", i1); evstot += i1; } else { - eprintln!("Position fail."); + error!("Position fail."); + } + } + eprintln!("Total number of events converted: {}", evstot); + data_writer.write_config(channel_config.as_ref().unwrap()).await?; + Ok(()) + } + SubCmd::ReadDatabufferConfigfile(sub) => { + let mut file = File::open(&sub.configfile).await?; + let meta = file.metadata().await?; + let mut buf = vec![0; meta.len() as usize]; + file.read_exact(&mut buf).await?; + drop(file); + let config = match parse::channelconfig::parse_config(&buf) { + Ok(k) => k.1, + Err(e) => return Err(Error::with_msg_no_trace(format!("can not parse: {:?}", e))), + }; + eprintln!("Read config: {:?}", config); + eprintln!("Config bs: {}", config.entries[0].bs.ns / MS); + Ok(()) + } + SubCmd::ReadDatabufferDatafile(sub) => { + let mut file = File::open(&sub.configfile).await?; + let meta = file.metadata().await?; + let mut buf = vec![0; meta.len() as usize]; + file.read_exact(&mut buf).await?; + drop(file); + let config = match parse::channelconfig::parse_config(&buf) { + Ok(k) => k.1, + Err(e) => return Err(Error::with_msg_no_trace(format!("can not parse: {:?}", e))), + }; + let file = File::open(&sub.datafile).await?; + let inp = Box::pin(disk::file_content_stream( + file, + netpod::FileIoBufferSize::new(1024 * 16), + )); + let ce = &config.entries[0]; + let channel_config = ChannelConfig { + channel: Channel { + backend: String::new(), + name: config.channel_name.clone(), + }, + keyspace: ce.ks as u8, + time_bin_size: ce.bs, + scalar_type: ce.scalar_type.clone(), + compression: false, + shape: Shape::Scalar, + array: false, + byte_order: ByteOrder::LE, + }; + let range = NanoRange { + beg: u64::MIN, + end: u64::MAX, + }; + let stats_conf = EventChunkerConf { + disk_stats_every: ByteSize::mb(2), + }; + let max_ts = Arc::new(AtomicU64::new(0)); + let mut chunks = disk::eventchunker::EventChunker::from_start( + inp, + channel_config.clone(), + range, + stats_conf, + sub.datafile.clone(), + max_ts.clone(), + false, + true, + ); + + use futures_util::stream::StreamExt; + use items::WithLen; + + while let Some(item) = chunks.next().await { + let item = item?; + match item { + items::StreamItem::DataItem(item) => { + match item { + items::RangeCompletableItem::RangeComplete => { + warn!("RangeComplete"); + } + items::RangeCompletableItem::Data(item) => { + info!("Data len {}", item.len()); + info!("{:?}", item); + } + }; + } + items::StreamItem::Log(k) => { + eprintln!("Log item {:?}", k); + } + items::StreamItem::Stats(k) => { + eprintln!("Stats item {:?}", k); + } } } - eprintln!("evstot {}", evstot); Ok(()) } } diff --git a/err/src/lib.rs b/err/src/lib.rs index 41e53a4..2c238de 100644 --- a/err/src/lib.rs +++ b/err/src/lib.rs @@ -131,9 +131,13 @@ impl fmt::Debug for Error { } else if let Some(s) = &self.trace_str { s.into() } else { - "NOTRACE".into() + String::new() }; - write!(fmt, "Error {}\nTrace:\n{}", self.msg, trace_str) + write!(fmt, "{}", self.msg)?; + if !trace_str.is_empty() { + write!(fmt, "\nTrace:\n{}", trace_str)?; + } + Ok(()) } } diff --git a/netpod/src/netpod.rs b/netpod/src/netpod.rs index 52410ac..5cfc0d5 100644 --- a/netpod/src/netpod.rs +++ b/netpod/src/netpod.rs @@ -41,7 +41,7 @@ pub struct BodyStream { pub inner: Box> + Send + Unpin>, } -#[derive(Clone, Debug, Serialize, 
diff --git a/taskrun/src/taskrun.rs b/taskrun/src/taskrun.rs
index 6c5316a..6efc742 100644
--- a/taskrun/src/taskrun.rs
+++ b/taskrun/src/taskrun.rs
@@ -68,7 +68,7 @@ pub fn run<T, F: std::future::Future<Output = Result<T, Error>>>(f: F) -> Result<T, Error> {
         match res {
             Ok(k) => Ok(k),
             Err(e) => {
-                error!("{:?}", e);
+                error!("Caught: {:?}", e);
                 Err(e)
             }
         }
@@ -107,6 +107,7 @@ pub fn tracing_init() {
                 "disk::binned=info",
                 "nodenet::conn=info",
                 "daqbuffer::test=info",
+                "dq=info",
             ]
             .join(","),
         ))