Move backend config key
This commit is contained in:
@@ -47,13 +47,13 @@ fn _get_hostname() -> Result<String, Error> {
|
||||
pub async fn get_node_disk_ident(node_config: &NodeConfigCached, dbc: &Client) -> Result<NodeDiskIdent, Error> {
|
||||
let sql = "select nodes.rowid, facility, split, hostname from nodes, facilities where facilities.name = $1 and facility = facilities.rowid and hostname = $2";
|
||||
let rows = dbc
|
||||
.query(sql, &[&node_config.node.backend, &node_config.node.host])
|
||||
.query(sql, &[&node_config.node_config.cluster.backend, &node_config.node.host])
|
||||
.await
|
||||
.errconv()?;
|
||||
if rows.len() != 1 {
|
||||
return Err(Error::with_msg(format!(
|
||||
"get_node can't find unique entry for {} {}",
|
||||
node_config.node.host, node_config.node.backend
|
||||
node_config.node.host, node_config.node_config.cluster.backend
|
||||
)));
|
||||
}
|
||||
let row = &rows[0];
|
||||
@@ -71,13 +71,13 @@ pub async fn get_node_disk_ident_2(
|
||||
) -> Result<NodeDiskIdent, Error> {
|
||||
let sql = "select nodes.rowid, facility, split, hostname from nodes, facilities where facilities.name = $1 and facility = facilities.rowid and hostname = $2";
|
||||
let rows = dbc
|
||||
.query(sql, &[&node_config.node.backend, &node_config.node.host])
|
||||
.query(sql, &[&node_config.node_config.cluster.backend, &node_config.node.host])
|
||||
.await
|
||||
.errconv()?;
|
||||
if rows.len() != 1 {
|
||||
return Err(Error::with_msg(format!(
|
||||
"get_node can't find unique entry for {} {}",
|
||||
node_config.node.host, node_config.node.backend
|
||||
node_config.node.host, node_config.node_config.cluster.backend
|
||||
)));
|
||||
}
|
||||
let row = &rows[0];
|
||||
|
||||
@@ -193,7 +193,7 @@ pub async fn search_channel(
|
||||
) -> Result<ChannelSearchResult, Error> {
|
||||
let database = &node_config.node_config.cluster.database;
|
||||
if let Some(conf) = node_config.node.channel_archiver.as_ref() {
|
||||
search_channel_archeng(query, node_config.node.backend.clone(), conf, database).await
|
||||
search_channel_archeng(query, node_config.node_config.cluster.backend.clone(), conf, database).await
|
||||
} else if let Some(_conf) = node_config.node.archiver_appliance.as_ref() {
|
||||
// TODO
|
||||
err::todoval()
|
||||
|
||||
@@ -14,7 +14,6 @@ pub fn make_test_node(id: u32) -> Node {
|
||||
port_raw: 8800 + id as u16 + 100,
|
||||
// TODO use a common function to supply the tmp path.
|
||||
cache_base_path: test_data_base_path_databuffer().join(format!("node{:02}", id)),
|
||||
backend: "testbackend".into(),
|
||||
sf_databuffer: Some(SfDatabuffer {
|
||||
data_base_path: test_data_base_path_databuffer().join(format!("node{:02}", id)),
|
||||
ksprefix: "ks".into(),
|
||||
|
||||
@@ -169,10 +169,10 @@ pub async fn pre_binned_bytes_for_http(
|
||||
node_config: &NodeConfigCached,
|
||||
query: &PreBinnedQuery,
|
||||
) -> Result<Pin<Box<dyn Stream<Item = Result<Bytes, Error>> + Send>>, Error> {
|
||||
if query.channel().backend != node_config.node.backend {
|
||||
if query.channel().backend != node_config.node_config.cluster.backend {
|
||||
let err = Error::with_msg(format!(
|
||||
"backend mismatch node: {} requested: {}",
|
||||
node_config.node.backend,
|
||||
node_config.node_config.cluster.backend,
|
||||
query.channel().backend
|
||||
));
|
||||
return Err(err);
|
||||
|
||||
@@ -124,7 +124,6 @@ pub async fn gen_test_data() -> Result<(), Error> {
|
||||
port: 7780 + i1 as u16,
|
||||
port_raw: 7780 + i1 as u16 + 100,
|
||||
cache_base_path: data_base_path.join(format!("node{:02}", i1)),
|
||||
backend: "testbackend".into(),
|
||||
sf_databuffer: Some(SfDatabuffer {
|
||||
data_base_path: data_base_path.join(format!("node{:02}", i1)),
|
||||
ksprefix: ksprefix.clone(),
|
||||
|
||||
@@ -333,7 +333,7 @@ impl BlockStream {
|
||||
let read_queue = pairs.get("readQueue").unwrap_or(&"1".to_string()).parse()?;
|
||||
let channel_name = pairs.get("channelName").map(String::from).unwrap_or("NONE".into());
|
||||
let channel = Channel {
|
||||
backend: node_config.node.backend.clone(),
|
||||
backend: node_config.node_config.cluster.backend.clone(),
|
||||
name: channel_name,
|
||||
};
|
||||
use archapp_wrap::archapp::archeng;
|
||||
|
||||
@@ -103,23 +103,12 @@ pub async fn node_status(req: Request<Body>, proxy_config: &ProxyConfig) -> Resu
|
||||
let v = head.headers.get(header::ACCEPT).unwrap_or(&vdef);
|
||||
if v == APP_JSON || v == ACCEPT_ALL {
|
||||
let inpurl = Url::parse(&format!("dummy:{}", head.uri))?;
|
||||
let query = ChannelSearchQuery::from_url(&inpurl)?;
|
||||
let mut bodies = vec![];
|
||||
let urls = proxy_config
|
||||
.backends_status
|
||||
.iter()
|
||||
.filter(|k| {
|
||||
if let Some(back) = &query.backend {
|
||||
back == &k.name
|
||||
} else {
|
||||
true
|
||||
}
|
||||
})
|
||||
.map(|pb| match Url::parse(&format!("{}/api/4/node_status", pb.url)) {
|
||||
Ok(mut url) => {
|
||||
query.append_to_url(&mut url);
|
||||
Ok(url)
|
||||
}
|
||||
Ok(url) => Ok(url),
|
||||
Err(_) => Err(Error::with_msg(format!("parse error for: {:?}", pb))),
|
||||
})
|
||||
.fold_ok(vec![], |mut a, x| {
|
||||
|
||||
@@ -16,7 +16,6 @@ fn ca_connect_1() {
|
||||
host: "".into(),
|
||||
port: 123,
|
||||
port_raw: 123,
|
||||
backend: "".into(),
|
||||
cache_base_path: "".into(),
|
||||
listen: "".into(),
|
||||
sf_databuffer: Some(SfDatabuffer {
|
||||
@@ -30,6 +29,7 @@ fn ca_connect_1() {
|
||||
node_config: NodeConfig {
|
||||
name: "".into(),
|
||||
cluster: Cluster {
|
||||
backend: "".into(),
|
||||
nodes: vec![],
|
||||
database: Database {
|
||||
host: "".into(),
|
||||
|
||||
@@ -224,7 +224,6 @@ pub struct Node {
|
||||
pub port: u16,
|
||||
pub port_raw: u16,
|
||||
pub cache_base_path: PathBuf,
|
||||
pub backend: String,
|
||||
pub sf_databuffer: Option<SfDatabuffer>,
|
||||
pub archiver_appliance: Option<ArchiverAppliance>,
|
||||
pub channel_archiver: Option<ChannelArchiver>,
|
||||
@@ -239,7 +238,6 @@ impl Node {
|
||||
port: 4444,
|
||||
port_raw: 4444,
|
||||
cache_base_path: PathBuf::new(),
|
||||
backend: "dummybackend".into(),
|
||||
sf_databuffer: Some(SfDatabuffer {
|
||||
data_base_path: PathBuf::new(),
|
||||
ksprefix: "daqlocal".into(),
|
||||
@@ -261,6 +259,7 @@ pub struct Database {
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
pub struct Cluster {
|
||||
pub backend: String,
|
||||
pub nodes: Vec<Node>,
|
||||
pub database: Database,
|
||||
#[serde(rename = "runMapPulse", default)]
|
||||
@@ -1513,7 +1512,6 @@ pub fn test_cluster() -> Cluster {
|
||||
port: 6170 + id as u16,
|
||||
port_raw: 6170 + id as u16 + 100,
|
||||
cache_base_path: test_data_base_path_databuffer().join(format!("node{:02}", id)),
|
||||
backend: "testbackend".into(),
|
||||
sf_databuffer: Some(SfDatabuffer {
|
||||
data_base_path: test_data_base_path_databuffer().join(format!("node{:02}", id)),
|
||||
ksprefix: "ks".into(),
|
||||
@@ -1524,6 +1522,7 @@ pub fn test_cluster() -> Cluster {
|
||||
})
|
||||
.collect();
|
||||
Cluster {
|
||||
backend: "testbackend".into(),
|
||||
nodes,
|
||||
database: Database {
|
||||
host: "127.0.0.1".into(),
|
||||
@@ -1546,7 +1545,6 @@ pub fn sls_test_cluster() -> Cluster {
|
||||
port: 6190 + id as u16,
|
||||
port_raw: 6190 + id as u16 + 100,
|
||||
cache_base_path: test_data_base_path_databuffer().join(format!("node{:02}", id)),
|
||||
backend: "sls-archive".into(),
|
||||
sf_databuffer: None,
|
||||
archiver_appliance: None,
|
||||
channel_archiver: Some(ChannelArchiver {
|
||||
@@ -1555,6 +1553,7 @@ pub fn sls_test_cluster() -> Cluster {
|
||||
})
|
||||
.collect();
|
||||
Cluster {
|
||||
backend: "sls-archive".into(),
|
||||
nodes,
|
||||
database: Database {
|
||||
host: "127.0.0.1".into(),
|
||||
@@ -1577,7 +1576,6 @@ pub fn archapp_test_cluster() -> Cluster {
|
||||
port: 6200 + id as u16,
|
||||
port_raw: 6200 + id as u16 + 100,
|
||||
cache_base_path: test_data_base_path_databuffer().join(format!("node{:02}", id)),
|
||||
backend: "sf-archive".into(),
|
||||
sf_databuffer: None,
|
||||
channel_archiver: None,
|
||||
archiver_appliance: Some(ArchiverAppliance {
|
||||
@@ -1586,6 +1584,7 @@ pub fn archapp_test_cluster() -> Cluster {
|
||||
})
|
||||
.collect();
|
||||
Cluster {
|
||||
backend: "sf-archive".into(),
|
||||
nodes,
|
||||
database: Database {
|
||||
host: "127.0.0.1".into(),
|
||||
|
||||
Reference in New Issue
Block a user