From 96aa6576de4151ff4996117d5a2ca74818026062 Mon Sep 17 00:00:00 2001
From: Dominik Werder
Date: Wed, 30 Jun 2021 11:14:32 +0200
Subject: [PATCH] Insert archiver appliance channel names into db, make this optional

---
 .cargo/config.toml   |   9 ++-
 Cargo.toml           |   1 +
 archapp/Cargo.toml   |   5 ++
 archapp/src/lib.rs   | 177 ++++------------------------------------
 archapp/src/parse.rs | 169 +++++++++++++++++++++++++++++++++++++++++
 archapp/src/test.rs  |   5 ++
 dbconn/src/lib.rs    |  15 ++++
 dbconn/src/scan.rs   |  19 +++--
 httpret/src/lib.rs   |   2 +-
 9 files changed, 242 insertions(+), 160 deletions(-)
 create mode 100644 archapp/src/parse.rs

diff --git a/.cargo/config.toml b/.cargo/config.toml
index a00eda8..6994f47 100644
--- a/.cargo/config.toml
+++ b/.cargo/config.toml
@@ -1,2 +1,9 @@
 [build]
-#rustflags = ["-C", "force-frame-pointers"]
+rustflags = [
+    "-C", "force-frame-pointers=yes",
+    "-C", "force-unwind-tables=yes",
+    "-C", "embed-bitcode=no",
+    "-C", "relocation-model=pic",
+    #"-Z", "time-passes=yes",
+    #"-Z", "time-llvm-passes=yes",
+]
diff --git a/Cargo.toml b/Cargo.toml
index 7e53bf0..4b0d914 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -6,3 +6,4 @@ debug = 1
 opt-level = 2
 #overflow-checks = true
 #debug-assertions = true
+lto = false
diff --git a/archapp/Cargo.toml b/archapp/Cargo.toml
index 9cf248c..81b34db 100644
--- a/archapp/Cargo.toml
+++ b/archapp/Cargo.toml
@@ -19,3 +19,8 @@ async-channel = "1.6"
 err = { path = "../err" }
 taskrun = { path = "../taskrun" }
 netpod = { path = "../netpod" }
+dbconn = { path = "../dbconn" }
+
+[features]
+default = []
+devread = []
\ No newline at end of file
diff --git a/archapp/src/lib.rs b/archapp/src/lib.rs
index 1c9b7fe..d790fd0 100644
--- a/archapp/src/lib.rs
+++ b/archapp/src/lib.rs
@@ -1,18 +1,32 @@
-use async_channel::{bounded, Receiver};
 use err::Error;
-use netpod::log::*;
-use netpod::NodeConfigCached;
-use protobuf::Message;
 use serde::Serialize;
-use serde_json::Value as JsonValue;
-use std::collections::{BTreeMap, VecDeque};
-use std::path::PathBuf;
-use std::sync::Arc;
-use tokio::io::AsyncReadExt;
 
+#[cfg(feature = "devread")]
 pub mod generated;
+#[cfg(not(feature = "devread"))]
+pub mod generated {}
+#[cfg(feature = "devread")]
+pub mod parse;
+#[cfg(not(feature = "devread"))]
+pub mod parse {
+    use crate::ItemSer;
+    use async_channel::Receiver;
+    use err::Error;
+    use netpod::NodeConfigCached;
+    use std::collections::BTreeMap;
+
+    type RT1 = Box<dyn ItemSer + Send>;
+
+    pub async fn scan_files(
+        _pairs: BTreeMap<String, String>,
+        _node_config: NodeConfigCached,
+    ) -> Result<Receiver<Result<RT1, Error>>, Error> {
+        Err(Error::with_msg("feature not enabled"))
+    }
+}
+#[cfg(feature = "devread")]
 #[cfg(test)]
-mod test;
+pub mod test;
 
 pub trait ItemSer {
     fn serialize(&self) -> Result<Vec<u8>, Error>;
@@ -28,7 +42,7 @@ where
 {
 }
 }
-fn unescape_archapp_msg(inp: &[u8]) -> Result<Vec<u8>, Error> {
+pub fn unescape_archapp_msg(inp: &[u8]) -> Result<Vec<u8>, Error> {
     let mut ret = Vec::with_capacity(inp.len() * 5 / 4);
     let mut esc = false;
     for &k in inp.iter() {
@@ -51,144 +65,3 @@ fn unescape_archapp_msg(inp: &[u8]) -> Result<Vec<u8>, Error> {
     }
     Ok(ret)
 }
-
-#[derive(Serialize)]
-pub struct EpicsEventPayloadInfo {
-    headers: Vec<(String, String)>,
-    year: i32,
-    pvname: String,
-}
-
-async fn read_pb_file(path: PathBuf) -> Result<EpicsEventPayloadInfo, Error> {
-    let mut f1 = tokio::fs::File::open(path).await?;
-    let mut buf = vec![0; 1024 * 4];
-    {
-        let mut i1 = 0;
-        loop {
-            let n = f1.read(&mut buf[i1..]).await?;
-            if n == 0 {
-                break;
-            }
-            i1 += n;
-            if i1 >= buf.len() {
-                break;
-            }
-        }
-    }
-    let mut j1 = 0;
-    loop {
-        let mut i2 = usize::MAX;
-        for (i1, &k) in buf[j1..].iter().enumerate() {
-            if k == 0xa {
-                i2 = j1 + i1;
-                break;
-            }
-        }
-        if i2 != usize::MAX {
-            //info!("got NL {} .. {}", j1, i2);
-            let m = unescape_archapp_msg(&buf[j1..i2])?;
-            if j1 == 0 {
-                let payload_info = crate::generated::EPICSEvent::PayloadInfo::parse_from_bytes(&m).unwrap();
-                //info!("got payload_info: {:?}", payload_info);
-                let z = EpicsEventPayloadInfo {
-                    headers: payload_info
-                        .get_headers()
-                        .iter()
-                        .map(|j| (j.get_name().to_string(), j.get_val().to_string()))
-                        .collect(),
-                    year: payload_info.get_year(),
-                    pvname: payload_info.get_pvname().into(),
-                };
-                return Ok(z);
-            } else {
-                let scalar_double = crate::generated::EPICSEvent::ScalarDouble::parse_from_bytes(&m).unwrap();
-                //info!("got scalar_double: {:?}", scalar_double);
-            }
-        } else {
-            //info!("no more packets");
-            break;
-        }
-        j1 = i2 + 1;
-    }
-    Err(Error::with_msg(format!("no header entry found in file")))
-}
-
-type RT1 = Box<dyn ItemSer + Send>;
-
-pub async fn scan_files(
-    _pairs: BTreeMap<String, String>,
-    node_config: &NodeConfigCached,
-) -> Result<Receiver<Result<RT1, Error>>, Error> {
-    let aa = if let Some(aa) = &node_config.node.archiver_appliance {
-        aa.clone()
-    } else {
-        return Err(Error::with_msg("no archiver appliance config"));
-    };
-    let (tx, rx) = bounded(16);
-    let tx = Arc::new(tx);
-    let tx2 = tx.clone();
-    let block1 = async move {
-        let mut paths = VecDeque::new();
-        paths.push_back(aa.data_base_path);
-        loop {
-            if let Some(path) = paths.pop_back() {
-                let meta = tokio::fs::metadata(&path).await?;
-                if meta.is_dir() {
-                    let mut rd = tokio::fs::read_dir(&path).await?;
-                    loop {
-                        match rd.next_entry().await {
-                            Ok(item) => match item {
-                                Some(item) => {
-                                    paths.push_back(item.path());
-                                }
-                                None => {
-                                    break;
-                                }
-                            },
-                            Err(e) => {
-                                tx.send(Err(e.into())).await.unwrap();
-                            }
-                        }
-                    }
-                } else if meta.is_file() {
-                    //tx.send(Ok(Box::new(path.clone()) as RT1)).await?;
-                    if path.to_str().unwrap().ends_with(".pb") {
-                        let packet = read_pb_file(path.clone()).await?;
-                        let pvn = packet.pvname.replace("-", "/");
-                        let pvn = pvn.replace(":", "/");
-                        let pre = "/arch/lts/ArchiverStore/";
-                        let p3 = &path.to_str().unwrap()[pre.len()..];
-                        let p3 = &p3[..p3.len() - 11];
-                        if p3 != pvn {
-                            tx.send(Ok(Box::new(serde_json::to_value(&packet)?) as RT1)).await?;
-                            {
-                                let s = format!("{} - {}", p3, packet.pvname);
-                                tx.send(Ok(Box::new(serde_json::to_value(&s)?) as RT1)).await?;
-                            }
-                            tx.send(Ok(
-                                Box::new(JsonValue::String(format!("MISMATCH --------------------"))) as RT1,
-                            ))
-                            .await?;
-                        }
-                    }
-                }
-            } else {
-                break;
-            }
-        }
-        Ok::<_, Error>(())
-    };
-    let block2 = async move {
-        match block1.await {
-            Ok(_) => {}
-            Err(e) => match tx2.send(Err(e.into())).await {
-                Ok(_) => {}
-                Err(e) => {
-                    error!("can not deliver error through channel: {:?}", e);
-                }
-            },
-        }
-    };
-    tokio::spawn(block2);
-    Ok(rx)
-}
diff --git a/archapp/src/parse.rs b/archapp/src/parse.rs
new file mode 100644
index 0000000..6972b28
--- /dev/null
+++ b/archapp/src/parse.rs
@@ -0,0 +1,169 @@
+use crate::{unescape_archapp_msg, ItemSer};
+use async_channel::{bounded, Receiver};
+use err::Error;
+use netpod::log::*;
+use netpod::NodeConfigCached;
+use protobuf::Message;
+use serde::Serialize;
+use serde_json::Value as JsonValue;
+use std::collections::{BTreeMap, VecDeque};
+use std::path::PathBuf;
+use std::sync::Arc;
+use tokio::io::AsyncReadExt;
+
+#[derive(Serialize)]
+pub struct EpicsEventPayloadInfo {
+    headers: Vec<(String, String)>,
+    year: i32,
+    pvname: String,
+}
+
+async fn read_pb_file(path: PathBuf) -> Result<EpicsEventPayloadInfo, Error> {
+    let mut f1 = tokio::fs::File::open(path).await?;
+    let mut buf = vec![0; 1024 * 4];
+    {
+        let mut i1 = 0;
+        loop {
+            let n = f1.read(&mut buf[i1..]).await?;
+            if n == 0 {
+                break;
+            }
+            i1 += n;
+            if i1 >= buf.len() {
+                break;
+            }
+        }
+    }
+    let mut j1 = 0;
+    loop {
+        let mut i2 = usize::MAX;
+        for (i1, &k) in buf[j1..].iter().enumerate() {
+            if k == 0xa {
+                i2 = j1 + i1;
+                break;
+            }
+        }
+        if i2 != usize::MAX {
+            //info!("got NL {} .. {}", j1, i2);
+            let m = unescape_archapp_msg(&buf[j1..i2])?;
+            if j1 == 0 {
+                let payload_info = crate::generated::EPICSEvent::PayloadInfo::parse_from_bytes(&m)
+                    .map_err(|_| Error::with_msg("can not parse PayloadInfo"))?;
+                //info!("got payload_info: {:?}", payload_info);
+                let z = EpicsEventPayloadInfo {
+                    headers: payload_info
+                        .get_headers()
+                        .iter()
+                        .map(|j| (j.get_name().to_string(), j.get_val().to_string()))
+                        .collect(),
+                    year: payload_info.get_year(),
+                    pvname: payload_info.get_pvname().into(),
+                };
+                return Ok(z);
+            } else {
+                let _scalar_double = crate::generated::EPICSEvent::ScalarDouble::parse_from_bytes(&m)
+                    .map_err(|_| Error::with_msg("can not parse EPICSEvent::ScalarDouble"))?;
+                //info!("got scalar_double: {:?}", scalar_double);
+            }
+        } else {
+            //info!("no more packets");
+            break;
+        }
+        j1 = i2 + 1;
+    }
+    Err(Error::with_msg(format!("no header entry found in file")))
+}
+
+type RT1 = Box<dyn ItemSer + Send>;
+
+pub async fn scan_files(
+    pairs: BTreeMap<String, String>,
+    node_config: NodeConfigCached,
+) -> Result<Receiver<Result<RT1, Error>>, Error> {
+    let (tx, rx) = bounded(16);
+    let tx = Arc::new(tx);
+    let tx2 = tx.clone();
+    let block1 = async move {
+        let aa = if let Some(aa) = &node_config.node.archiver_appliance {
+            aa.clone()
+        } else {
+            return Err(Error::with_msg("no archiver appliance config"));
+        };
+        let dbc = dbconn::create_connection(&node_config.node_config.cluster.database).await?;
+        let ndi = dbconn::scan::get_node_disk_ident(&node_config, &dbc).await?;
+        let mut paths = VecDeque::new();
+        paths.push_back(
+            aa.data_base_path.join(
+                pairs
+                    .get("subpath")
+                    .ok_or_else(|| Error::with_msg("subpatch not given"))?,
+            ),
+        );
+        loop {
+            if let Some(path) = paths.pop_back() {
+                let meta = tokio::fs::metadata(&path).await?;
+                if meta.is_dir() {
+                    let mut rd = tokio::fs::read_dir(&path).await?;
+                    loop {
+                        match rd.next_entry().await {
+                            Ok(item) => match item {
+                                Some(item) => {
+                                    paths.push_back(item.path());
+                                }
+                                None => {
+                                    break;
+                                }
+                            },
+                            Err(e) => {
+                                tx.send(Err(e.into())).await?;
+                            }
+                        }
+                    }
+                } else if meta.is_file() {
+                    tx.send(Ok(Box::new(path.clone()) as RT1)).await?;
+                    if path
+                        .to_str()
+                        .ok_or_else(|| Error::with_msg("invalid path string"))?
+                        .ends_with(".pb")
+                    {
+                        let packet = read_pb_file(path.clone()).await?;
+                        let pvn = packet.pvname.replace("-", "/");
+                        let pvn = pvn.replace(":", "/");
+                        let pre = "/arch/lts/ArchiverStore/";
+                        let p3 = &path.to_str().unwrap()[pre.len()..];
+                        let p3 = &p3[..p3.len() - 11];
+                        if p3 != pvn {
+                            tx.send(Ok(Box::new(serde_json::to_value(&packet)?) as RT1)).await?;
+                            {
+                                let s = format!("{} - {}", p3, packet.pvname);
+                                tx.send(Ok(Box::new(serde_json::to_value(&s)?) as RT1)).await?;
+                            }
+                            tx.send(Ok(
+                                Box::new(JsonValue::String(format!("MISMATCH --------------------"))) as RT1,
+                            ))
+                            .await?;
+                        } else {
+                            dbconn::insert_channel(packet.pvname.clone(), ndi.facility, &dbc).await?;
+                        }
+                    }
+                }
+            } else {
+                break;
+            }
+        }
+        Ok::<_, Error>(())
+    };
+    let block2 = async move {
+        match block1.await {
+            Ok(_) => {}
+            Err(e) => match tx2.send(Err(e)).await {
+                Ok(_) => {}
+                Err(e) => {
+                    error!("can not deliver error through channel: {:?}", e);
+                }
+            },
+        }
+    };
+    tokio::spawn(block2);
+    Ok(rx)
+}
diff --git a/archapp/src/test.rs b/archapp/src/test.rs
index 97e1dbf..69c3b03 100644
--- a/archapp/src/test.rs
+++ b/archapp/src/test.rs
@@ -3,6 +3,11 @@ use err::Error;
 use netpod::log::*;
 use protobuf::Message;
 
+pub fn read_pb_dummy() -> Result<(), Error> {
+    Ok(())
+}
+
+#[cfg(feature = "devread")]
 #[test]
 fn read_pb_00() -> Result<(), Error> {
     let block1 = async move {
diff --git a/dbconn/src/lib.rs b/dbconn/src/lib.rs
index 8ca1375..fe598db 100644
--- a/dbconn/src/lib.rs
+++ b/dbconn/src/lib.rs
@@ -101,3 +101,18 @@ pub async fn random_channel(node_config: &NodeConfigCached) -> Result
+
+pub async fn insert_channel(name: String, facility: i64, dbc: &Client) -> Result<(), Error> {
+    let rows = dbc
+        .query(
+            "select count(rowid) from channels where facility = $1 and name = $2",
+            &[&facility, &name],
+        )
+        .await?;
+    if rows[0].get::<_, i64>(0) == 0 {
+        let sql =
+            concat!("insert into channels (facility, name) values ($1, $2) on conflict (facility, name) do nothing");
+        dbc.query(sql, &[&facility, &name]).await?;
+    }
+    Ok(())
+}
diff --git a/dbconn/src/scan.rs b/dbconn/src/scan.rs
index c3dc20e..926d2f5 100644
--- a/dbconn/src/scan.rs
+++ b/dbconn/src/scan.rs
@@ -42,7 +42,10 @@ fn _get_hostname() -> Result<String, Error> {
 }
 
 pub async fn get_node_disk_ident(node_config: &NodeConfigCached, dbc: &Client) -> Result<NodeDiskIdent, Error> {
-    let rows = dbc.query("select nodes.rowid, facility, split, hostname from nodes, facilities where facilities.name = $1 and facility = facilities.rowid and hostname = $2", &[&node_config.node.backend, &node_config.node.host]).await?;
+    let sql = "select nodes.rowid, facility, split, hostname from nodes, facilities where facilities.name = $1 and facility = facilities.rowid and hostname = $2";
+    let rows = dbc
+        .query(sql, &[&node_config.node.backend, &node_config.node.host])
+        .await?;
     if rows.len() != 1 {
         return Err(Error::with_msg(format!(
             "get_node can't find unique entry for {} {}",
@@ -62,7 +65,10 @@ pub async fn get_node_disk_ident_2(
     node_config: Pin<&NodeConfigCached>,
     dbc: Pin<&Client>,
 ) -> Result<NodeDiskIdent, Error> {
-    let rows = dbc.query("select nodes.rowid, facility, split, hostname from nodes, facilities where facilities.name = $1 and facility = facilities.rowid and hostname = $2", &[&node_config.node.backend, &node_config.node.host]).await?;
+    let sql = "select nodes.rowid, facility, split, hostname from nodes, facilities where facilities.name = $1 and facility = facilities.rowid and hostname = $2";
+    let rows = dbc
+        .query(sql, &[&node_config.node.backend, &node_config.node.host])
+        .await?;
     if rows.len() != 1 {
         return Err(Error::with_msg(format!(
             "get_node can't find unique entry for {} {}",
@@ -571,10 +577,11 @@ pub async fn update_db_with_channel_config(
     let parsed_until: u32 = row.get::<_, i64>(2) as u32;
     let _channel_id = row.get::<_, i64>(2) as i64;
     if meta.len() < file_size as u64 || meta.len() < parsed_until as u64 {
-        dbc.query(
-            "insert into configs_history (rowid_original, node, channel, fileSize, parsedUntil, config, tsinsert) select rowid as rowid_original, node, channel, fileSize, parsedUntil, config, now() from configs where rowid = $1",
-            &[&rowid],
-        ).await?;
+        let sql = concat!(
+            "insert into configs_history (rowid_original, node, channel, fileSize, parsedUntil, config, tsinsert) ",
+            "select rowid as rowid_original, node, channel, fileSize, parsedUntil, config, now() from configs where rowid = $1"
+        );
+        dbc.query(sql, &[&rowid]).await?;
     }
     //ensure!(meta.len() >= parsed_until as u64, ConfigFileOnDiskShrunk{path});
     (Some(rowid), true)
diff --git a/httpret/src/lib.rs b/httpret/src/lib.rs
index 4e5f8ef..3ac42e9 100644
--- a/httpret/src/lib.rs
+++ b/httpret/src/lib.rs
@@ -634,7 +634,7 @@ pub async fn ca_connect_1(req: Request<Body>, node_config: &NodeConfigCached) ->
 pub async fn archapp_scan_files(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
     let url = Url::parse(&format!("dummy:{}", req.uri()))?;
     let pairs = get_url_query_pairs(&url);
-    let res = archapp::scan_files(pairs, node_config).await?;
+    let res = archapp::parse::scan_files(pairs, node_config.clone()).await?;
     let ret = response(StatusCode::OK)
         .header(http::header::CONTENT_TYPE, APP_JSON_LINES)
         .body(Body::wrap_stream(res.map(|k| match k {
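
For orientation, and not part of the commit above: the real scanner only exists when the archapp crate is built with the new feature gate, e.g. `cargo build -p archapp --features devread`; without it the stub in lib.rs returns "feature not enabled". The sketch below shows one way a caller could drain the Receiver returned by archapp::parse::scan_files, roughly what the archapp_scan_files handler does with Body::wrap_stream. It is illustrative only; it assumes the RT1 = Box<dyn ItemSer + Send> item type from the diff, and print_scan_items is a hypothetical helper, not code from the repository.

    // Illustrative sketch, not part of the patch.
    use archapp::ItemSer;
    use async_channel::Receiver;
    use err::Error;

    type RT1 = Box<dyn ItemSer + Send>;

    // Hypothetical helper: drain the scan channel and print each item as a JSON line.
    async fn print_scan_items(rx: Receiver<Result<RT1, Error>>) -> Result<(), Error> {
        // recv() returns Err once the producer task has dropped all senders.
        while let Ok(item) = rx.recv().await {
            // Each item serializes itself to JSON bytes via the ItemSer impl in archapp.
            let buf = item?.serialize()?;
            println!("{}", String::from_utf8_lossy(&buf));
        }
        Ok(())
    }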