From 0326aa795a8ddd64d2240b5db03b3d065cfae235 Mon Sep 17 00:00:00 2001
From: Dominik Werder
Date: Tue, 29 Jun 2021 17:19:31 +0200
Subject: [PATCH] Iterate through pb files and parse the header

---
 archapp/Cargo.toml                         |   1 +
 archapp/src/lib.rs                         | 192 +++++++++++++++++-
 archapp/src/test.rs                        |  25 +--
 daqbuffer/Cargo.toml                       |   7 +-
 daqbuffer/src/bin/daqbuffer.rs             |  10 +-
 daqbuffer/src/lib.rs                       |  35 ----
 daqbufp2/Cargo.toml                        |  29 +++
 {daqbuffer => daqbufp2}/src/client.rs      |   0
 daqbufp2/src/lib.rs                        |  34 ++++
 {daqbuffer => daqbufp2}/src/nodes.rs       |   1 +
 {daqbuffer => daqbufp2}/src/test.rs        |   0
 .../src/test/binnedbinary.rs               |   0
 .../src/test/binnedjson.rs                 |   0
 {daqbuffer => daqbufp2}/src/test/events.rs |   0
 disk/src/aggtest.rs                        |   1 +
 disk/src/gen.rs                            |   1 +
 httpret/Cargo.toml                         |   1 +
 httpret/src/lib.rs                         |  31 +++
 netfetch/src/test.rs                       |   1 +
 netpod/src/lib.rs                          |   6 +
 20 files changed, 307 insertions(+), 68 deletions(-)
 create mode 100644 daqbufp2/Cargo.toml
 rename {daqbuffer => daqbufp2}/src/client.rs (100%)
 create mode 100644 daqbufp2/src/lib.rs
 rename {daqbuffer => daqbufp2}/src/nodes.rs (97%)
 rename {daqbuffer => daqbufp2}/src/test.rs (100%)
 rename {daqbuffer => daqbufp2}/src/test/binnedbinary.rs (100%)
 rename {daqbuffer => daqbufp2}/src/test/binnedjson.rs (100%)
 rename {daqbuffer => daqbufp2}/src/test/events.rs (100%)

diff --git a/archapp/Cargo.toml b/archapp/Cargo.toml
index 3a6ee0a..9cf248c 100644
--- a/archapp/Cargo.toml
+++ b/archapp/Cargo.toml
@@ -15,6 +15,7 @@ serde_derive = "1.0"
 serde_json = "1.0"
 chrono = "0.4"
 protobuf = "2.24.1"
+async-channel = "1.6"
 err = { path = "../err" }
 taskrun = { path = "../taskrun" }
 netpod = { path = "../netpod" }
diff --git a/archapp/src/lib.rs b/archapp/src/lib.rs
index 4974496..1c9b7fe 100644
--- a/archapp/src/lib.rs
+++ b/archapp/src/lib.rs
@@ -1,4 +1,194 @@
-pub mod generated;
+use async_channel::{bounded, Receiver};
+use err::Error;
+use netpod::log::*;
+use netpod::NodeConfigCached;
+use protobuf::Message;
+use serde::Serialize;
+use serde_json::Value as JsonValue;
+use std::collections::{BTreeMap, VecDeque};
+use std::path::PathBuf;
+use std::sync::Arc;
+use tokio::io::AsyncReadExt;
 
+pub mod generated;
 #[cfg(test)]
 mod test;
+
+pub trait ItemSer {
+    fn serialize(&self) -> Result<Vec<u8>, Error>;
+}
+
+impl<T> ItemSer for T
+where
+    T: Serialize,
+{
+    fn serialize(&self) -> Result<Vec<u8>, Error> {
+        let u = serde_json::to_vec(self)?;
+        Ok(u)
+    }
+}
+
+fn unescape_archapp_msg(inp: &[u8]) -> Result<Vec<u8>, Error> {
+    let mut ret = Vec::with_capacity(inp.len() * 5 / 4);
+    let mut esc = false;
+    for &k in inp.iter() {
+        if k == 0x1b {
+            esc = true;
+        } else if esc {
+            if k == 0x1 {
+                ret.push(0x1b);
+            } else if k == 0x2 {
+                ret.push(0xa);
+            } else if k == 0x3 {
+                ret.push(0xd);
+            } else {
+                return Err(Error::with_msg("malformed escaped archapp message"));
+            }
+            esc = false;
+        } else {
+            ret.push(k);
+        }
+    }
+    Ok(ret)
+}
+
+#[derive(Serialize)]
+pub struct EpicsEventPayloadInfo {
+    headers: Vec<(String, String)>,
+    year: i32,
+    pvname: String,
+}
+
+async fn read_pb_file(path: PathBuf) -> Result<EpicsEventPayloadInfo, Error> {
+    let mut f1 = tokio::fs::File::open(path).await?;
+    let mut buf = vec![0; 1024 * 4];
+    {
+        let mut i1 = 0;
+        loop {
+            let n = f1.read(&mut buf[i1..]).await?;
+            if n == 0 {
+                break;
+            }
+            i1 += n;
+            if i1 >= buf.len() {
+                break;
+            }
+        }
+    }
+    let mut j1 = 0;
+    loop {
+        let mut i2 = usize::MAX;
+        for (i1, &k) in buf[j1..].iter().enumerate() {
+            if k == 0xa {
+                i2 = j1 + i1;
+                break;
+            }
+        }
+        if i2 != usize::MAX {
+            //info!("got NL {} .. {}", j1, i2);
+            let m = unescape_archapp_msg(&buf[j1..i2])?;
+            if j1 == 0 {
+                let payload_info = crate::generated::EPICSEvent::PayloadInfo::parse_from_bytes(&m).unwrap();
+                //info!("got payload_info: {:?}", payload_info);
+                let z = EpicsEventPayloadInfo {
+                    headers: payload_info
+                        .get_headers()
+                        .iter()
+                        .map(|j| (j.get_name().to_string(), j.get_val().to_string()))
+                        .collect(),
+                    year: payload_info.get_year(),
+                    pvname: payload_info.get_pvname().into(),
+                };
+                return Ok(z);
+            } else {
+                let _scalar_double = crate::generated::EPICSEvent::ScalarDouble::parse_from_bytes(&m).unwrap();
+                //info!("got scalar_double: {:?}", scalar_double);
+            }
+        } else {
+            //info!("no more packets");
+            break;
+        }
+        j1 = i2 + 1;
+    }
+    Err(Error::with_msg("no header entry found in file"))
+}
+
+type RT1 = Box<dyn ItemSer + Send>;
+
+pub async fn scan_files(
+    _pairs: BTreeMap<String, String>,
+    node_config: &NodeConfigCached,
+) -> Result<Receiver<Result<RT1, Error>>, Error> {
+    let aa = if let Some(aa) = &node_config.node.archiver_appliance {
+        aa.clone()
+    } else {
+        return Err(Error::with_msg("no archiver appliance config"));
+    };
+    let (tx, rx) = bounded(16);
+    let tx = Arc::new(tx);
+    let tx2 = tx.clone();
+    let block1 = async move {
+        let mut paths = VecDeque::new();
+        paths.push_back(aa.data_base_path);
+        loop {
+            if let Some(path) = paths.pop_back() {
+                let meta = tokio::fs::metadata(&path).await?;
+                if meta.is_dir() {
+                    let mut rd = tokio::fs::read_dir(&path).await?;
+                    loop {
+                        match rd.next_entry().await {
+                            Ok(item) => match item {
+                                Some(item) => {
+                                    paths.push_back(item.path());
+                                }
+                                None => {
+                                    break;
+                                }
+                            },
+                            Err(e) => {
+                                tx.send(Err(e.into())).await.unwrap();
+                            }
+                        }
+                    }
+                } else if meta.is_file() {
+                    //tx.send(Ok(Box::new(path.clone()) as RT1)).await?;
+                    if path.to_str().unwrap().ends_with(".pb") {
+                        let packet = read_pb_file(path.clone()).await?;
+                        let pvn = packet.pvname.replace("-", "/");
+                        let pvn = pvn.replace(":", "/");
+                        let pre = "/arch/lts/ArchiverStore/";
+                        let p3 = &path.to_str().unwrap()[pre.len()..];
+                        let p3 = &p3[..p3.len() - 11];
+                        if p3 != pvn {
+                            tx.send(Ok(Box::new(serde_json::to_value(&packet)?) as RT1)).await?;
+                            {
+                                let s = format!("{} - {}", p3, packet.pvname);
+                                tx.send(Ok(Box::new(serde_json::to_value(&s)?) as RT1)).await?;
+                            }
+                            tx.send(Ok(
+                                Box::new(JsonValue::String(format!("MISMATCH --------------------"))) as RT1,
+                            ))
+                            .await?;
+                        }
+                    }
+                }
+            } else {
+                break;
+            }
+        }
+        Ok::<_, Error>(())
+    };
+    let block2 = async move {
+        match block1.await {
+            Ok(_) => {}
+            Err(e) => match tx2.send(Err(e.into())).await {
+                Ok(_) => {}
+                Err(e) => {
+                    error!("can not deliver error through channel: {:?}", e);
+                }
+            },
+        }
+    };
+    tokio::spawn(block2);
+    Ok(rx)
+}
diff --git a/archapp/src/test.rs b/archapp/src/test.rs
index 4869dbe..97e1dbf 100644
--- a/archapp/src/test.rs
+++ b/archapp/src/test.rs
@@ -1,31 +1,8 @@
+use crate::unescape_archapp_msg;
 use err::Error;
 use netpod::log::*;
 use protobuf::Message;
 
-fn unescape_archapp_msg(inp: &[u8]) -> Result<Vec<u8>, Error> {
-    let mut ret = Vec::with_capacity(inp.len() * 5 / 4);
-    let mut esc = false;
-    for &k in inp.iter() {
-        if k == 0x1b {
-            esc = true;
-        } else if esc {
-            if k == 0x1 {
-                ret.push(0x1b);
-            } else if k == 0x2 {
-                ret.push(0xa);
-            } else if k == 0x3 {
-                ret.push(0xd);
-            } else {
-                return Err(Error::with_msg("malformed escaped archapp message"));
-            }
-            esc = false;
-        } else {
-            ret.push(k);
-        }
-    }
-    Ok(ret)
-}
-
 #[test]
 fn read_pb_00() -> Result<(), Error> {
     let block1 = async move {
diff --git a/daqbuffer/Cargo.toml b/daqbuffer/Cargo.toml
index 296fa2d..6877efb 100644
--- a/daqbuffer/Cargo.toml
+++ b/daqbuffer/Cargo.toml
@@ -13,10 +13,10 @@ tracing-subscriber = "0.2.17"
 futures-core = "0.3.14"
 futures-util = "0.3.14"
 bytes = "1.0.1"
-bincode = "1.3.3"
+#bincode = "1.3.3"
 #async-channel = "1"
 #dashmap = "3"
-tokio-postgres = "0.7"
+#tokio-postgres = "0.7"
 serde = "1.0"
 serde_derive = "1.0"
 serde_json = "1.0"
@@ -27,5 +27,6 @@ lazy_static = "1.4.0"
 err = { path = "../err" }
 taskrun = { path = "../taskrun" }
 netpod = { path = "../netpod" }
-httpret = { path = "../httpret" }
+#httpret = { path = "../httpret" }
 disk = { path = "../disk" }
+daqbufp2 = { path = "../daqbufp2" }
diff --git a/daqbuffer/src/bin/daqbuffer.rs b/daqbuffer/src/bin/daqbuffer.rs
index a5d6159..f4b345b 100644
--- a/daqbuffer/src/bin/daqbuffer.rs
+++ b/daqbuffer/src/bin/daqbuffer.rs
@@ -59,7 +59,7 @@ async fn go() -> Result<(), Error> {
             let node_config: NodeConfig = serde_json::from_slice(&buf)?;
             let node_config: Result<NodeConfigCached, Error> = node_config.into();
             let node_config = node_config?;
-            daqbuffer::run_node(node_config.clone()).await?;
+            daqbufp2::run_node(node_config.clone()).await?;
         }
         SubCmd::Proxy(subcmd) => {
             info!("daqbuffer proxy {}", clap::crate_version!());
@@ -67,17 +67,17 @@ async fn go() -> Result<(), Error> {
             let mut buf = vec![];
             config_file.read_to_end(&mut buf).await?;
             let proxy_config: ProxyConfig = serde_json::from_slice(&buf)?;
-            daqbuffer::run_proxy(proxy_config.clone()).await?;
+            daqbufp2::run_proxy(proxy_config.clone()).await?;
         }
         SubCmd::Client(client) => match client.client_type {
             ClientType::Status(opts) => {
-                daqbuffer::client::status(opts.host, opts.port).await?;
+                daqbufp2::client::status(opts.host, opts.port).await?;
             }
             ClientType::Binned(opts) => {
                 let beg = parse_ts(&opts.beg)?;
                 let end = parse_ts(&opts.end)?;
                 let cache_usage = CacheUsage::from_string(&opts.cache)?;
-                daqbuffer::client::get_binned(
+                daqbufp2::client::get_binned(
                     opts.host,
                     opts.port,
                     opts.backend,
@@ -103,7 +103,7 @@ fn simple_fetch() {
     use netpod::Nanos;
     use netpod::{timeunits::*, ByteOrder, Channel, ChannelConfig, ScalarType, Shape};
     taskrun::run(async {
-        let _rh = daqbuffer::nodes::require_test_hosts_running()?;
+        let _rh = daqbufp2::nodes::require_test_hosts_running()?;
         let t1 = chrono::Utc::now();
         let query = netpod::AggQuerySingleChannel {
             channel_config: ChannelConfig {
diff --git a/daqbuffer/src/lib.rs b/daqbuffer/src/lib.rs
index fc03ce7..4f77372 100644
--- a/daqbuffer/src/lib.rs
+++ b/daqbuffer/src/lib.rs
@@ -1,36 +1 @@
-use err::Error;
-use netpod::{Cluster, NodeConfig, NodeConfigCached, ProxyConfig};
-use tokio::task::JoinHandle;
-#[allow(unused_imports)]
-use tracing::{debug, error, info, trace, warn};
-
 pub mod cli;
-pub mod client;
-pub mod nodes;
-#[cfg(test)]
-pub mod test;
-
-pub fn spawn_test_hosts(cluster: Cluster) -> Vec<JoinHandle<Result<(), Error>>> {
-    let mut ret = vec![];
-    for node in &cluster.nodes {
-        let node_config = NodeConfig {
-            cluster: cluster.clone(),
-            name: format!("{}:{}", node.host, node.port),
-        };
-        let node_config: Result<NodeConfigCached, Error> = node_config.into();
-        let node_config = node_config.unwrap();
-        let h = tokio::spawn(httpret::host(node_config));
-        ret.push(h);
-    }
-    ret
-}
-
-pub async fn run_node(node_config: NodeConfigCached) -> Result<(), Error> {
-    httpret::host(node_config).await?;
-    Ok(())
-}
-
-pub async fn run_proxy(proxy_config: ProxyConfig) -> Result<(), Error> {
-    httpret::proxy::proxy(proxy_config).await?;
-    Ok(())
-}
diff --git a/daqbufp2/Cargo.toml b/daqbufp2/Cargo.toml
new file mode 100644
index 0000000..a287c54
--- /dev/null
+++ b/daqbufp2/Cargo.toml
@@ -0,0 +1,29 @@
+[package]
+name = "daqbufp2"
+version = "0.0.1-a.dev.12"
+authors = ["Dominik Werder"]
+edition = "2018"
+
+[dependencies]
+tokio = { version = "1.4.0", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] }
+hyper = "0.14"
+http = "0.2"
+tracing = "0.1.25"
+tracing-subscriber = "0.2.17"
+futures-core = "0.3.14"
+futures-util = "0.3.14"
+bytes = "1.0.1"
+bincode = "1.3.3"
+#async-channel = "1"
+#dashmap = "3"
+serde = "1.0"
+serde_derive = "1.0"
+serde_json = "1.0"
+chrono = "0.4"
+url = "2.2.2"
+lazy_static = "1.4.0"
+err = { path = "../err" }
+taskrun = { path = "../taskrun" }
+netpod = { path = "../netpod" }
+httpret = { path = "../httpret" }
+disk = { path = "../disk" }
diff --git a/daqbuffer/src/client.rs b/daqbufp2/src/client.rs
similarity index 100%
rename from daqbuffer/src/client.rs
rename to daqbufp2/src/client.rs
diff --git a/daqbufp2/src/lib.rs b/daqbufp2/src/lib.rs
new file mode 100644
index 0000000..5b08722
--- /dev/null
+++ b/daqbufp2/src/lib.rs
@@ -0,0 +1,34 @@
+use tokio::task::JoinHandle;
+
+use err::Error;
+use netpod::{Cluster, NodeConfig, NodeConfigCached, ProxyConfig};
+
+pub mod client;
+pub mod nodes;
+#[cfg(test)]
+pub mod test;
+
+pub fn spawn_test_hosts(cluster: Cluster) -> Vec<JoinHandle<Result<(), Error>>> {
+    let mut ret = vec![];
+    for node in &cluster.nodes {
+        let node_config = NodeConfig {
+            cluster: cluster.clone(),
+            name: format!("{}:{}", node.host, node.port),
+        };
+        let node_config: Result<NodeConfigCached, Error> = node_config.into();
+        let node_config = node_config.unwrap();
+        let h = tokio::spawn(httpret::host(node_config));
+        ret.push(h);
+    }
+    ret
+}
+
+pub async fn run_node(node_config: NodeConfigCached) -> Result<(), Error> {
+    httpret::host(node_config).await?;
+    Ok(())
+}
+
+pub async fn run_proxy(proxy_config: ProxyConfig) -> Result<(), Error> {
+    httpret::proxy::proxy(proxy_config).await?;
+    Ok(())
+}
diff --git a/daqbuffer/src/nodes.rs b/daqbufp2/src/nodes.rs
similarity index 97%
rename from daqbuffer/src/nodes.rs
rename to daqbufp2/src/nodes.rs
index 7afb5bd..4c15fa0 100644
--- a/daqbuffer/src/nodes.rs
+++ b/daqbufp2/src/nodes.rs
@@ -44,6 +44,7 @@ fn test_cluster() -> Cluster {
             split: id,
             backend: "testbackend".into(),
             bin_grain_kind: 0,
+            archiver_appliance: None,
         })
         .collect();
     Cluster {
diff --git a/daqbuffer/src/test.rs b/daqbufp2/src/test.rs
similarity index 100%
rename from daqbuffer/src/test.rs
rename to daqbufp2/src/test.rs
diff --git a/daqbuffer/src/test/binnedbinary.rs b/daqbufp2/src/test/binnedbinary.rs
similarity index 100%
rename from daqbuffer/src/test/binnedbinary.rs
rename to daqbufp2/src/test/binnedbinary.rs
diff --git a/daqbuffer/src/test/binnedjson.rs b/daqbufp2/src/test/binnedjson.rs
similarity index 100%
rename from daqbuffer/src/test/binnedjson.rs
rename to daqbufp2/src/test/binnedjson.rs
diff --git a/daqbuffer/src/test/events.rs b/daqbufp2/src/test/events.rs
similarity index 100%
rename from daqbuffer/src/test/events.rs
rename to daqbufp2/src/test/events.rs
diff --git a/disk/src/aggtest.rs b/disk/src/aggtest.rs
index e720281..6857d38 100644
--- a/disk/src/aggtest.rs
+++ b/disk/src/aggtest.rs
@@ -16,6 +16,7 @@ pub fn make_test_node(id: u32) -> Node {
         ksprefix: "ks".into(),
         backend: "testbackend".into(),
         bin_grain_kind: 0,
+        archiver_appliance: None,
     }
 }
diff --git a/disk/src/gen.rs b/disk/src/gen.rs
index 71f6ab8..8fccfa3 100644
--- a/disk/src/gen.rs
+++ b/disk/src/gen.rs
@@ -81,6 +81,7 @@ pub async fn gen_test_data() -> Result<(), Error> {
             ksprefix: ksprefix.clone(),
             backend: "testbackend".into(),
             bin_grain_kind: 0,
+            archiver_appliance: None,
         };
         ensemble.nodes.push(node);
     }
diff --git a/httpret/Cargo.toml b/httpret/Cargo.toml
index 8aa5ffb..33b5ccf 100644
--- a/httpret/Cargo.toml
+++ b/httpret/Cargo.toml
@@ -24,4 +24,5 @@ dbconn = { path = "../dbconn" }
 disk = { path = "../disk" }
 parse = { path = "../parse" }
 netfetch = { path = "../netfetch" }
+archapp = { path = "../archapp" }
 taskrun = { path = "../taskrun" }
diff --git a/httpret/src/lib.rs b/httpret/src/lib.rs
index ba9c748..4e5f8ef 100644
--- a/httpret/src/lib.rs
+++ b/httpret/src/lib.rs
@@ -217,6 +217,12 @@ async fn http_service_try(req: Request<Body>, node_config: &NodeConfigCached) -
         } else {
             Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
         }
+    } else if path == "/api/4/archapp/files" {
+        if req.method() == Method::GET {
+            Ok(archapp_scan_files(req, &node_config).await?)
+        } else {
+            Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
+        }
     } else if path == "/api/4/channel/config" {
         if req.method() == Method::GET {
             Ok(channel_config(req, &node_config).await?)
@@ -624,3 +630,28 @@ pub async fn ca_connect_1(req: Request<Body>, node_config: &NodeConfigCached) -
     })))?;
     Ok(ret)
 }
+
+pub async fn archapp_scan_files(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
+    let url = Url::parse(&format!("dummy:{}", req.uri()))?;
+    let pairs = get_url_query_pairs(&url);
+    let res = archapp::scan_files(pairs, node_config).await?;
+    let ret = response(StatusCode::OK)
+        .header(http::header::CONTENT_TYPE, APP_JSON_LINES)
+        .body(Body::wrap_stream(res.map(|k| match k {
+            Ok(k) => match k.serialize() {
+                Ok(mut item) => {
+                    item.push(0xa);
+                    Ok(item)
+                }
+                Err(e) => Err(e),
+            },
+            Err(e) => match serde_json::to_vec(&e) {
+                Ok(mut item) => {
+                    item.push(0xa);
+                    Ok(item)
+                }
+                Err(e) => Err(e.into()),
+            },
+        })))?;
+    Ok(ret)
+}
diff --git a/netfetch/src/test.rs b/netfetch/src/test.rs
index 7a8198c..b682e85 100644
--- a/netfetch/src/test.rs
+++ b/netfetch/src/test.rs
@@ -20,6 +20,7 @@ fn ca_connect_1() {
             data_base_path: "".into(),
             listen: "".into(),
             ksprefix: "".into(),
+            archiver_appliance: None,
         },
         node_config: NodeConfig {
             name: "".into(),
diff --git a/netpod/src/lib.rs b/netpod/src/lib.rs
index f42ff5a..42b738c 100644
--- a/netpod/src/lib.rs
+++ b/netpod/src/lib.rs
@@ -165,6 +165,11 @@ impl ScalarType {
     }
 }
 
+#[derive(Clone, Debug, Serialize, Deserialize)]
+pub struct ArchiverAppliance {
+    pub data_base_path: PathBuf,
+}
+
 #[derive(Clone, Debug, Serialize, Deserialize)]
 pub struct Node {
     pub host: String,
@@ -177,6 +182,7 @@ pub struct Node {
     pub backend: String,
     #[serde(default)]
     pub bin_grain_kind: u32,
+    pub archiver_appliance: Option<ArchiverAppliance>,
 }
 
 #[derive(Clone, Debug, Serialize, Deserialize)]
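
Note on the pb file layout (reviewer addition, not part of the diff): each
Archiver Appliance .pb file holds newline-delimited protobuf messages, with a
PayloadInfo header as the first line. So that payload bytes never collide with
the line delimiter, the bytes 0x1b, 0x0a and 0x0d are written as 0x1b 0x01,
0x1b 0x02 and 0x1b 0x03 respectively; unescape_archapp_msg in archapp/src/lib.rs
reverses that. A minimal sketch of the inverse direction, useful for building
test fixtures (escape_archapp_msg is hypothetical and not part of this patch):

    fn escape_archapp_msg(inp: &[u8]) -> Vec<u8> {
        let mut ret = Vec::with_capacity(inp.len() * 5 / 4);
        for &k in inp {
            match k {
                // Escape byte itself, newline, carriage return.
                0x1b => ret.extend_from_slice(&[0x1b, 0x01]),
                0x0a => ret.extend_from_slice(&[0x1b, 0x02]),
                0x0d => ret.extend_from_slice(&[0x1b, 0x03]),
                _ => ret.push(k),
            }
        }
        ret
    }

    #[test]
    fn escape_roundtrip() {
        let msg = vec![0x00, 0x1b, 0x41, 0x0a, 0x0d, 0x42];
        let esc = escape_archapp_msg(&msg);
        // No raw newline may survive escaping, or the line framing would break.
        assert!(!esc.contains(&0x0a));
        assert_eq!(unescape_archapp_msg(&esc).unwrap(), msg);
    }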
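
The mismatch check in scan_files relies on the appliance's directory layout: a
PV name maps to a path by replacing "-" and ":" with "/", and each data file
carries an 11-byte partition suffix. That length matches a monthly partition
such as ":2021_06.pb" (an assumption here; the patch only hardcodes the
lengths). A worked example of the same arithmetic, using a hypothetical path
under the hardcoded prefix "/arch/lts/ArchiverStore/":

    fn expected_pv_path(pvname: &str) -> String {
        // "SOME-PV:NAME" is stored below "SOME/PV/NAME"
        pvname.replace("-", "/").replace(":", "/")
    }

    #[test]
    fn pv_path_check() {
        let path = "/arch/lts/ArchiverStore/SOME/PV/NAME:2021_06.pb";
        let pre = "/arch/lts/ArchiverStore/";
        let p3 = &path[pre.len()..];
        // Strip the partition suffix ":2021_06.pb" (11 bytes).
        let p3 = &p3[..p3.len() - 11];
        assert_eq!(p3, expected_pv_path("SOME-PV:NAME"));
    }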
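
The new GET /api/4/archapp/files endpoint streams its findings as
newline-delimited JSON, one value per line with content type APP_JSON_LINES,
so a client can decode each line on its own. A sketch of the client-side
framing, assuming the whole body has been buffered (host and port below are
placeholders):

    // e.g. curl http://localhost:8371/api/4/archapp/files
    fn parse_json_lines(body: &[u8]) -> Vec<serde_json::Value> {
        body.split(|&b| b == 0x0a)
            .filter(|line| !line.is_empty())
            .map(|line| serde_json::from_slice(line).expect("one JSON value per line"))
            .collect()
    }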