diff --git a/dbconn/src/scan.rs b/dbconn/src/scan.rs
index b3cbca9..2d36398 100644
--- a/dbconn/src/scan.rs
+++ b/dbconn/src/scan.rs
@@ -47,13 +47,13 @@ fn _get_hostname() -> Result {
 pub async fn get_node_disk_ident(node_config: &NodeConfigCached, dbc: &Client) -> Result {
     let sql = "select nodes.rowid, facility, split, hostname from nodes, facilities where facilities.name = $1 and facility = facilities.rowid and hostname = $2";
     let rows = dbc
-        .query(sql, &[&node_config.node.backend, &node_config.node.host])
+        .query(sql, &[&node_config.node_config.cluster.backend, &node_config.node.host])
         .await
         .errconv()?;
     if rows.len() != 1 {
         return Err(Error::with_msg(format!(
             "get_node can't find unique entry for {} {}",
-            node_config.node.host, node_config.node.backend
+            node_config.node.host, node_config.node_config.cluster.backend
         )));
     }
     let row = &rows[0];
@@ -71,13 +71,13 @@ pub async fn get_node_disk_ident_2(
 ) -> Result {
     let sql = "select nodes.rowid, facility, split, hostname from nodes, facilities where facilities.name = $1 and facility = facilities.rowid and hostname = $2";
     let rows = dbc
-        .query(sql, &[&node_config.node.backend, &node_config.node.host])
+        .query(sql, &[&node_config.node_config.cluster.backend, &node_config.node.host])
         .await
         .errconv()?;
     if rows.len() != 1 {
         return Err(Error::with_msg(format!(
             "get_node can't find unique entry for {} {}",
-            node_config.node.host, node_config.node.backend
+            node_config.node.host, node_config.node_config.cluster.backend
         )));
     }
     let row = &rows[0];
diff --git a/dbconn/src/search.rs b/dbconn/src/search.rs
index 719be30..45c1b12 100644
--- a/dbconn/src/search.rs
+++ b/dbconn/src/search.rs
@@ -193,7 +193,7 @@ pub async fn search_channel(
 ) -> Result {
     let database = &node_config.node_config.cluster.database;
     if let Some(conf) = node_config.node.channel_archiver.as_ref() {
-        search_channel_archeng(query, node_config.node.backend.clone(), conf, database).await
+        search_channel_archeng(query, node_config.node_config.cluster.backend.clone(), conf, database).await
    } else if let Some(_conf) = node_config.node.archiver_appliance.as_ref() {
         // TODO
         err::todoval()
diff --git a/disk/src/aggtest.rs b/disk/src/aggtest.rs
index 77650e1..a5a7b1e 100644
--- a/disk/src/aggtest.rs
+++ b/disk/src/aggtest.rs
@@ -14,7 +14,6 @@ pub fn make_test_node(id: u32) -> Node {
         port_raw: 8800 + id as u16 + 100,
         // TODO use a common function to supply the tmp path.
         cache_base_path: test_data_base_path_databuffer().join(format!("node{:02}", id)),
-        backend: "testbackend".into(),
         sf_databuffer: Some(SfDatabuffer {
             data_base_path: test_data_base_path_databuffer().join(format!("node{:02}", id)),
             ksprefix: "ks".into(),
diff --git a/disk/src/binned/prebinned.rs b/disk/src/binned/prebinned.rs
index 32adcc2..b15d633 100644
--- a/disk/src/binned/prebinned.rs
+++ b/disk/src/binned/prebinned.rs
@@ -169,10 +169,10 @@ pub async fn pre_binned_bytes_for_http(
     node_config: &NodeConfigCached,
     query: &PreBinnedQuery,
 ) -> Result<Pin<Box<dyn Stream<Item = Result<Bytes, Error>> + Send>>, Error> {
-    if query.channel().backend != node_config.node.backend {
+    if query.channel().backend != node_config.node_config.cluster.backend {
         let err = Error::with_msg(format!(
             "backend mismatch node: {} requested: {}",
-            node_config.node.backend,
+            node_config.node_config.cluster.backend,
             query.channel().backend
         ));
         return Err(err);
diff --git a/disk/src/gen.rs b/disk/src/gen.rs
index 9f0faf6..6aed8f6 100644
--- a/disk/src/gen.rs
+++ b/disk/src/gen.rs
@@ -124,7 +124,6 @@ pub async fn gen_test_data() -> Result<(), Error> {
         port: 7780 + i1 as u16,
         port_raw: 7780 + i1 as u16 + 100,
         cache_base_path: data_base_path.join(format!("node{:02}", i1)),
-        backend: "testbackend".into(),
         sf_databuffer: Some(SfDatabuffer {
             data_base_path: data_base_path.join(format!("node{:02}", i1)),
             ksprefix: ksprefix.clone(),
diff --git a/httpret/src/channelarchiver.rs b/httpret/src/channelarchiver.rs
index 601d700..0ced47b 100644
--- a/httpret/src/channelarchiver.rs
+++ b/httpret/src/channelarchiver.rs
@@ -333,7 +333,7 @@ impl BlockStream {
         let read_queue = pairs.get("readQueue").unwrap_or(&"1".to_string()).parse()?;
         let channel_name = pairs.get("channelName").map(String::from).unwrap_or("NONE".into());
         let channel = Channel {
-            backend: node_config.node.backend.clone(),
+            backend: node_config.node_config.cluster.backend.clone(),
             name: channel_name,
         };
         use archapp_wrap::archapp::archeng;
diff --git a/httpret/src/proxy/api4.rs b/httpret/src/proxy/api4.rs
index fe17e88..71e02b5 100644
--- a/httpret/src/proxy/api4.rs
+++ b/httpret/src/proxy/api4.rs
@@ -103,23 +103,12 @@ pub async fn node_status(req: Request<Body>, proxy_config: &ProxyConfig) -> Resu
     let v = head.headers.get(header::ACCEPT).unwrap_or(&vdef);
     if v == APP_JSON || v == ACCEPT_ALL {
         let inpurl = Url::parse(&format!("dummy:{}", head.uri))?;
-        let query = ChannelSearchQuery::from_url(&inpurl)?;
         let mut bodies = vec![];
         let urls = proxy_config
             .backends_status
             .iter()
-            .filter(|k| {
-                if let Some(back) = &query.backend {
-                    back == &k.name
-                } else {
-                    true
-                }
-            })
             .map(|pb| match Url::parse(&format!("{}/api/4/node_status", pb.url)) {
-                Ok(mut url) => {
-                    query.append_to_url(&mut url);
-                    Ok(url)
-                }
+                Ok(url) => Ok(url),
                 Err(_) => Err(Error::with_msg(format!("parse error for: {:?}", pb))),
             })
             .fold_ok(vec![], |mut a, x| {
diff --git a/netfetch/src/test.rs b/netfetch/src/test.rs
index 9994b40..09b4e32 100644
--- a/netfetch/src/test.rs
+++ b/netfetch/src/test.rs
@@ -16,7 +16,6 @@ fn ca_connect_1() {
         host: "".into(),
         port: 123,
         port_raw: 123,
-        backend: "".into(),
         cache_base_path: "".into(),
         listen: "".into(),
         sf_databuffer: Some(SfDatabuffer {
@@ -30,6 +29,7 @@
         node_config: NodeConfig {
             name: "".into(),
             cluster: Cluster {
+                backend: "".into(),
                 nodes: vec![],
                 database: Database {
                     host: "".into(),
diff --git a/netpod/src/netpod.rs b/netpod/src/netpod.rs
index 4bc7bb4..60ac819 100644
--- a/netpod/src/netpod.rs
+++ b/netpod/src/netpod.rs
@@ -224,7 +224,6 @@ pub struct Node {
     pub port: u16,
     pub port_raw: u16,
     pub cache_base_path: PathBuf,
-    pub backend: String,
     pub sf_databuffer: Option<SfDatabuffer>,
     pub archiver_appliance: Option<ArchiverAppliance>,
     pub channel_archiver: Option<ChannelArchiver>,
@@ -239,7 +238,6 @@ impl Node {
             port: 4444,
             port_raw: 4444,
             cache_base_path: PathBuf::new(),
-            backend: "dummybackend".into(),
             sf_databuffer: Some(SfDatabuffer {
                 data_base_path: PathBuf::new(),
                 ksprefix: "daqlocal".into(),
@@ -261,6 +259,7 @@ pub struct Database {
 
 #[derive(Clone, Debug, Serialize, Deserialize)]
 pub struct Cluster {
+    pub backend: String,
     pub nodes: Vec<Node>,
     pub database: Database,
     #[serde(rename = "runMapPulse", default)]
@@ -1513,7 +1512,6 @@ pub fn test_cluster() -> Cluster {
             port: 6170 + id as u16,
             port_raw: 6170 + id as u16 + 100,
             cache_base_path: test_data_base_path_databuffer().join(format!("node{:02}", id)),
-            backend: "testbackend".into(),
             sf_databuffer: Some(SfDatabuffer {
                 data_base_path: test_data_base_path_databuffer().join(format!("node{:02}", id)),
                 ksprefix: "ks".into(),
@@ -1524,6 +1522,7 @@
         })
         .collect();
     Cluster {
+        backend: "testbackend".into(),
         nodes,
         database: Database {
             host: "127.0.0.1".into(),
@@ -1546,7 +1545,6 @@ pub fn sls_test_cluster() -> Cluster {
             port: 6190 + id as u16,
             port_raw: 6190 + id as u16 + 100,
             cache_base_path: test_data_base_path_databuffer().join(format!("node{:02}", id)),
-            backend: "sls-archive".into(),
             sf_databuffer: None,
             archiver_appliance: None,
             channel_archiver: Some(ChannelArchiver {
@@ -1555,6 +1553,7 @@
         })
         .collect();
     Cluster {
+        backend: "sls-archive".into(),
         nodes,
         database: Database {
             host: "127.0.0.1".into(),
@@ -1577,7 +1576,6 @@ pub fn archapp_test_cluster() -> Cluster {
             port: 6200 + id as u16,
             port_raw: 6200 + id as u16 + 100,
             cache_base_path: test_data_base_path_databuffer().join(format!("node{:02}", id)),
-            backend: "sf-archive".into(),
             sf_databuffer: None,
             channel_archiver: None,
             archiver_appliance: Some(ArchiverAppliance {
@@ -1586,6 +1584,7 @@
         })
         .collect();
     Cluster {
+        backend: "sf-archive".into(),
         nodes,
         database: Database {
             host: "127.0.0.1".into(),
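
Net effect of this change, as a minimal self-contained sketch (these are not the crate's real definitions; the structs are abridged stand-ins reduced to the moved field): the per-node `backend` string becomes a single cluster-wide setting, so every former `node_config.node.backend` read turns into `node_config.node_config.cluster.backend`.

// Sketch only: abridged stand-ins for netpod's Cluster / NodeConfig /
// NodeConfigCached, reduced to the field this refactor moves.
struct Cluster {
    backend: String, // moved here from Node; one backend per cluster
    // nodes, database, ... elided
}

struct NodeConfig {
    cluster: Cluster,
}

struct NodeConfigCached {
    node_config: NodeConfig,
    // node: Node, ... elided; Node no longer carries a backend
}

fn main() {
    let node_config = NodeConfigCached {
        node_config: NodeConfig {
            cluster: Cluster {
                backend: "testbackend".into(),
            },
        },
    };
    // Former call sites read &node_config.node.backend; after this diff:
    let backend = &node_config.node_config.cluster.backend;
    assert_eq!(backend, "testbackend");
}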