From b82a6292d2e93ca22f4542dbb9cceded5a56745f Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Fri, 18 Feb 2022 09:33:38 +0100 Subject: [PATCH] Scan files and insert channel name into database --- archapp/src/parse.rs | 152 ++++++++++++++++++++++++++------- archapp_wrap/src/lib.rs | 14 ++- daqbuffer/src/bin/daqbuffer.rs | 2 - daqbufp2/src/test/archapp.rs | 3 +- httpret/src/httpret.rs | 36 +++++++- netpod/src/netpod.rs | 19 +++++ 6 files changed, 189 insertions(+), 37 deletions(-) diff --git a/archapp/src/parse.rs b/archapp/src/parse.rs index b5a08e7..5ccbed6 100644 --- a/archapp/src/parse.rs +++ b/archapp/src/parse.rs @@ -7,15 +7,16 @@ use archapp_xc::*; use async_channel::{bounded, Receiver}; use chrono::{TimeZone, Utc}; use err::{ErrStr, Error}; +use futures_util::StreamExt; use items::eventsitem::EventsItem; use items::plainevents::{PlainEvents, ScalarPlainEvents, WavePlainEvents}; use items::scalarevents::ScalarEvents; use items::waveevents::WaveEvents; use netpod::log::*; use netpod::{ArchiverAppliance, ChannelConfigQuery, ChannelConfigResponse}; +use netpod::{Database, ScalarType, Shape}; use protobuf::Message; use serde::Serialize; -use serde_json::Value as JsonValue; use std::collections::{BTreeMap, VecDeque}; use std::fs::FileType; use std::io::SeekFrom; @@ -413,26 +414,36 @@ impl LruCache { } } +#[derive(Debug)] +#[allow(unused)] +pub struct DiscoveredDatafile { + path: PathBuf, + channel_name: String, + variant_name: String, + scalar_type: ScalarType, + shape: Shape, +} + pub async fn scan_files_inner( pairs: BTreeMap, data_base_paths: Vec, -) -> Result>, Error> { +) -> Result>, Error> { let _ = pairs; let (tx, rx) = bounded(16); let tx = Arc::new(tx); let tx2 = tx.clone(); let block1 = async move { let mut lru = LruCache::new(); - // TODO insert channels as a consumer of this stream: - //let dbc = dbconn::create_connection(&node_config.node_config.cluster.database).await?; - //let ndi = dbconn::scan::get_node_disk_ident(&node_config, &dbc).await?; struct PE { path: PathBuf, fty: FileType, } let proot = data_base_paths.last().unwrap().clone(); let proots = proot.to_str().unwrap().to_string(); - let meta = tokio::fs::metadata(&proot).await?; + // TODO factor out to automatically add path to error. + let meta = tokio::fs::metadata(&proot) + .await + .map_err(|e| Error::with_msg(format!("can not open {proot:?} {e:?}")))?; let mut paths = VecDeque::new(); paths.push_back(PE { path: proot, @@ -441,7 +452,10 @@ pub async fn scan_files_inner( loop { if let Some(pe) = paths.pop_back() { if pe.fty.is_dir() { - let mut rd = tokio::fs::read_dir(&pe.path).await?; + // TODO factor out to automatically add path to error. + let mut rd = tokio::fs::read_dir(&pe.path) + .await + .map_err(|e| Error::with_msg(format!("can not open {:?} {e:?}", pe.path)))?; loop { match rd.next_entry().await { Ok(item) => match item { @@ -467,39 +481,31 @@ pub async fn scan_files_inner( //tx.send(Ok(Box::new(serde_json::to_value(fns)?) as ItemSerBox)).await?; let channel_path = &fns[proots.len() + 1..fns.len() - 11]; if !lru.query(channel_path) { - let mut pbr = PbFileReader::new(tokio::fs::File::open(&pe.path).await?).await?; + let f3 = tokio::fs::File::open(&pe.path) + .await + .map_err(|e| Error::with_msg(format!("can not open {:?} {e:?}", pe.path)))?; + let mut pbr = PbFileReader::new(f3).await?; let normalized_channel_name = { let pvn = pbr.channel_name().replace("-", "/"); pvn.replace(":", "/") }; if channel_path != normalized_channel_name { - { - let s = format!("{} - {}", channel_path, normalized_channel_name); - tx.send(Ok(Box::new(serde_json::to_value(&s)?) as ItemSerBox)) - .await - .errstr()?; - } - tx.send(Ok( - Box::new(JsonValue::String(format!("MISMATCH --------------------"))) as ItemSerBox, - )) - .await - .errstr()?; + let msg = format!("{} - {}", channel_path, normalized_channel_name); + warn!("{}", msg); + tx.send(Err(Error::with_msg(format!("MISMATCH --------------------")))) + .await + .errstr()?; } else { - // TODO as a consumer of this stream: - //dbconn::insert_channel(channel_path.into(), ndi.facility, &dbc).await?; - if let Ok(Some(msg)) = pbr.read_msg().await { - let msg = msg.item; lru.insert(channel_path); - { - tx.send(Ok(Box::new(serde_json::to_value(format!( - "channel {} type {}", - pbr.channel_name(), - msg.variant_name() - ))?) as ItemSerBox)) - .await - .errstr()?; - } + let item = DiscoveredDatafile { + path: pe.path, + channel_name: pbr.channel_name().into(), + variant_name: msg.item.variant_name(), + scalar_type: msg.item.type_info().0, + shape: msg.item.type_info().1, + }; + tx.send(Ok(item)).await.errstr()?; } } } @@ -526,6 +532,88 @@ pub async fn scan_files_inner( Ok(rx) } +pub async fn scan_files_msgs( + pairs: BTreeMap, + data_base_paths: Vec, +) -> Result>, Error> { + let (tx, rx) = bounded(16); + let tx = Arc::new(tx); + let tx2 = tx.clone(); + let block1 = async move { + let mut inp = scan_files_inner(pairs, data_base_paths).await?; + while let Some(item) = inp.next().await { + let item = item?; + let msg = format!("{item:?}"); + tx.send(Ok(Box::new(serde_json::to_value(msg)?) as ItemSerBox)) + .await + .errstr()?; + } + Ok(()) + }; + let block2 = async move { + match block1.await { + Ok(_) => {} + Err(e) => match tx2.send(Err(e)).await { + Ok(_) => {} + Err(e) => { + error!("can not deliver error through channel: {:?}", e); + } + }, + } + }; + tokio::spawn(block2); + Ok(rx) +} + +pub async fn scan_files_to_database( + pairs: BTreeMap, + data_base_paths: Vec, + database: Database, +) -> Result>, Error> { + let (tx, rx) = bounded(16); + let tx = Arc::new(tx); + let tx2 = tx.clone(); + let block1 = async move { + let dbc = dbconn::create_connection(&database).await?; + let mut inp = scan_files_inner(pairs, data_base_paths).await?; + while let Some(item) = inp.next().await { + let item = item?; + let sql = "select rowid from channels where name = $1"; + let rows = dbc.query(sql, &[&item.channel_name]).await.errstr()?; + if rows.len() == 0 { + let sql = "insert into channels (name, config) values ($1, $2) on conflict do nothing returning rowid"; + let cfg = serde_json::json!({ + "scalarType":item.scalar_type.to_bsread_str(), + "shape":item.shape, + }); + let rows = dbc.query(sql, &[&item.channel_name, &cfg]).await.errstr()?; + if let Some(row) = rows.last() { + let rowid: i64 = row.get(0); + let msg = format!("insert done rowid {rowid} {item:?}"); + let msg = Box::new(serde_json::to_value(msg)?) as ItemSerBox; + tx.send(Ok(msg)).await.errstr()?; + } + } else { + // TODO update channel config if needed + } + } + Ok(()) + }; + let block2 = async move { + match block1.await { + Ok(_) => {} + Err(e) => match tx2.send(Err(e)).await { + Ok(_) => {} + Err(e) => { + error!("can not deliver error through channel: {:?}", e); + } + }, + } + }; + tokio::spawn(block2); + Ok(rx) +} + pub async fn channel_config(q: &ChannelConfigQuery, aa: &ArchiverAppliance) -> Result { let ci = crate::events::channel_info(&q.channel, aa).await?; let ret = ChannelConfigResponse { diff --git a/archapp_wrap/src/lib.rs b/archapp_wrap/src/lib.rs index 036754d..2ad0f55 100644 --- a/archapp_wrap/src/lib.rs +++ b/archapp_wrap/src/lib.rs @@ -15,12 +15,24 @@ pub fn scan_files( pairs: BTreeMap, node_config: NodeConfigCached, ) -> Pin>, Error>> + Send>> { - Box::pin(archapp::parse::scan_files_inner( + Box::pin(archapp::parse::scan_files_msgs( pairs, node_config.node.archiver_appliance.unwrap().data_base_paths, )) } +pub fn scan_files_insert( + pairs: BTreeMap, + node_config: NodeConfigCached, +) -> Pin>, Error>> + Send>> { + let aa = node_config.node.archiver_appliance.as_ref().unwrap(); + Box::pin(archapp::parse::scan_files_to_database( + pairs, + aa.data_base_paths.clone(), + aa.database.clone(), + )) +} + pub async fn make_event_pipe( evq: &RawEventsQuery, aa: &ArchiverAppliance, diff --git a/daqbuffer/src/bin/daqbuffer.rs b/daqbuffer/src/bin/daqbuffer.rs index 3fd0be7..8f85ab3 100644 --- a/daqbuffer/src/bin/daqbuffer.rs +++ b/daqbuffer/src/bin/daqbuffer.rs @@ -50,8 +50,6 @@ async fn go() -> Result<(), Error> { let opts = Opts::parse(); match opts.subcmd { SubCmd::Retrieval(subcmd) => { - trace!("test trace"); - error!("test error"); info!("daqbuffer {}", clap::crate_version!()); let mut config_file = File::open(subcmd.config).await?; let mut buf = vec![]; diff --git a/daqbufp2/src/test/archapp.rs b/daqbufp2/src/test/archapp.rs index 60cacf9..378505c 100644 --- a/daqbufp2/src/test/archapp.rs +++ b/daqbufp2/src/test/archapp.rs @@ -2,7 +2,8 @@ use super::binnedjson::ScalarEventsResponse; use super::events::get_plain_events_json; use crate::nodes::require_archapp_test_host_running; use err::Error; -use netpod::{f64_close, log::*}; +use netpod::f64_close; +use netpod::log::*; #[test] fn get_events_1() -> Result<(), Error> { diff --git a/httpret/src/httpret.rs b/httpret/src/httpret.rs index ae0cf9a..1157a12 100644 --- a/httpret/src/httpret.rs +++ b/httpret/src/httpret.rs @@ -270,12 +270,18 @@ async fn http_service_try(req: Request, node_config: &NodeConfigCached) -> } else { Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) } - } else if path == "/api/4/archapp/files/scan" { + } else if path == "/api/4/archapp/files/scan/msgs" { if req.method() == Method::GET { Ok(archapp_scan_files(req, &node_config).await?) } else { Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) } + } else if path == "/api/4/archapp/files/scan/insert" { + if req.method() == Method::GET { + Ok(archapp_scan_files_insert(req, &node_config).await?) + } else { + Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) + } } else if path == "/api/4/archapp/channel/info" { if req.method() == Method::GET { Ok(archapp_channel_info(req, &node_config).await?) @@ -829,6 +835,34 @@ pub async fn archapp_scan_files(req: Request, node_config: &NodeConfigCach Ok(ret) } +pub async fn archapp_scan_files_insert( + req: Request, + node_config: &NodeConfigCached, +) -> Result, Error> { + let url = Url::parse(&format!("dummy:{}", req.uri()))?; + let pairs = get_url_query_pairs(&url); + let res = archapp_wrap::scan_files_insert(pairs, node_config.clone()).await?; + let ret = response(StatusCode::OK) + .header(http::header::CONTENT_TYPE, APP_JSON_LINES) + .body(Body::wrap_stream(res.map(|k| match k { + Ok(k) => match k.serialize() { + Ok(mut item) => { + item.push(0xa); + Ok(item) + } + Err(e) => Err(e), + }, + Err(e) => match serde_json::to_vec(&e) { + Ok(mut item) => { + item.push(0xa); + Ok(item) + } + Err(e) => Err(e.into()), + }, + })))?; + Ok(ret) +} + pub async fn archapp_channel_info(req: Request, node_config: &NodeConfigCached) -> Result, Error> { let url = Url::parse(&format!("dummy:{}", req.uri()))?; let pairs = get_url_query_pairs(&url); diff --git a/netpod/src/netpod.rs b/netpod/src/netpod.rs index b2cfaea..6b08eb1 100644 --- a/netpod/src/netpod.rs +++ b/netpod/src/netpod.rs @@ -85,6 +85,24 @@ impl ScalarType { Ok(g) } + pub fn to_bsread_str(&self) -> &'static str { + use ScalarType::*; + match self { + U8 => "uint8", + U16 => "uint16", + U32 => "uint32", + U64 => "uint64", + I8 => "int8", + I16 => "int16", + I32 => "int32", + I64 => "int64", + F32 => "float", + F64 => "double", + BOOL => "bool", + STRING => "string", + } + } + pub fn from_bsread_str(s: &str) -> Result { use ScalarType::*; let ret = match s { @@ -98,6 +116,7 @@ impl ScalarType { "int64" => I64, "float" => F32, "double" => F64, + "string" => STRING, _ => { return Err(Error::with_msg_no_trace(format!( "from_bsread_str can not understand bsread {}",