Scan files and insert channel name into database

This commit is contained in:
Dominik Werder
2022-02-18 09:33:38 +01:00
parent c6dba54a62
commit b82a6292d2
6 changed files with 189 additions and 37 deletions

View File

@@ -7,15 +7,16 @@ use archapp_xc::*;
use async_channel::{bounded, Receiver};
use chrono::{TimeZone, Utc};
use err::{ErrStr, Error};
use futures_util::StreamExt;
use items::eventsitem::EventsItem;
use items::plainevents::{PlainEvents, ScalarPlainEvents, WavePlainEvents};
use items::scalarevents::ScalarEvents;
use items::waveevents::WaveEvents;
use netpod::log::*;
use netpod::{ArchiverAppliance, ChannelConfigQuery, ChannelConfigResponse};
use netpod::{Database, ScalarType, Shape};
use protobuf::Message;
use serde::Serialize;
use serde_json::Value as JsonValue;
use std::collections::{BTreeMap, VecDeque};
use std::fs::FileType;
use std::io::SeekFrom;
@@ -413,26 +414,36 @@ impl LruCache {
}
}
#[derive(Debug)]
#[allow(unused)]
pub struct DiscoveredDatafile {
path: PathBuf,
channel_name: String,
variant_name: String,
scalar_type: ScalarType,
shape: Shape,
}
pub async fn scan_files_inner(
pairs: BTreeMap<String, String>,
data_base_paths: Vec<PathBuf>,
) -> Result<Receiver<Result<ItemSerBox, Error>>, Error> {
) -> Result<Receiver<Result<DiscoveredDatafile, Error>>, Error> {
let _ = pairs;
let (tx, rx) = bounded(16);
let tx = Arc::new(tx);
let tx2 = tx.clone();
let block1 = async move {
let mut lru = LruCache::new();
// TODO insert channels as a consumer of this stream:
//let dbc = dbconn::create_connection(&node_config.node_config.cluster.database).await?;
//let ndi = dbconn::scan::get_node_disk_ident(&node_config, &dbc).await?;
struct PE {
path: PathBuf,
fty: FileType,
}
let proot = data_base_paths.last().unwrap().clone();
let proots = proot.to_str().unwrap().to_string();
let meta = tokio::fs::metadata(&proot).await?;
// TODO factor out to automatically add path to error.
let meta = tokio::fs::metadata(&proot)
.await
.map_err(|e| Error::with_msg(format!("can not open {proot:?} {e:?}")))?;
let mut paths = VecDeque::new();
paths.push_back(PE {
path: proot,
@@ -441,7 +452,10 @@ pub async fn scan_files_inner(
loop {
if let Some(pe) = paths.pop_back() {
if pe.fty.is_dir() {
let mut rd = tokio::fs::read_dir(&pe.path).await?;
// TODO factor out to automatically add path to error.
let mut rd = tokio::fs::read_dir(&pe.path)
.await
.map_err(|e| Error::with_msg(format!("can not open {:?} {e:?}", pe.path)))?;
loop {
match rd.next_entry().await {
Ok(item) => match item {
@@ -467,39 +481,31 @@ pub async fn scan_files_inner(
//tx.send(Ok(Box::new(serde_json::to_value(fns)?) as ItemSerBox)).await?;
let channel_path = &fns[proots.len() + 1..fns.len() - 11];
if !lru.query(channel_path) {
let mut pbr = PbFileReader::new(tokio::fs::File::open(&pe.path).await?).await?;
let f3 = tokio::fs::File::open(&pe.path)
.await
.map_err(|e| Error::with_msg(format!("can not open {:?} {e:?}", pe.path)))?;
let mut pbr = PbFileReader::new(f3).await?;
let normalized_channel_name = {
let pvn = pbr.channel_name().replace("-", "/");
pvn.replace(":", "/")
};
if channel_path != normalized_channel_name {
{
let s = format!("{} - {}", channel_path, normalized_channel_name);
tx.send(Ok(Box::new(serde_json::to_value(&s)?) as ItemSerBox))
.await
.errstr()?;
}
tx.send(Ok(
Box::new(JsonValue::String(format!("MISMATCH --------------------"))) as ItemSerBox,
))
.await
.errstr()?;
let msg = format!("{} - {}", channel_path, normalized_channel_name);
warn!("{}", msg);
tx.send(Err(Error::with_msg(format!("MISMATCH --------------------"))))
.await
.errstr()?;
} else {
// TODO as a consumer of this stream:
//dbconn::insert_channel(channel_path.into(), ndi.facility, &dbc).await?;
if let Ok(Some(msg)) = pbr.read_msg().await {
let msg = msg.item;
lru.insert(channel_path);
{
tx.send(Ok(Box::new(serde_json::to_value(format!(
"channel {} type {}",
pbr.channel_name(),
msg.variant_name()
))?) as ItemSerBox))
.await
.errstr()?;
}
let item = DiscoveredDatafile {
path: pe.path,
channel_name: pbr.channel_name().into(),
variant_name: msg.item.variant_name(),
scalar_type: msg.item.type_info().0,
shape: msg.item.type_info().1,
};
tx.send(Ok(item)).await.errstr()?;
}
}
}
@@ -526,6 +532,88 @@ pub async fn scan_files_inner(
Ok(rx)
}
pub async fn scan_files_msgs(
pairs: BTreeMap<String, String>,
data_base_paths: Vec<PathBuf>,
) -> Result<Receiver<Result<ItemSerBox, Error>>, Error> {
let (tx, rx) = bounded(16);
let tx = Arc::new(tx);
let tx2 = tx.clone();
let block1 = async move {
let mut inp = scan_files_inner(pairs, data_base_paths).await?;
while let Some(item) = inp.next().await {
let item = item?;
let msg = format!("{item:?}");
tx.send(Ok(Box::new(serde_json::to_value(msg)?) as ItemSerBox))
.await
.errstr()?;
}
Ok(())
};
let block2 = async move {
match block1.await {
Ok(_) => {}
Err(e) => match tx2.send(Err(e)).await {
Ok(_) => {}
Err(e) => {
error!("can not deliver error through channel: {:?}", e);
}
},
}
};
tokio::spawn(block2);
Ok(rx)
}
pub async fn scan_files_to_database(
pairs: BTreeMap<String, String>,
data_base_paths: Vec<PathBuf>,
database: Database,
) -> Result<Receiver<Result<ItemSerBox, Error>>, Error> {
let (tx, rx) = bounded(16);
let tx = Arc::new(tx);
let tx2 = tx.clone();
let block1 = async move {
let dbc = dbconn::create_connection(&database).await?;
let mut inp = scan_files_inner(pairs, data_base_paths).await?;
while let Some(item) = inp.next().await {
let item = item?;
let sql = "select rowid from channels where name = $1";
let rows = dbc.query(sql, &[&item.channel_name]).await.errstr()?;
if rows.len() == 0 {
let sql = "insert into channels (name, config) values ($1, $2) on conflict do nothing returning rowid";
let cfg = serde_json::json!({
"scalarType":item.scalar_type.to_bsread_str(),
"shape":item.shape,
});
let rows = dbc.query(sql, &[&item.channel_name, &cfg]).await.errstr()?;
if let Some(row) = rows.last() {
let rowid: i64 = row.get(0);
let msg = format!("insert done rowid {rowid} {item:?}");
let msg = Box::new(serde_json::to_value(msg)?) as ItemSerBox;
tx.send(Ok(msg)).await.errstr()?;
}
} else {
// TODO update channel config if needed
}
}
Ok(())
};
let block2 = async move {
match block1.await {
Ok(_) => {}
Err(e) => match tx2.send(Err(e)).await {
Ok(_) => {}
Err(e) => {
error!("can not deliver error through channel: {:?}", e);
}
},
}
};
tokio::spawn(block2);
Ok(rx)
}
pub async fn channel_config(q: &ChannelConfigQuery, aa: &ArchiverAppliance) -> Result<ChannelConfigResponse, Error> {
let ci = crate::events::channel_info(&q.channel, aa).await?;
let ret = ChannelConfigResponse {

View File

@@ -15,12 +15,24 @@ pub fn scan_files(
pairs: BTreeMap<String, String>,
node_config: NodeConfigCached,
) -> Pin<Box<dyn Future<Output = Result<Receiver<Result<ItemSerBox, Error>>, Error>> + Send>> {
Box::pin(archapp::parse::scan_files_inner(
Box::pin(archapp::parse::scan_files_msgs(
pairs,
node_config.node.archiver_appliance.unwrap().data_base_paths,
))
}
pub fn scan_files_insert(
pairs: BTreeMap<String, String>,
node_config: NodeConfigCached,
) -> Pin<Box<dyn Future<Output = Result<Receiver<Result<ItemSerBox, Error>>, Error>> + Send>> {
let aa = node_config.node.archiver_appliance.as_ref().unwrap();
Box::pin(archapp::parse::scan_files_to_database(
pairs,
aa.data_base_paths.clone(),
aa.database.clone(),
))
}
pub async fn make_event_pipe(
evq: &RawEventsQuery,
aa: &ArchiverAppliance,

View File

@@ -50,8 +50,6 @@ async fn go() -> Result<(), Error> {
let opts = Opts::parse();
match opts.subcmd {
SubCmd::Retrieval(subcmd) => {
trace!("test trace");
error!("test error");
info!("daqbuffer {}", clap::crate_version!());
let mut config_file = File::open(subcmd.config).await?;
let mut buf = vec![];

View File

@@ -2,7 +2,8 @@ use super::binnedjson::ScalarEventsResponse;
use super::events::get_plain_events_json;
use crate::nodes::require_archapp_test_host_running;
use err::Error;
use netpod::{f64_close, log::*};
use netpod::f64_close;
use netpod::log::*;
#[test]
fn get_events_1() -> Result<(), Error> {

View File

@@ -270,12 +270,18 @@ async fn http_service_try(req: Request<Body>, node_config: &NodeConfigCached) ->
} else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
}
} else if path == "/api/4/archapp/files/scan" {
} else if path == "/api/4/archapp/files/scan/msgs" {
if req.method() == Method::GET {
Ok(archapp_scan_files(req, &node_config).await?)
} else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
}
} else if path == "/api/4/archapp/files/scan/insert" {
if req.method() == Method::GET {
Ok(archapp_scan_files_insert(req, &node_config).await?)
} else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
}
} else if path == "/api/4/archapp/channel/info" {
if req.method() == Method::GET {
Ok(archapp_channel_info(req, &node_config).await?)
@@ -829,6 +835,34 @@ pub async fn archapp_scan_files(req: Request<Body>, node_config: &NodeConfigCach
Ok(ret)
}
pub async fn archapp_scan_files_insert(
req: Request<Body>,
node_config: &NodeConfigCached,
) -> Result<Response<Body>, Error> {
let url = Url::parse(&format!("dummy:{}", req.uri()))?;
let pairs = get_url_query_pairs(&url);
let res = archapp_wrap::scan_files_insert(pairs, node_config.clone()).await?;
let ret = response(StatusCode::OK)
.header(http::header::CONTENT_TYPE, APP_JSON_LINES)
.body(Body::wrap_stream(res.map(|k| match k {
Ok(k) => match k.serialize() {
Ok(mut item) => {
item.push(0xa);
Ok(item)
}
Err(e) => Err(e),
},
Err(e) => match serde_json::to_vec(&e) {
Ok(mut item) => {
item.push(0xa);
Ok(item)
}
Err(e) => Err(e.into()),
},
})))?;
Ok(ret)
}
pub async fn archapp_channel_info(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
let url = Url::parse(&format!("dummy:{}", req.uri()))?;
let pairs = get_url_query_pairs(&url);

View File

@@ -85,6 +85,24 @@ impl ScalarType {
Ok(g)
}
pub fn to_bsread_str(&self) -> &'static str {
use ScalarType::*;
match self {
U8 => "uint8",
U16 => "uint16",
U32 => "uint32",
U64 => "uint64",
I8 => "int8",
I16 => "int16",
I32 => "int32",
I64 => "int64",
F32 => "float",
F64 => "double",
BOOL => "bool",
STRING => "string",
}
}
pub fn from_bsread_str(s: &str) -> Result<Self, Error> {
use ScalarType::*;
let ret = match s {
@@ -98,6 +116,7 @@ impl ScalarType {
"int64" => I64,
"float" => F32,
"double" => F64,
"string" => STRING,
_ => {
return Err(Error::with_msg_no_trace(format!(
"from_bsread_str can not understand bsread {}",