Iterate through pb files and parse the header
This commit is contained in:
@@ -15,6 +15,7 @@ serde_derive = "1.0"
|
|||||||
serde_json = "1.0"
|
serde_json = "1.0"
|
||||||
chrono = "0.4"
|
chrono = "0.4"
|
||||||
protobuf = "2.24.1"
|
protobuf = "2.24.1"
|
||||||
|
async-channel = "1.6"
|
||||||
err = { path = "../err" }
|
err = { path = "../err" }
|
||||||
taskrun = { path = "../taskrun" }
|
taskrun = { path = "../taskrun" }
|
||||||
netpod = { path = "../netpod" }
|
netpod = { path = "../netpod" }
|
||||||
|
|||||||
+191
-1
@@ -1,4 +1,194 @@
|
|||||||
pub mod generated;
|
use async_channel::{bounded, Receiver};
|
||||||
|
use err::Error;
|
||||||
|
use netpod::log::*;
|
||||||
|
use netpod::NodeConfigCached;
|
||||||
|
use protobuf::Message;
|
||||||
|
use serde::Serialize;
|
||||||
|
use serde_json::Value as JsonValue;
|
||||||
|
use std::collections::{BTreeMap, VecDeque};
|
||||||
|
use std::path::PathBuf;
|
||||||
|
use std::sync::Arc;
|
||||||
|
use tokio::io::AsyncReadExt;
|
||||||
|
|
||||||
|
pub mod generated;
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod test;
|
mod test;
|
||||||
|
|
||||||
|
pub trait ItemSer {
|
||||||
|
fn serialize(&self) -> Result<Vec<u8>, Error>;
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> ItemSer for T
|
||||||
|
where
|
||||||
|
T: Serialize,
|
||||||
|
{
|
||||||
|
fn serialize(&self) -> Result<Vec<u8>, Error> {
|
||||||
|
let u = serde_json::to_vec(self)?;
|
||||||
|
Ok(u)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn unescape_archapp_msg(inp: &[u8]) -> Result<Vec<u8>, Error> {
|
||||||
|
let mut ret = Vec::with_capacity(inp.len() * 5 / 4);
|
||||||
|
let mut esc = false;
|
||||||
|
for &k in inp.iter() {
|
||||||
|
if k == 0x1b {
|
||||||
|
esc = true;
|
||||||
|
} else if esc {
|
||||||
|
if k == 0x1 {
|
||||||
|
ret.push(0x1b);
|
||||||
|
} else if k == 0x2 {
|
||||||
|
ret.push(0xa);
|
||||||
|
} else if k == 0x3 {
|
||||||
|
ret.push(0xd);
|
||||||
|
} else {
|
||||||
|
return Err(Error::with_msg("malformed escaped archapp message"));
|
||||||
|
}
|
||||||
|
esc = false;
|
||||||
|
} else {
|
||||||
|
ret.push(k);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(ret)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Serialize)]
|
||||||
|
pub struct EpicsEventPayloadInfo {
|
||||||
|
headers: Vec<(String, String)>,
|
||||||
|
year: i32,
|
||||||
|
pvname: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn read_pb_file(path: PathBuf) -> Result<EpicsEventPayloadInfo, Error> {
|
||||||
|
let mut f1 = tokio::fs::File::open(path).await?;
|
||||||
|
let mut buf = vec![0; 1024 * 4];
|
||||||
|
{
|
||||||
|
let mut i1 = 0;
|
||||||
|
loop {
|
||||||
|
let n = f1.read(&mut buf[i1..]).await?;
|
||||||
|
if n == 0 {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
i1 += n;
|
||||||
|
if i1 >= buf.len() {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
let mut j1 = 0;
|
||||||
|
loop {
|
||||||
|
let mut i2 = usize::MAX;
|
||||||
|
for (i1, &k) in buf[j1..].iter().enumerate() {
|
||||||
|
if k == 0xa {
|
||||||
|
i2 = j1 + i1;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if i2 != usize::MAX {
|
||||||
|
//info!("got NL {} .. {}", j1, i2);
|
||||||
|
let m = unescape_archapp_msg(&buf[j1..i2])?;
|
||||||
|
if j1 == 0 {
|
||||||
|
let payload_info = crate::generated::EPICSEvent::PayloadInfo::parse_from_bytes(&m).unwrap();
|
||||||
|
//info!("got payload_info: {:?}", payload_info);
|
||||||
|
let z = EpicsEventPayloadInfo {
|
||||||
|
headers: payload_info
|
||||||
|
.get_headers()
|
||||||
|
.iter()
|
||||||
|
.map(|j| (j.get_name().to_string(), j.get_val().to_string()))
|
||||||
|
.collect(),
|
||||||
|
year: payload_info.get_year(),
|
||||||
|
pvname: payload_info.get_pvname().into(),
|
||||||
|
};
|
||||||
|
return Ok(z);
|
||||||
|
} else {
|
||||||
|
let scalar_double = crate::generated::EPICSEvent::ScalarDouble::parse_from_bytes(&m).unwrap();
|
||||||
|
//info!("got scalar_double: {:?}", scalar_double);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
//info!("no more packets");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
j1 = i2 + 1;
|
||||||
|
}
|
||||||
|
Err(Error::with_msg(format!("no header entry found in file")))
|
||||||
|
}
|
||||||
|
|
||||||
|
type RT1 = Box<dyn ItemSer + Send>;
|
||||||
|
|
||||||
|
pub async fn scan_files(
|
||||||
|
_pairs: BTreeMap<String, String>,
|
||||||
|
node_config: &NodeConfigCached,
|
||||||
|
) -> Result<Receiver<Result<RT1, Error>>, Error> {
|
||||||
|
let aa = if let Some(aa) = &node_config.node.archiver_appliance {
|
||||||
|
aa.clone()
|
||||||
|
} else {
|
||||||
|
return Err(Error::with_msg("no archiver appliance config"));
|
||||||
|
};
|
||||||
|
let (tx, rx) = bounded(16);
|
||||||
|
let tx = Arc::new(tx);
|
||||||
|
let tx2 = tx.clone();
|
||||||
|
let block1 = async move {
|
||||||
|
let mut paths = VecDeque::new();
|
||||||
|
paths.push_back(aa.data_base_path);
|
||||||
|
loop {
|
||||||
|
if let Some(path) = paths.pop_back() {
|
||||||
|
let meta = tokio::fs::metadata(&path).await?;
|
||||||
|
if meta.is_dir() {
|
||||||
|
let mut rd = tokio::fs::read_dir(&path).await?;
|
||||||
|
loop {
|
||||||
|
match rd.next_entry().await {
|
||||||
|
Ok(item) => match item {
|
||||||
|
Some(item) => {
|
||||||
|
paths.push_back(item.path());
|
||||||
|
}
|
||||||
|
None => {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Err(e) => {
|
||||||
|
tx.send(Err(e.into())).await.unwrap();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else if meta.is_file() {
|
||||||
|
//tx.send(Ok(Box::new(path.clone()) as RT1)).await?;
|
||||||
|
if path.to_str().unwrap().ends_with(".pb") {
|
||||||
|
let packet = read_pb_file(path.clone()).await?;
|
||||||
|
let pvn = packet.pvname.replace("-", "/");
|
||||||
|
let pvn = pvn.replace(":", "/");
|
||||||
|
let pre = "/arch/lts/ArchiverStore/";
|
||||||
|
let p3 = &path.to_str().unwrap()[pre.len()..];
|
||||||
|
let p3 = &p3[..p3.len() - 11];
|
||||||
|
if p3 != pvn {
|
||||||
|
tx.send(Ok(Box::new(serde_json::to_value(&packet)?) as RT1)).await?;
|
||||||
|
{
|
||||||
|
let s = format!("{} - {}", p3, packet.pvname);
|
||||||
|
tx.send(Ok(Box::new(serde_json::to_value(&s)?) as RT1)).await?;
|
||||||
|
}
|
||||||
|
tx.send(Ok(
|
||||||
|
Box::new(JsonValue::String(format!("MISMATCH --------------------"))) as RT1,
|
||||||
|
))
|
||||||
|
.await?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok::<_, Error>(())
|
||||||
|
};
|
||||||
|
let block2 = async move {
|
||||||
|
match block1.await {
|
||||||
|
Ok(_) => {}
|
||||||
|
Err(e) => match tx2.send(Err(e.into())).await {
|
||||||
|
Ok(_) => {}
|
||||||
|
Err(e) => {
|
||||||
|
error!("can not deliver error through channel: {:?}", e);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
}
|
||||||
|
};
|
||||||
|
tokio::spawn(block2);
|
||||||
|
Ok(rx)
|
||||||
|
}
|
||||||
|
|||||||
+1
-24
@@ -1,31 +1,8 @@
|
|||||||
|
use crate::unescape_archapp_msg;
|
||||||
use err::Error;
|
use err::Error;
|
||||||
use netpod::log::*;
|
use netpod::log::*;
|
||||||
use protobuf::Message;
|
use protobuf::Message;
|
||||||
|
|
||||||
fn unescape_archapp_msg(inp: &[u8]) -> Result<Vec<u8>, Error> {
|
|
||||||
let mut ret = Vec::with_capacity(inp.len() * 5 / 4);
|
|
||||||
let mut esc = false;
|
|
||||||
for &k in inp.iter() {
|
|
||||||
if k == 0x1b {
|
|
||||||
esc = true;
|
|
||||||
} else if esc {
|
|
||||||
if k == 0x1 {
|
|
||||||
ret.push(0x1b);
|
|
||||||
} else if k == 0x2 {
|
|
||||||
ret.push(0xa);
|
|
||||||
} else if k == 0x3 {
|
|
||||||
ret.push(0xd);
|
|
||||||
} else {
|
|
||||||
return Err(Error::with_msg("malformed escaped archapp message"));
|
|
||||||
}
|
|
||||||
esc = false;
|
|
||||||
} else {
|
|
||||||
ret.push(k);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Ok(ret)
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn read_pb_00() -> Result<(), Error> {
|
fn read_pb_00() -> Result<(), Error> {
|
||||||
let block1 = async move {
|
let block1 = async move {
|
||||||
|
|||||||
@@ -13,10 +13,10 @@ tracing-subscriber = "0.2.17"
|
|||||||
futures-core = "0.3.14"
|
futures-core = "0.3.14"
|
||||||
futures-util = "0.3.14"
|
futures-util = "0.3.14"
|
||||||
bytes = "1.0.1"
|
bytes = "1.0.1"
|
||||||
bincode = "1.3.3"
|
#bincode = "1.3.3"
|
||||||
#async-channel = "1"
|
#async-channel = "1"
|
||||||
#dashmap = "3"
|
#dashmap = "3"
|
||||||
tokio-postgres = "0.7"
|
#tokio-postgres = "0.7"
|
||||||
serde = "1.0"
|
serde = "1.0"
|
||||||
serde_derive = "1.0"
|
serde_derive = "1.0"
|
||||||
serde_json = "1.0"
|
serde_json = "1.0"
|
||||||
@@ -27,5 +27,6 @@ lazy_static = "1.4.0"
|
|||||||
err = { path = "../err" }
|
err = { path = "../err" }
|
||||||
taskrun = { path = "../taskrun" }
|
taskrun = { path = "../taskrun" }
|
||||||
netpod = { path = "../netpod" }
|
netpod = { path = "../netpod" }
|
||||||
httpret = { path = "../httpret" }
|
#httpret = { path = "../httpret" }
|
||||||
disk = { path = "../disk" }
|
disk = { path = "../disk" }
|
||||||
|
daqbufp2 = { path = "../daqbufp2" }
|
||||||
|
|||||||
@@ -59,7 +59,7 @@ async fn go() -> Result<(), Error> {
|
|||||||
let node_config: NodeConfig = serde_json::from_slice(&buf)?;
|
let node_config: NodeConfig = serde_json::from_slice(&buf)?;
|
||||||
let node_config: Result<NodeConfigCached, Error> = node_config.into();
|
let node_config: Result<NodeConfigCached, Error> = node_config.into();
|
||||||
let node_config = node_config?;
|
let node_config = node_config?;
|
||||||
daqbuffer::run_node(node_config.clone()).await?;
|
daqbufp2::run_node(node_config.clone()).await?;
|
||||||
}
|
}
|
||||||
SubCmd::Proxy(subcmd) => {
|
SubCmd::Proxy(subcmd) => {
|
||||||
info!("daqbuffer proxy {}", clap::crate_version!());
|
info!("daqbuffer proxy {}", clap::crate_version!());
|
||||||
@@ -67,17 +67,17 @@ async fn go() -> Result<(), Error> {
|
|||||||
let mut buf = vec![];
|
let mut buf = vec![];
|
||||||
config_file.read_to_end(&mut buf).await?;
|
config_file.read_to_end(&mut buf).await?;
|
||||||
let proxy_config: ProxyConfig = serde_json::from_slice(&buf)?;
|
let proxy_config: ProxyConfig = serde_json::from_slice(&buf)?;
|
||||||
daqbuffer::run_proxy(proxy_config.clone()).await?;
|
daqbufp2::run_proxy(proxy_config.clone()).await?;
|
||||||
}
|
}
|
||||||
SubCmd::Client(client) => match client.client_type {
|
SubCmd::Client(client) => match client.client_type {
|
||||||
ClientType::Status(opts) => {
|
ClientType::Status(opts) => {
|
||||||
daqbuffer::client::status(opts.host, opts.port).await?;
|
daqbufp2::client::status(opts.host, opts.port).await?;
|
||||||
}
|
}
|
||||||
ClientType::Binned(opts) => {
|
ClientType::Binned(opts) => {
|
||||||
let beg = parse_ts(&opts.beg)?;
|
let beg = parse_ts(&opts.beg)?;
|
||||||
let end = parse_ts(&opts.end)?;
|
let end = parse_ts(&opts.end)?;
|
||||||
let cache_usage = CacheUsage::from_string(&opts.cache)?;
|
let cache_usage = CacheUsage::from_string(&opts.cache)?;
|
||||||
daqbuffer::client::get_binned(
|
daqbufp2::client::get_binned(
|
||||||
opts.host,
|
opts.host,
|
||||||
opts.port,
|
opts.port,
|
||||||
opts.backend,
|
opts.backend,
|
||||||
@@ -103,7 +103,7 @@ fn simple_fetch() {
|
|||||||
use netpod::Nanos;
|
use netpod::Nanos;
|
||||||
use netpod::{timeunits::*, ByteOrder, Channel, ChannelConfig, ScalarType, Shape};
|
use netpod::{timeunits::*, ByteOrder, Channel, ChannelConfig, ScalarType, Shape};
|
||||||
taskrun::run(async {
|
taskrun::run(async {
|
||||||
let _rh = daqbuffer::nodes::require_test_hosts_running()?;
|
let _rh = daqbufp2::nodes::require_test_hosts_running()?;
|
||||||
let t1 = chrono::Utc::now();
|
let t1 = chrono::Utc::now();
|
||||||
let query = netpod::AggQuerySingleChannel {
|
let query = netpod::AggQuerySingleChannel {
|
||||||
channel_config: ChannelConfig {
|
channel_config: ChannelConfig {
|
||||||
|
|||||||
@@ -1,36 +1 @@
|
|||||||
use err::Error;
|
|
||||||
use netpod::{Cluster, NodeConfig, NodeConfigCached, ProxyConfig};
|
|
||||||
use tokio::task::JoinHandle;
|
|
||||||
#[allow(unused_imports)]
|
|
||||||
use tracing::{debug, error, info, trace, warn};
|
|
||||||
|
|
||||||
pub mod cli;
|
pub mod cli;
|
||||||
pub mod client;
|
|
||||||
pub mod nodes;
|
|
||||||
#[cfg(test)]
|
|
||||||
pub mod test;
|
|
||||||
|
|
||||||
pub fn spawn_test_hosts(cluster: Cluster) -> Vec<JoinHandle<Result<(), Error>>> {
|
|
||||||
let mut ret = vec![];
|
|
||||||
for node in &cluster.nodes {
|
|
||||||
let node_config = NodeConfig {
|
|
||||||
cluster: cluster.clone(),
|
|
||||||
name: format!("{}:{}", node.host, node.port),
|
|
||||||
};
|
|
||||||
let node_config: Result<NodeConfigCached, Error> = node_config.into();
|
|
||||||
let node_config = node_config.unwrap();
|
|
||||||
let h = tokio::spawn(httpret::host(node_config));
|
|
||||||
ret.push(h);
|
|
||||||
}
|
|
||||||
ret
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn run_node(node_config: NodeConfigCached) -> Result<(), Error> {
|
|
||||||
httpret::host(node_config).await?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn run_proxy(proxy_config: ProxyConfig) -> Result<(), Error> {
|
|
||||||
httpret::proxy::proxy(proxy_config).await?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|||||||
@@ -0,0 +1,29 @@
|
|||||||
|
[package]
|
||||||
|
name = "daqbufp2"
|
||||||
|
version = "0.0.1-a.dev.12"
|
||||||
|
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
|
||||||
|
edition = "2018"
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
tokio = { version = "1.4.0", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] }
|
||||||
|
hyper = "0.14"
|
||||||
|
http = "0.2"
|
||||||
|
tracing = "0.1.25"
|
||||||
|
tracing-subscriber = "0.2.17"
|
||||||
|
futures-core = "0.3.14"
|
||||||
|
futures-util = "0.3.14"
|
||||||
|
bytes = "1.0.1"
|
||||||
|
bincode = "1.3.3"
|
||||||
|
#async-channel = "1"
|
||||||
|
#dashmap = "3"
|
||||||
|
serde = "1.0"
|
||||||
|
serde_derive = "1.0"
|
||||||
|
serde_json = "1.0"
|
||||||
|
chrono = "0.4"
|
||||||
|
url = "2.2.2"
|
||||||
|
lazy_static = "1.4.0"
|
||||||
|
err = { path = "../err" }
|
||||||
|
taskrun = { path = "../taskrun" }
|
||||||
|
netpod = { path = "../netpod" }
|
||||||
|
httpret = { path = "../httpret" }
|
||||||
|
disk = { path = "../disk" }
|
||||||
@@ -0,0 +1,34 @@
|
|||||||
|
use tokio::task::JoinHandle;
|
||||||
|
|
||||||
|
use err::Error;
|
||||||
|
use netpod::{Cluster, NodeConfig, NodeConfigCached, ProxyConfig};
|
||||||
|
|
||||||
|
pub mod client;
|
||||||
|
pub mod nodes;
|
||||||
|
#[cfg(test)]
|
||||||
|
pub mod test;
|
||||||
|
|
||||||
|
pub fn spawn_test_hosts(cluster: Cluster) -> Vec<JoinHandle<Result<(), Error>>> {
|
||||||
|
let mut ret = vec![];
|
||||||
|
for node in &cluster.nodes {
|
||||||
|
let node_config = NodeConfig {
|
||||||
|
cluster: cluster.clone(),
|
||||||
|
name: format!("{}:{}", node.host, node.port),
|
||||||
|
};
|
||||||
|
let node_config: Result<NodeConfigCached, Error> = node_config.into();
|
||||||
|
let node_config = node_config.unwrap();
|
||||||
|
let h = tokio::spawn(httpret::host(node_config));
|
||||||
|
ret.push(h);
|
||||||
|
}
|
||||||
|
ret
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn run_node(node_config: NodeConfigCached) -> Result<(), Error> {
|
||||||
|
httpret::host(node_config).await?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn run_proxy(proxy_config: ProxyConfig) -> Result<(), Error> {
|
||||||
|
httpret::proxy::proxy(proxy_config).await?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
@@ -44,6 +44,7 @@ fn test_cluster() -> Cluster {
|
|||||||
split: id,
|
split: id,
|
||||||
backend: "testbackend".into(),
|
backend: "testbackend".into(),
|
||||||
bin_grain_kind: 0,
|
bin_grain_kind: 0,
|
||||||
|
archiver_appliance: None,
|
||||||
})
|
})
|
||||||
.collect();
|
.collect();
|
||||||
Cluster {
|
Cluster {
|
||||||
@@ -16,6 +16,7 @@ pub fn make_test_node(id: u32) -> Node {
|
|||||||
ksprefix: "ks".into(),
|
ksprefix: "ks".into(),
|
||||||
backend: "testbackend".into(),
|
backend: "testbackend".into(),
|
||||||
bin_grain_kind: 0,
|
bin_grain_kind: 0,
|
||||||
|
archiver_appliance: None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -81,6 +81,7 @@ pub async fn gen_test_data() -> Result<(), Error> {
|
|||||||
ksprefix: ksprefix.clone(),
|
ksprefix: ksprefix.clone(),
|
||||||
backend: "testbackend".into(),
|
backend: "testbackend".into(),
|
||||||
bin_grain_kind: 0,
|
bin_grain_kind: 0,
|
||||||
|
archiver_appliance: None,
|
||||||
};
|
};
|
||||||
ensemble.nodes.push(node);
|
ensemble.nodes.push(node);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -24,4 +24,5 @@ dbconn = { path = "../dbconn" }
|
|||||||
disk = { path = "../disk" }
|
disk = { path = "../disk" }
|
||||||
parse = { path = "../parse" }
|
parse = { path = "../parse" }
|
||||||
netfetch = { path = "../netfetch" }
|
netfetch = { path = "../netfetch" }
|
||||||
|
archapp = { path = "../archapp" }
|
||||||
taskrun = { path = "../taskrun" }
|
taskrun = { path = "../taskrun" }
|
||||||
|
|||||||
@@ -217,6 +217,12 @@ async fn http_service_try(req: Request<Body>, node_config: &NodeConfigCached) ->
|
|||||||
} else {
|
} else {
|
||||||
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
|
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
|
||||||
}
|
}
|
||||||
|
} else if path == "/api/4/archapp/files" {
|
||||||
|
if req.method() == Method::GET {
|
||||||
|
Ok(archapp_scan_files(req, &node_config).await?)
|
||||||
|
} else {
|
||||||
|
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
|
||||||
|
}
|
||||||
} else if path == "/api/4/channel/config" {
|
} else if path == "/api/4/channel/config" {
|
||||||
if req.method() == Method::GET {
|
if req.method() == Method::GET {
|
||||||
Ok(channel_config(req, &node_config).await?)
|
Ok(channel_config(req, &node_config).await?)
|
||||||
@@ -624,3 +630,28 @@ pub async fn ca_connect_1(req: Request<Body>, node_config: &NodeConfigCached) ->
|
|||||||
})))?;
|
})))?;
|
||||||
Ok(ret)
|
Ok(ret)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn archapp_scan_files(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
|
||||||
|
let url = Url::parse(&format!("dummy:{}", req.uri()))?;
|
||||||
|
let pairs = get_url_query_pairs(&url);
|
||||||
|
let res = archapp::scan_files(pairs, node_config).await?;
|
||||||
|
let ret = response(StatusCode::OK)
|
||||||
|
.header(http::header::CONTENT_TYPE, APP_JSON_LINES)
|
||||||
|
.body(Body::wrap_stream(res.map(|k| match k {
|
||||||
|
Ok(k) => match k.serialize() {
|
||||||
|
Ok(mut item) => {
|
||||||
|
item.push(0xa);
|
||||||
|
Ok(item)
|
||||||
|
}
|
||||||
|
Err(e) => Err(e),
|
||||||
|
},
|
||||||
|
Err(e) => match serde_json::to_vec(&e) {
|
||||||
|
Ok(mut item) => {
|
||||||
|
item.push(0xa);
|
||||||
|
Ok(item)
|
||||||
|
}
|
||||||
|
Err(e) => Err(e.into()),
|
||||||
|
},
|
||||||
|
})))?;
|
||||||
|
Ok(ret)
|
||||||
|
}
|
||||||
|
|||||||
@@ -20,6 +20,7 @@ fn ca_connect_1() {
|
|||||||
data_base_path: "".into(),
|
data_base_path: "".into(),
|
||||||
listen: "".into(),
|
listen: "".into(),
|
||||||
ksprefix: "".into(),
|
ksprefix: "".into(),
|
||||||
|
archiver_appliance: None,
|
||||||
},
|
},
|
||||||
node_config: NodeConfig {
|
node_config: NodeConfig {
|
||||||
name: "".into(),
|
name: "".into(),
|
||||||
|
|||||||
@@ -165,6 +165,11 @@ impl ScalarType {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||||
|
pub struct ArchiverAppliance {
|
||||||
|
pub data_base_path: PathBuf,
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||||
pub struct Node {
|
pub struct Node {
|
||||||
pub host: String,
|
pub host: String,
|
||||||
@@ -177,6 +182,7 @@ pub struct Node {
|
|||||||
pub backend: String,
|
pub backend: String,
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
pub bin_grain_kind: u32,
|
pub bin_grain_kind: u32,
|
||||||
|
pub archiver_appliance: Option<ArchiverAppliance>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||||
|
|||||||
Reference in New Issue
Block a user