Write converted config

Dominik Werder
2021-12-09 14:55:56 +01:00
parent 2449a20775
commit 11229bd514
12 changed files with 609 additions and 124 deletions

View File

@@ -1,9 +1,9 @@
[build]
rustflags = [
"-C", "force-frame-pointers=yes",
"-C", "force-unwind-tables=yes",
"-C", "embed-bitcode=no",
"-C", "relocation-model=pic",
#"-C", "force-frame-pointers=yes",
#"-C", "force-unwind-tables=yes",
#"-C", "embed-bitcode=no",
#"-C", "relocation-model=pic",
#"-C", "inline-threshold=1000",
#"-Z", "time-passes=yes",
#"-Z", "time-llvm-passes=yes",

View File

@@ -2,6 +2,16 @@
members = ["daqbuffer", "h5out", "archapp", "archapp_xc", "archapp_wrap", "items", "nodenet", "httpclient", "fsio", "dq"]
[profile.release]
opt-level = 3
debug = 0
overflow-checks = false
debug-assertions = false
lto = "thin"
codegen-units = 4
incremental = false
[profile.rel2]
inherits = "release"
opt-level = 1
debug = 0
overflow-checks = false
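
The new rel2 profile inherits from release and drops to opt-level = 1 for faster rebuilds. Custom named profiles with inherits are stable as of Rust 1.57; assuming that toolchain, the profile is selected explicitly:

    cargo build --profile rel2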

View File

@@ -365,24 +365,13 @@ pub async fn make_single_event_pipe(
debug!("•••••••••••••••••••••••••• file matches requested range");
let f1 = File::open(de.path()).await?;
info!("opened {:?}", de.path());
let z = position_file_for_evq(f1, evq.clone(), df.year).await?;
let mut f1 = if let PositionState::Positioned(_pos) = z.state {
z.file
let mut z = position_file_for_evq(f1, evq.clone(), df.year).await?;
let mut pbr = if let PositionState::Positioned(pos) = z.state {
z.pbr.reset_io(pos).await?;
z.pbr
} else {
continue;
};
// TODO could avoid some seeks if position_file_for_evq would return the position instead of
// positioning the file.
let pos1 = f1.stream_position().await?;
f1.seek(SeekFrom::Start(0)).await?;
let mut pbr = PbFileReader::new(f1).await;
pbr.read_header().await?;
debug!("✓ read header {:?}", pbr.payload_type());
pbr.file().seek(SeekFrom::Start(pos1)).await?;
pbr.reset_io(pos1);
let mut i1 = 0;
'evread: loop {
match pbr.read_msg().await {
@@ -443,12 +432,12 @@ pub enum PositionState {
}
pub struct PositionResult {
pub file: File,
pub pbr: PbFileReader,
pub state: PositionState,
}
pub async fn position_file_for_evq(mut file: File, evq: RawEventsQuery, year: u32) -> Result<PositionResult, Error> {
info!("-------------- position_file_for_evq");
trace!("-------------- position_file_for_evq");
let flen = file.seek(SeekFrom::End(0)).await?;
file.seek(SeekFrom::Start(0)).await?;
if true || flen < 1024 * 512 {
@@ -459,20 +448,20 @@ pub async fn position_file_for_evq(mut file: File, evq: RawEventsQuery, year: u3
}
async fn position_file_for_evq_linear(file: File, evq: RawEventsQuery, _year: u32) -> Result<PositionResult, Error> {
let mut pbr = PbFileReader::new(file).await;
// TODO make read of header part of init:
let mut pbr = PbFileReader::new(file).await?;
let mut curpos;
pbr.read_header().await?;
loop {
// TODO
// The issue is that I always read more than the actual packet.
// Is protobuf length-framed?
// Otherwise: read_header must return the number of bytes that were read.
curpos = pbr.abspos();
info!("position_file_for_evq_linear save curpos {}", curpos);
trace!("position_file_for_evq_linear save curpos {}", curpos);
let res = pbr.read_msg().await?;
match res {
Some(res) => {
info!(
trace!(
"position_file_for_evq_linear read_msg pos {} len {}",
res.pos,
res.item.len()
@@ -482,22 +471,23 @@ async fn position_file_for_evq_linear(file: File, evq: RawEventsQuery, _year: u3
}
let tslast = res.item.ts(res.item.len() - 1);
let diff = tslast as i64 - evq.range.beg as i64;
info!("position_file_for_evq_linear tslast {} diff {}", tslast, diff);
trace!("position_file_for_evq_linear tslast {} diff {}", tslast, diff);
if tslast >= evq.range.beg {
info!("FOUND curpos {}", curpos);
pbr.file().seek(SeekFrom::Start(curpos)).await?;
debug!("position_file_for_evq_linear Positioned curpos {}", curpos);
pbr.reset_io(curpos).await?;
let ret = PositionResult {
file: pbr.into_file(),
state: PositionState::Positioned(curpos),
pbr,
};
return Ok(ret);
}
}
None => {
info!("position_file_for_evq_linear NothingFound");
debug!("position_file_for_evq_linear NothingFound");
pbr.reset_io(0).await?;
let ret = PositionResult {
file: pbr.into_file(),
state: PositionState::NothingFound,
pbr,
};
return Ok(ret);
}
@@ -506,14 +496,15 @@ async fn position_file_for_evq_linear(file: File, evq: RawEventsQuery, _year: u3
}
async fn position_file_for_evq_binary(mut file: File, evq: RawEventsQuery, year: u32) -> Result<PositionResult, Error> {
info!("position_file_for_evq_binary");
debug!("position_file_for_evq_binary");
let flen = file.seek(SeekFrom::End(0)).await?;
file.seek(SeekFrom::Start(0)).await?;
let mut pbr = PbFileReader::new(file).await;
pbr.read_header().await?;
// TODO make read of header part of init:
let mut pbr = PbFileReader::new(file).await?;
let payload_type = pbr.payload_type().clone();
let res = pbr.read_msg().await?;
let mut file = pbr.into_file();
//let mut file = pbr.into_file();
let file = pbr.file();
let res = if let Some(res) = res {
res
} else {
@@ -546,21 +537,21 @@ async fn position_file_for_evq_binary(mut file: File, evq: RawEventsQuery, year:
let evs1 = parse_all_ts(p1 - 1, &buf1, payload_type.clone(), year)?;
let evs2 = parse_all_ts(p2, &buf2, payload_type.clone(), year)?;
info!("...............................................................");
info!("evs1.len() {:?}", evs1.len());
info!("evs2.len() {:?}", evs2.len());
info!("p1: {}", p1);
info!("p2: {}", p2);
debug!("...............................................................");
debug!("evs1.len() {:?}", evs1.len());
debug!("evs2.len() {:?}", evs2.len());
debug!("p1: {}", p1);
debug!("p2: {}", p2);
let tgt = evq.range.beg;
{
let ev = evs1.first().unwrap();
if ev.ts >= tgt {
file.seek(SeekFrom::Start(ev.pos)).await?;
pbr.reset_io(ev.pos).await?;
let ret = PositionResult {
state: PositionState::Positioned(ev.pos),
file,
pbr,
};
return Ok(ret);
}
@@ -568,10 +559,10 @@ async fn position_file_for_evq_binary(mut file: File, evq: RawEventsQuery, year:
{
let ev = evs2.last().unwrap();
if ev.ts < tgt {
file.seek(SeekFrom::Start(0)).await?;
pbr.reset_io(0).await?;
let ret = PositionResult {
state: PositionState::NothingFound,
file,
pbr,
};
return Ok(ret);
}
@@ -585,7 +576,7 @@ async fn position_file_for_evq_binary(mut file: File, evq: RawEventsQuery, year:
if p2 - p1 < 1024 * 128 {
// TODO switch here to linear search...
info!("switch to linear search in pos {}..{}", p1, p2);
return linear_search_2(file, evq, year, p1, p2, payload_type).await;
return linear_search_2(pbr, evq, year, p1, p2, payload_type).await;
}
let p3 = (p2 + p1) / 2;
file.seek(SeekFrom::Start(p3)).await?;
@@ -603,25 +594,27 @@ async fn position_file_for_evq_binary(mut file: File, evq: RawEventsQuery, year:
}
async fn linear_search_2(
mut file: File,
mut pbr: PbFileReader,
evq: RawEventsQuery,
year: u32,
p1: u64,
p2: u64,
payload_type: PayloadType,
) -> Result<PositionResult, Error> {
eprintln!("linear_search_2");
debug!("linear_search_2 begin");
// TODO improve: either use an additional file handle, or keep pbr in a consistent state.
let file = pbr.file();
file.seek(SeekFrom::Start(p1 - 1)).await?;
let mut buf = vec![0; (p2 - p1) as usize];
file.read_exact(&mut buf).await?;
let evs1 = parse_all_ts(p1 - 1, &buf, payload_type.clone(), year)?;
for ev in evs1 {
if ev.ts >= evq.range.beg {
info!("FOUND {:?}", ev);
file.seek(SeekFrom::Start(ev.pos)).await?;
debug!("linear_search_2 Positioned {:?}", ev);
pbr.reset_io(ev.pos).await?;
let ret = PositionResult {
file,
state: PositionState::Positioned(ev.pos),
pbr,
};
return Ok(ret);
}
@@ -711,8 +704,7 @@ pub async fn channel_info(channel: &Channel, aa: &ArchiverAppliance) -> Result<C
if s.starts_with(&prefix) && s.ends_with(".pb") {
msgs.push(s);
let f1 = File::open(de.path()).await?;
let mut pbr = PbFileReader::new(f1).await;
pbr.read_header().await?;
let mut pbr = PbFileReader::new(f1).await?;
msgs.push(format!("got header {}", pbr.channel_name()));
let ev = pbr.read_msg().await;
match ev {
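
With this commit, PositionResult hands back the already-initialized PbFileReader instead of the raw File, so callers no longer replay the header read and the seeks. A minimal sketch of the new calling pattern, assuming the types shown in this diff:

    let mut z = position_file_for_evq(f1, evq.clone(), year).await?;
    let mut pbr = if let PositionState::Positioned(pos) = z.state {
        z.pbr.reset_io(pos).await?; // seek and reset buffer state in one step
        z.pbr
    } else {
        return Ok(()); // NothingFound: no event in range in this file
    };
    while let Some(msg) = pbr.read_msg().await? {
        // consume msg.item ...
    }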

View File

@@ -18,12 +18,13 @@ use serde::Serialize;
use serde_json::Value as JsonValue;
use std::collections::{BTreeMap, VecDeque};
use std::fs::FileType;
use std::io::SeekFrom;
use std::mem;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Instant;
use tokio::fs::File;
use tokio::io::AsyncReadExt;
use tokio::io::{AsyncReadExt, AsyncSeekExt};
pub struct PbFileReader {
file: File,
@@ -96,8 +97,8 @@ pub struct ReadMessageResult {
}
impl PbFileReader {
pub async fn new(file: File) -> Self {
Self {
pub async fn new(file: File) -> Result<Self, Error> {
let mut ret = Self {
file,
buf: vec![0; MIN_BUF_FILL * 4],
escbuf: vec![],
@@ -110,7 +111,9 @@ impl PbFileReader {
channel_name: String::new(),
payload_type: PayloadType::V4_GENERIC_BYTES,
year: 0,
}
};
ret.read_header().await?;
Ok(ret)
}
pub fn into_file(self) -> File {
@@ -121,19 +124,18 @@ impl PbFileReader {
&mut self.file
}
pub fn reset_io(&mut self, off: u64) {
// TODO
// Should do the seek in here, or?
// Why do I need this anyway?
pub async fn reset_io(&mut self, off: u64) -> Result<(), Error> {
self.file().seek(SeekFrom::Start(off)).await?;
self.wp = 0;
self.rp = 0;
self.off = off;
Ok(())
}
pub async fn read_header(&mut self) -> Result<(), Error> {
self.fill_buf().await?;
let k = self.find_next_nl()?;
info!("read_header abspos {} packet len {}", self.abspos(), k + 1 - self.rp);
trace!("read_header abspos {} packet len {}", self.abspos(), k + 1 - self.rp);
let buf = &mut self.buf;
let m = unescape_archapp_msg(&buf[self.rp..k], mem::replace(&mut self.escbuf, vec![]))?;
self.escbuf = m;
@@ -151,7 +153,7 @@ impl PbFileReader {
let k = if let Ok(k) = self.find_next_nl() {
k
} else {
warn!("Can not find a next NL");
debug!("Can not find a next NL");
return Ok(None);
};
//info!("read_msg abspos {} packet len {}", self.abspos(), k + 1 - self.rp);
@@ -464,8 +466,7 @@ pub async fn scan_files_inner(
//tx.send(Ok(Box::new(serde_json::to_value(fns)?) as ItemSerBox)).await?;
let channel_path = &fns[proots.len() + 1..fns.len() - 11];
if !lru.query(channel_path) {
let mut pbr = PbFileReader::new(tokio::fs::File::open(&pe.path).await?).await;
pbr.read_header().await?;
let mut pbr = PbFileReader::new(tokio::fs::File::open(&pe.path).await?).await?;
let normalized_channel_name = {
let pvn = pbr.channel_name().replace("-", "/");
pvn.replace(":", "/")

View File

@@ -2,8 +2,7 @@ use crate::{FileChunkRead, NeedMinBuffer};
use bitshuffle::bitshuffle_decompress;
use bytes::{Buf, BytesMut};
use err::Error;
use futures_core::Stream;
use futures_util::StreamExt;
use futures_util::{Stream, StreamExt};
use items::{
Appendable, ByteEstimate, Clearable, PushableIndex, RangeCompletableItem, SitemtyFrameType, StatsItem, StreamItem,
WithLen, WithTimestamps,

View File

@@ -11,10 +11,14 @@ path = "src/dq.rs"
#serde = { version = "1.0", features = ["derive"] }
#serde_json = "1.0"
tokio = { version = "1.14.77", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] }
futures-util = "0.3.14"
clap = "3.0.0-beta.5"
chrono = "0.4.19"
bytes = "1.0.1"
err = { path = "../err" }
taskrun = { path = "../taskrun" }
netpod = { path = "../netpod" }
items = { path = "../items" }
parse = { path = "../parse" }
disk = { path = "../disk" }
archapp = { path = "../archapp" }

View File

@@ -1,24 +1,23 @@
// TODO crate `err` pulls in all other dependencies in order to implement From<...> for Error.
// Refactor that...
// Crate `taskrun` also depends on `err`...
use archapp::events::PositionState;
use archapp::parse::PbFileReader;
use bytes::BufMut;
use chrono::{TimeZone, Utc};
use clap::{crate_version, Parser};
use disk::eventchunker::EventChunkerConf;
use err::Error;
use items::plainevents::{PlainEvents, ScalarPlainEvents, WavePlainEvents};
use netpod::query::RawEventsQuery;
use netpod::timeunits::*;
use netpod::AggKind;
use netpod::Channel;
use netpod::NanoRange;
use netpod::{log::*, ByteOrder, ByteSize, ChannelConfig, HasScalarType, HasShape};
use netpod::{timeunits::*, Shape};
use netpod::{AggKind, Channel, NanoRange, Nanos, ScalarType};
use parse::channelconfig::Config;
use std::io::SeekFrom;
use std::mem::take;
use std::path::PathBuf;
use tokio::fs::File;
use tokio::io::AsyncSeekExt;
#[derive(Debug)]
pub struct Error2;
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use tokio::fs::{File, OpenOptions};
use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
#[derive(Debug, Parser)]
#[clap(name="DAQ buffer tools", version=crate_version!())]
@@ -31,51 +30,340 @@ pub struct Opts {
#[derive(Debug, Parser)]
pub enum SubCmd {
#[clap(about = "Convert a channel from the Archiver Appliance into Databuffer format.")]
ConvertArchiverApplianceChannel(ConvertArchiverApplianceChannel),
ReadDatabufferConfigfile(ReadDatabufferConfigfile),
ReadDatabufferDatafile(ReadDatabufferDatafile),
}
#[derive(Debug, Parser)]
pub struct ConvertArchiverApplianceChannel {
name: String,
#[clap(about = "Look for archiver appliance data at given path")]
#[clap(
long,
about = "Prefix for keyspaces, e.g. specify `daq` to get scalar keyspace directory `daq_2`"
)]
keyspace_prefix: String,
#[clap(long, about = "Name of the channel to convert")]
channel_name: String,
#[clap(long, about = "Look for archiver appliance data at given path")]
input_dir: PathBuf,
#[clap(about = "Generate Databuffer format at given path")]
#[clap(long, about = "Generate Databuffer format at given path")]
output_dir: PathBuf,
}
#[derive(Debug, Parser)]
pub struct ReadDatabufferConfigfile {
#[clap(long)]
configfile: PathBuf,
}
#[derive(Debug, Parser)]
pub struct ReadDatabufferDatafile {
#[clap(long)]
configfile: PathBuf,
#[clap(long)]
datafile: PathBuf,
}
trait WritableValue {
fn put_value(&self, buf: &mut Vec<u8>);
}
impl WritableValue for f32 {
fn put_value(&self, buf: &mut Vec<u8>) {
buf.put_f32(*self);
}
}
impl WritableValue for f64 {
fn put_value(&self, buf: &mut Vec<u8>) {
buf.put_f64(*self);
}
}
struct DataWriter {
output_dir: PathBuf,
kspre: String,
channel: Channel,
bs: Nanos,
tb: u64,
datafile: Option<File>,
indexfile: Option<File>,
wpos: u64,
buf1: Vec<u8>,
}
impl DataWriter {
async fn new(output_dir: PathBuf, kspre: String, channel: Channel, bs: Nanos) -> Result<Self, Error> {
let ret = Self {
output_dir,
kspre,
channel,
bs,
tb: u64::MAX,
datafile: None,
indexfile: None,
wpos: 0,
buf1: vec![0; 1024 * 1024],
};
Ok(ret)
}
async fn write_item(&mut self, item: &PlainEvents) -> Result<(), Error> {
match item {
PlainEvents::Scalar(item) => match item {
ScalarPlainEvents::Float(events) => {
self.write_events(2, ScalarType::F32, &events.tss, &events.values)
.await?;
}
ScalarPlainEvents::Double(events) => {
self.write_events(2, ScalarType::F64, &events.tss, &events.values)
.await?;
}
_ => todo!(),
},
PlainEvents::Wave(item) => match item {
WavePlainEvents::Double(_events) => {
todo!()
}
_ => todo!(),
},
}
Ok(())
}
async fn write_events<T: WritableValue>(
&mut self,
ks: u32,
scalar_type: ScalarType,
tss: &Vec<u64>,
vals: &Vec<T>,
) -> Result<(), Error> {
let split = 0;
assert_eq!(tss.len(), vals.len());
for i in 0..tss.len() {
let ts = tss[i];
let tb = ts / self.bs.ns;
if tb != self.tb {
let tbdate = chrono::Utc.timestamp((tb * (self.bs.ns / SEC)) as i64, 0);
eprintln!("Create directory for timebin {}", tbdate);
let p1 = self.output_dir.join(format!("{}_{}", self.kspre, ks));
let p2 = p1.join(self.channel.name());
let p3 = p2.join(format!("{:019}", tb));
let p4 = p3.join(format!("{:010}", split));
let p5 = p4.join(format!("{:019}_00000_Data", self.bs.ns / MS));
let p6 = p4.join(format!("{:019}_00000_Data_Index", self.bs.ns / MS));
tokio::fs::create_dir_all(&p4).await.map_err(|e| {
error!("Can not create {:?}", p4);
e
})?;
let mut file = OpenOptions::new()
.create(true)
.truncate(true)
.write(true)
.open(&p5)
.await
.map_err(|e| {
error!("can not create new file {:?}", p5);
e
})?;
file.write_all(&0u16.to_be_bytes()).await?;
let chs = self.channel.name().as_bytes();
let len1 = (chs.len() + 8) as u32;
file.write_all(&len1.to_be_bytes()).await?;
file.write_all(chs).await?;
file.write_all(&len1.to_be_bytes()).await?;
self.wpos = 10 + chs.len() as u64;
self.datafile = Some(file);
if ks == 3 {
let mut file = OpenOptions::new()
.create(true)
.truncate(true)
.write(true)
.open(&p6)
.await
.map_err(|e| {
error!("can not create new file {:?}", p6);
e
})?;
file.write_all(&0u16.to_be_bytes()).await?;
self.indexfile = Some(file);
}
self.tb = tb;
}
let file = self.datafile.as_mut().unwrap();
let mut buf = take(&mut self.buf1);
buf.clear();
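// The leading u32 length is unknown until the record is complete; write a
// placeholder here and patch both the leading and trailing copies below.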
buf.put_i32(0);
buf.put_u64(0);
buf.put_u64(ts);
buf.put_u64(0);
buf.put_u64(0);
// Status, Severity
buf.put_u8(0);
buf.put_u8(0);
buf.put_i32(-1);
let flags = 0;
buf.put_u8(flags);
buf.put_u8(scalar_type.index());
vals[i].put_value(&mut buf);
buf.put_i32(0);
let len1 = buf.len();
buf[0..4].as_mut().put_u32(len1 as u32);
buf[len1 - 4..len1].as_mut().put_u32(len1 as u32);
file.write_all(&buf).await?;
self.buf1 = buf;
if ks == 3 {
let file = self.indexfile.as_mut().unwrap();
let mut buf = take(&mut self.buf1);
buf.clear();
buf.put_u64(ts);
buf.put_u64(self.wpos);
file.write_all(&buf).await?;
self.buf1 = buf;
}
self.wpos += len1 as u64;
}
Ok(())
}
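
Both write_events and write_config close each record the same way: the record's total length is written as a big-endian u32 at both ends, which is what allows readers to walk records in either direction. A sketch of that symmetric framing as a hypothetical helper (not part of this commit):

    use bytes::BufMut;

    // Patch the leading length placeholder and the trailing copy once the
    // record body is complete; len includes both 4-byte length fields.
    fn finish_frame(buf: &mut Vec<u8>) {
        let len = buf.len();
        buf[0..4].as_mut().put_u32(len as u32);
        buf[len - 4..len].as_mut().put_u32(len as u32);
    }
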
async fn write_config(&mut self, config: &Config) -> Result<(), Error> {
eprintln!("Create directory for channel config");
let p1 = self.output_dir.join("config").join(self.channel.name()).join("latest");
tokio::fs::create_dir_all(&p1).await.map_err(|e| {
error!("Can not create {:?}", p1);
e
})?;
let mut file = OpenOptions::new()
.create(true)
.truncate(true)
.write(true)
.open(p1.join("00000_Config"))
.await
.map_err(|e| {
error!("can not create config file in {:?}", p1);
e
})?;
let mut buf = take(&mut self.buf1);
{
buf.clear();
buf.put_u16(0);
file.write_all(&buf).await?;
}
{
buf.clear();
let chs = self.channel.name().as_bytes();
let len1 = (chs.len() + 8) as u32;
buf.put_u32(len1);
buf.put_slice(chs);
buf.put_u32(len1);
//let len1 = buf.len();
//buf[0..4].as_mut().put_u32(len1 as u32);
//buf[len1 - 4..len1].as_mut().put_u32(len1 as u32);
file.write_all(&buf).await?;
}
{
let e = &config.entries[0];
buf.clear();
buf.put_u32(0);
buf.put_u64(0);
buf.put_u64(0);
buf.put_i32(e.ks);
buf.put_u64(e.bs.ns / MS);
buf.put_i32(e.split_count);
buf.put_i32(e.status);
buf.put_i8(e.bb);
buf.put_i32(e.modulo);
buf.put_i32(e.offset);
buf.put_i16(e.precision);
let dtlen = 0;
buf.put_i32(dtlen);
let flags = 0;
buf.put_u8(flags);
buf.put_u8(e.scalar_type.index());
if false {
// is shaped?
buf.put_u8(1);
buf.put_u32(16);
}
buf.put_i32(-1);
buf.put_i32(-1);
buf.put_i32(-1);
buf.put_i32(-1);
buf.put_i32(-1);
buf.put_u32(0);
let len1 = buf.len();
buf[0..4].as_mut().put_u32(len1 as u32);
buf[len1 - 4..len1].as_mut().put_u32(len1 as u32);
file.write_all(&buf).await?;
}
self.buf1 = buf;
Ok(())
}
}
impl Drop for DataWriter {
fn drop(&mut self) {
let indexfile = self.indexfile.take();
let datafile = self.datafile.take();
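// Drop cannot await, so flushing is delegated to a spawned task. This is
// best-effort: the task may not complete if the runtime shuts down first.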
tokio::task::spawn(async move {
match indexfile {
Some(mut file) => {
let _ = file.flush().await;
}
None => {}
}
match datafile {
Some(mut file) => {
let _ = file.flush().await;
}
None => {}
}
});
}
}
pub fn main() -> Result<(), Error> {
taskrun::run(async {
if false {
return Err(Error::with_msg_no_trace(format!("unknown command")));
}
let opts = Opts::parse();
eprintln!("Opts: {:?}", opts);
match opts.subcmd {
SubCmd::ConvertArchiverApplianceChannel(sub) => {
//
let _ = tokio::fs::create_dir(&sub.output_dir).await;
let meta = tokio::fs::metadata(&sub.output_dir).await?;
if !meta.is_dir() {
return Err(Error::from_string(format!(
"Given output path is not a directory: {:?}",
sub.output_dir
)));
}
let bs = Nanos::from_ns(DAY);
let mut channel_config: Option<Config> = None;
let channel = Channel {
backend: String::new(),
name: sub.name.into(),
name: sub.channel_name.into(),
};
let mut data_writer =
DataWriter::new(sub.output_dir, sub.keyspace_prefix.into(), channel.clone(), bs.clone()).await?;
let chandir = archapp::events::directory_for_channel_files(&channel, &sub.input_dir)?;
eprintln!("channel path: {:?}", chandir);
eprintln!("Looking for files in: {:?}", chandir);
let files = archapp::events::find_files_for_channel(&sub.input_dir, &channel).await?;
eprintln!("files: {:?}", files);
let mut evstot = 0;
for file in files {
eprintln!("try to open {:?}", file);
eprintln!("Try to open {:?}", file);
let fni = archapp::events::parse_data_filename(file.to_str().unwrap())?;
eprintln!("fni: {:?}", fni);
debug!("fni: {:?}", fni);
let ts0 = Utc.ymd(fni.year as i32, fni.month, 1).and_hms(0, 0, 0);
let ts1 = ts0.timestamp() as u64 * SEC + ts0.timestamp_subsec_nanos() as u64;
let _ = ts1;
let mut f1 = File::open(&file).await?;
let flen = f1.seek(SeekFrom::End(0)).await?;
eprintln!("flen: {}", flen);
let _flen = f1.seek(SeekFrom::End(0)).await?;
f1.seek(SeekFrom::Start(0)).await?;
let mut pbr = PbFileReader::new(f1).await;
pbr.read_header().await?;
eprintln!("✓ read header payload_type {:?}", pbr.payload_type());
let pbr = PbFileReader::new(f1).await?;
debug!("channel name in pbr file: {:?}", pbr.channel_name());
debug!("data type in file: {:?}", pbr.payload_type());
let evq = RawEventsQuery {
channel: channel.clone(),
range: NanoRange {
@@ -90,44 +378,230 @@ pub fn main() -> Result<(), Error> {
// TODO can the positioning-logic maybe re-use the pbr?
let z = archapp::events::position_file_for_evq(f1, evq.clone(), fni.year).await?;
if let PositionState::Positioned(pos) = z.state {
let mut f1 = z.file;
f1.seek(SeekFrom::Start(0)).await?;
let mut pbr = PbFileReader::new(f1).await;
// TODO incorporate the read_header into the init. must not be forgotten.
pbr.read_header().await?;
pbr.file().seek(SeekFrom::Start(pos)).await?;
pbr.reset_io(pos);
eprintln!("POSITIONED 1 at {}", pbr.abspos());
let mut pbr = z.pbr;
assert_eq!(pos, pbr.abspos());
let mut i1 = 0;
let mut repnext = u64::MAX;
loop {
match pbr.read_msg().await {
Ok(Some(ei)) => {
use items::{WithLen, WithTimestamps};
let ei = ei.item;
let tslast = if ei.len() > 0 { Some(ei.ts(ei.len() - 1)) } else { None };
i1 += 1;
if i1 < 20 {
eprintln!("read msg from file {} len {} tslast {:?}", i1, ei.len(), tslast);
if ei.is_wave() {
eprintln!("ERROR wave channels are not yet fully supported");
return Ok(());
}
if ei.len() > 0 {
let scalar_type = ei.scalar_type();
let shape = match &ei {
items::eventsitem::EventsItem::Plain(k) => match k.shape() {
Shape::Scalar => None,
Shape::Wave(n) => Some(vec![n]),
Shape::Image(..) => panic!(),
},
items::eventsitem::EventsItem::XBinnedEvents(_) => panic!(),
};
if let Some(conf) = &channel_config {
if scalar_type != conf.entries[0].scalar_type {
let msg = format!(
"unexpected type: {:?} vs {:?}",
scalar_type, conf.entries[0].scalar_type
);
return Err(Error::with_msg_no_trace(msg));
}
if shape != conf.entries[0].shape {
let msg = format!(
"unexpected shape: {:?} vs {:?}",
shape, conf.entries[0].shape
);
return Err(Error::with_msg_no_trace(msg));
}
}
if channel_config.is_none() {
let ks = if ei.is_wave() { 3 } else { 2 };
let scalar_type_2 = match &ei {
items::eventsitem::EventsItem::Plain(k) => match k {
PlainEvents::Scalar(k) => match k {
ScalarPlainEvents::Byte(_) => ScalarType::I8,
ScalarPlainEvents::Short(_) => ScalarType::I16,
ScalarPlainEvents::Int(_) => ScalarType::I32,
ScalarPlainEvents::Float(_) => ScalarType::F32,
ScalarPlainEvents::Double(_) => ScalarType::F64,
},
PlainEvents::Wave(k) => match k {
WavePlainEvents::Byte(_) => ScalarType::I8,
WavePlainEvents::Short(_) => ScalarType::I16,
WavePlainEvents::Int(_) => ScalarType::I32,
WavePlainEvents::Float(_) => ScalarType::F32,
WavePlainEvents::Double(_) => ScalarType::F64,
},
},
items::eventsitem::EventsItem::XBinnedEvents(_) => panic!(),
};
if scalar_type_2 != scalar_type {
let msg = format!(
"unexpected type: {:?} vs {:?}",
scalar_type_2, scalar_type
);
return Err(Error::with_msg_no_trace(msg));
}
let e = parse::channelconfig::ConfigEntry {
ts: 0,
pulse: 0,
ks,
bs: bs.clone(),
split_count: 1,
status: 0,
bb: 0,
modulo: 0,
offset: 0,
precision: 0,
scalar_type: scalar_type,
is_compressed: false,
is_shaped: false,
is_array: false,
byte_order: netpod::ByteOrder::LE,
compression_method: None,
shape,
source_name: None,
unit: None,
description: None,
optional_fields: None,
value_converter: None,
};
let k = parse::channelconfig::Config {
format_version: 0,
channel_name: channel.name().into(),
entries: vec![e],
};
channel_config = Some(k);
}
match &ei {
items::eventsitem::EventsItem::Plain(item) => {
data_writer.write_item(item).await?;
}
items::eventsitem::EventsItem::XBinnedEvents(_) => {
panic!()
}
}
}
let tslast = if ei.len() > 0 { Some(ei.ts(ei.len() - 1)) } else { None };
if i1 == repnext {
debug!("read msg from file {} len {} tslast {:?}", i1, ei.len(), tslast);
repnext = 1 + 4 * repnext / 3;
}
i1 += 1;
if false {
ei.x_aggregate(&evq.agg_kind);
}
//let ei2 = ei.x_aggregate(&evq.agg_kind);
}
Ok(None) => {
eprintln!("reached end of file");
debug!("reached end of file");
break;
}
Err(e) => {
eprintln!("error while reading msg {:?}", e);
error!("error while reading msg {:?}", e);
break;
}
}
}
eprintln!("read total {} events from file", i1);
debug!("read total {} events from the last file", i1);
evstot += i1;
} else {
eprintln!("Position fail.");
error!("Position fail.");
}
}
eprintln!("Total number of events converted: {}", evstot);
data_writer.write_config(channel_config.as_ref().unwrap()).await?;
Ok(())
}
SubCmd::ReadDatabufferConfigfile(sub) => {
let mut file = File::open(&sub.configfile).await?;
let meta = file.metadata().await?;
let mut buf = vec![0; meta.len() as usize];
file.read_exact(&mut buf).await?;
drop(file);
let config = match parse::channelconfig::parse_config(&buf) {
Ok(k) => k.1,
Err(e) => return Err(Error::with_msg_no_trace(format!("can not parse: {:?}", e))),
};
eprintln!("Read config: {:?}", config);
eprintln!("Config bs: {}", config.entries[0].bs.ns / MS);
Ok(())
}
SubCmd::ReadDatabufferDatafile(sub) => {
let mut file = File::open(&sub.configfile).await?;
let meta = file.metadata().await?;
let mut buf = vec![0; meta.len() as usize];
file.read_exact(&mut buf).await?;
drop(file);
let config = match parse::channelconfig::parse_config(&buf) {
Ok(k) => k.1,
Err(e) => return Err(Error::with_msg_no_trace(format!("can not parse: {:?}", e))),
};
let file = File::open(&sub.datafile).await?;
let inp = Box::pin(disk::file_content_stream(
file,
netpod::FileIoBufferSize::new(1024 * 16),
));
let ce = &config.entries[0];
let channel_config = ChannelConfig {
channel: Channel {
backend: String::new(),
name: config.channel_name.clone(),
},
keyspace: ce.ks as u8,
time_bin_size: ce.bs,
scalar_type: ce.scalar_type.clone(),
compression: false,
shape: Shape::Scalar,
array: false,
byte_order: ByteOrder::LE,
};
let range = NanoRange {
beg: u64::MIN,
end: u64::MAX,
};
let stats_conf = EventChunkerConf {
disk_stats_every: ByteSize::mb(2),
};
let max_ts = Arc::new(AtomicU64::new(0));
let mut chunks = disk::eventchunker::EventChunker::from_start(
inp,
channel_config.clone(),
range,
stats_conf,
sub.datafile.clone(),
max_ts.clone(),
false,
true,
);
use futures_util::stream::StreamExt;
use items::WithLen;
while let Some(item) = chunks.next().await {
let item = item?;
match item {
items::StreamItem::DataItem(item) => {
match item {
items::RangeCompletableItem::RangeComplete => {
warn!("RangeComplete");
}
items::RangeCompletableItem::Data(item) => {
info!("Data len {}", item.len());
info!("{:?}", item);
}
};
}
items::StreamItem::Log(k) => {
eprintln!("Log item {:?}", k);
}
items::StreamItem::Stats(k) => {
eprintln!("Stats item {:?}", k);
}
}
}
eprintln!("evstot {}", evstot);
Ok(())
}
}
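
Taken together, the new subcommand converts a single Archiver Appliance channel into the Databuffer on-disk layout. Assuming the binary is invoked as dq and clap's default kebab-case naming for the derived subcommand and flags, an invocation would look like:

    dq convert-archiver-appliance-channel \
        --keyspace-prefix daq \
        --channel-name SOME-CHANNEL \
        --input-dir /data/archappdata \
        --output-dir /data/databuffer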

View File

@@ -131,9 +131,13 @@ impl fmt::Debug for Error {
} else if let Some(s) = &self.trace_str {
s.into()
} else {
"NOTRACE".into()
String::new()
};
write!(fmt, "Error {}\nTrace:\n{}", self.msg, trace_str)
write!(fmt, "{}", self.msg)?;
if !trace_str.is_empty() {
write!(fmt, "\nTrace:\n{}", trace_str)?;
}
Ok(())
}
}
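
With this change the Debug output no longer prints a literal "NOTRACE" marker; an error without a captured trace now formats as just its message. Illustrative output:

    // without trace: "can not parse"
    // with trace:    "can not parse\nTrace:\n<trace text>"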

View File

@@ -41,7 +41,7 @@ pub struct BodyStream {
pub inner: Box<dyn futures_core::Stream<Item = Result<bytes::Bytes, Error>> + Send + Unpin>,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
#[derive(Clone, PartialEq, Debug, Serialize, Deserialize)]
pub enum ScalarType {
U8,
U16,
@@ -461,7 +461,7 @@ pub struct ChannelConfig {
pub byte_order: ByteOrder,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
#[derive(Clone, PartialEq, Debug, Serialize, Deserialize)]
pub enum Shape {
Scalar,
Wave(u32),
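
Deriving PartialEq on ScalarType and Shape is what enables the converter above to compare a data file's type and shape against the recorded channel config, e.g.:

    if shape != conf.entries[0].shape {
        let msg = format!("unexpected shape: {:?} vs {:?}", shape, conf.entries[0].shape);
        return Err(Error::with_msg_no_trace(msg));
    }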

View File

@@ -6,7 +6,7 @@ edition = "2018"
[dependencies]
serde = { version = "1.0", features = ["derive"] }
tokio = { version = "1.7.1", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] }
tokio = { version = "1.14.77", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] }
chrono = { version = "0.4.19", features = ["serde"] }
bytes = "1.0.1"
byteorder = "1.4.3"

View File

@@ -95,10 +95,10 @@ pub struct ConfigEntry {
pub compression_method: Option<CompressionMethod>,
pub shape: Option<Vec<u32>>,
pub source_name: Option<String>,
unit: Option<String>,
description: Option<String>,
optional_fields: Option<String>,
value_converter: Option<String>,
pub unit: Option<String>,
pub description: Option<String>,
pub optional_fields: Option<String>,
pub value_converter: Option<String>,
}
impl ConfigEntry {
@@ -255,8 +255,8 @@ pub fn parse_entry(inp: &[u8]) -> NRes<Option<ConfigEntry>> {
))
}
/*
Parse the full configuration file.
/**
Parse a complete configuration file from given in-memory input buffer.
*/
pub fn parse_config(inp: &[u8]) -> NRes<Config> {
let (inp, ver) = be_i16(inp)?;

View File

@@ -68,7 +68,7 @@ pub fn run<T, F: std::future::Future<Output = Result<T, Error>>>(f: F) -> Result
match res {
Ok(k) => Ok(k),
Err(e) => {
error!("{:?}", e);
error!("Catched: {:?}", e);
Err(e)
}
}
@@ -107,6 +107,7 @@ pub fn tracing_init() {
"disk::binned=info",
"nodenet::conn=info",
"daqbuffer::test=info",
"dq=info",
]
.join(","),
))