From 4ed787d3a7a09ef73fd7090d8c0e8611fb2e5b04 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Mon, 28 Jun 2021 21:45:19 +0200 Subject: [PATCH] Read epics archiver appliance protobuf file --- Cargo.toml | 2 +- archapp/Cargo.toml | 20 +++++++++++++ archapp/src/generated.rs | 2 ++ archapp/src/lib.rs | 4 +++ archapp/src/test.rs | 63 ++++++++++++++++++++++++++++++++++++++++ 5 files changed, 90 insertions(+), 1 deletion(-) create mode 100644 archapp/Cargo.toml create mode 100644 archapp/src/generated.rs create mode 100644 archapp/src/lib.rs create mode 100644 archapp/src/test.rs diff --git a/Cargo.toml b/Cargo.toml index 1747421..7e53bf0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,5 @@ [workspace] -members = ["daqbuffer", "h5out"] +members = ["daqbuffer", "h5out", "archapp"] [profile.release] debug = 1 diff --git a/archapp/Cargo.toml b/archapp/Cargo.toml new file mode 100644 index 0000000..3a6ee0a --- /dev/null +++ b/archapp/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "archapp" +version = "0.0.1-a.dev.4" +authors = ["Dominik Werder "] +edition = "2018" + +[dependencies] +tokio = { version = "1.4.0", features = ["io-util", "net", "time", "sync", "fs"] } +tracing = "0.1.25" +futures-core = "0.3.14" +futures-util = "0.3.14" +bytes = "1.0.1" +serde = "1.0" +serde_derive = "1.0" +serde_json = "1.0" +chrono = "0.4" +protobuf = "2.24.1" +err = { path = "../err" } +taskrun = { path = "../taskrun" } +netpod = { path = "../netpod" } diff --git a/archapp/src/generated.rs b/archapp/src/generated.rs new file mode 100644 index 0000000..926d9e2 --- /dev/null +++ b/archapp/src/generated.rs @@ -0,0 +1,2 @@ +#[allow(non_snake_case)] +pub mod EPICSEvent; diff --git a/archapp/src/lib.rs b/archapp/src/lib.rs new file mode 100644 index 0000000..4974496 --- /dev/null +++ b/archapp/src/lib.rs @@ -0,0 +1,4 @@ +pub mod generated; + +#[cfg(test)] +mod test; diff --git a/archapp/src/test.rs b/archapp/src/test.rs new file mode 100644 index 0000000..4869dbe --- /dev/null +++ b/archapp/src/test.rs @@ -0,0 +1,63 @@ +use err::Error; +use netpod::log::*; +use protobuf::Message; + +fn unescape_archapp_msg(inp: &[u8]) -> Result, Error> { + let mut ret = Vec::with_capacity(inp.len() * 5 / 4); + let mut esc = false; + for &k in inp.iter() { + if k == 0x1b { + esc = true; + } else if esc { + if k == 0x1 { + ret.push(0x1b); + } else if k == 0x2 { + ret.push(0xa); + } else if k == 0x3 { + ret.push(0xd); + } else { + return Err(Error::with_msg("malformed escaped archapp message")); + } + esc = false; + } else { + ret.push(k); + } + } + Ok(ret) +} + +#[test] +fn read_pb_00() -> Result<(), Error> { + let block1 = async move { + let path = "../../../../archappdata/tmp/lts/ArchiverStore/SARUN16/MQUA080/X:2021_01.pb"; + let f1 = tokio::fs::read(path).await?; + let mut j1 = 0; + loop { + let mut i2 = usize::MAX; + for (i1, &k) in f1[j1..].iter().enumerate() { + if k == 0xa { + i2 = j1 + i1; + break; + } + } + if i2 != usize::MAX { + info!("got NL {} .. {}", j1, i2); + let m = unescape_archapp_msg(&f1[j1..i2])?; + if j1 == 0 { + let payload_info = crate::generated::EPICSEvent::PayloadInfo::parse_from_bytes(&m).unwrap(); + info!("got payload_info: {:?}", payload_info); + } else { + let scalar_double = crate::generated::EPICSEvent::ScalarDouble::parse_from_bytes(&m).unwrap(); + info!("got scalar_double: {:?}", scalar_double); + } + } else { + info!("no more packets"); + break; + } + j1 = i2 + 1; + } + Ok::<_, Error>(()) + }; + taskrun::run(block1)?; + Ok(()) +}