From 96fa8b5b0953017340c550f12b525cd52d71767a Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Fri, 18 Feb 2022 19:24:14 +0100 Subject: [PATCH] Restructure config between the kinds of backends --- archapp/src/archeng.rs | 15 +++++--- archapp/src/archeng/configs.rs | 13 ++++--- archapp/src/archeng/indexfiles.rs | 24 ++++++++----- archapp/src/archeng/pipe.rs | 12 +++++-- archapp_wrap/src/lib.rs | 2 +- commonio/Cargo.toml | 22 ++++++------ commonio/src/commonio.rs | 9 ++++- dbconn/src/scan.rs | 42 ++++++++++++++++++---- dbconn/src/search.rs | 12 ++++--- disk/src/aggtest.rs | 10 +++--- disk/src/gen.rs | 17 +++++---- disk/src/paths.rs | 32 +++++++++++------ httpret/src/channelarchiver.rs | 39 ++++++--------------- httpret/src/httpret.rs | 3 +- httpret/src/proxy.rs | 2 +- httpret/src/pulsemap.rs | 6 ++-- netfetch/src/test.rs | 10 +++--- netpod/src/netpod.rs | 58 +++++++++++++------------------ nodenet/src/conn.rs | 2 +- parse/src/channelconfig.rs | 3 ++ 20 files changed, 195 insertions(+), 138 deletions(-) diff --git a/archapp/src/archeng.rs b/archapp/src/archeng.rs index cae3d73..ab4dba1 100644 --- a/archapp/src/archeng.rs +++ b/archapp/src/archeng.rs @@ -18,8 +18,8 @@ use commonio::StatsChannel; use err::{ErrStr, Error}; use futures_util::StreamExt; use items::{StreamItem, WithLen}; -use netpod::log::*; use netpod::timeunits::SEC; +use netpod::{log::*, Database}; use netpod::{ChannelArchiver, ChannelConfigQuery, ChannelConfigResponse}; use netpod::{ScalarType, Shape}; use serde::Serialize; @@ -178,9 +178,10 @@ impl From for Error { pub async fn channel_config_from_db( q: &ChannelConfigQuery, - conf: &ChannelArchiver, + _conf: &ChannelArchiver, + database: &Database, ) -> Result { - let dbc = database_connect(&conf.database).await?; + let dbc = database_connect(database).await?; let sql = "select config from channels where name = $1"; let rows = dbc.query(sql, &[&q.channel.name()]).await.errstr()?; if let Some(row) = rows.first() { @@ -217,10 +218,14 @@ pub async fn channel_config_from_db( } } -pub async fn channel_config(q: &ChannelConfigQuery, conf: &ChannelArchiver) -> Result { +pub async fn channel_config( + q: &ChannelConfigQuery, + _conf: &ChannelArchiver, + database: &Database, +) -> Result { let _timed = Timed::new("channel_config"); let mut type_info = None; - let ixpaths = indexfiles::index_file_path_list(q.channel.clone(), conf.database.clone()).await?; + let ixpaths = indexfiles::index_file_path_list(q.channel.clone(), database.clone()).await?; info!("got categorized ixpaths: {:?}", ixpaths); let ixpath = ixpaths.first().unwrap().clone(); let stream = blockrefstream::blockref_stream(q.channel.clone(), q.range.clone(), q.expand, ixpath.clone()); diff --git a/archapp/src/archeng/configs.rs b/archapp/src/archeng/configs.rs index 6d208e7..9e88717 100644 --- a/archapp/src/archeng/configs.rs +++ b/archapp/src/archeng/configs.rs @@ -2,7 +2,7 @@ use crate::archeng::indexfiles::database_connect; use err::{ErrStr, Error}; use futures_core::{Future, Stream}; use futures_util::{FutureExt, StreamExt}; -use netpod::log::*; +use netpod::{log::*, NodeConfigCached}; use netpod::{Channel, ChannelArchiver, ChannelConfigQuery, ChannelConfigResponse, Database, NanoRange}; use serde::Serialize; use serde_json::Value as JsVal; @@ -126,6 +126,7 @@ pub enum ConfigItem { } pub struct ConfigStream { + node: NodeConfigCached, conf: ChannelArchiver, inp: ChannelNameStream, inp_done: bool, @@ -136,8 +137,9 @@ pub struct ConfigStream { } impl ConfigStream { - pub fn new(inp: ChannelNameStream, conf: ChannelArchiver) -> Self { + pub fn new(inp: ChannelNameStream, node: NodeConfigCached, conf: ChannelArchiver) -> Self { Self { + node, conf, inp, inp_done: false, @@ -178,7 +180,7 @@ impl Stream for ConfigStream { Ready(Ok(Res::Response(item))) => { self.get_fut = None; let name = item.channel.name.clone(); - let dbconf = self.conf.database.clone(); + let dbconf = self.node.node_config.cluster.database.clone(); let config = serde_json::to_value(&item)?; let fut = async move { let dbc = database_connect(&dbconf).await?; @@ -193,7 +195,7 @@ impl Stream for ConfigStream { } Ready(Ok(Res::TimedOut(name))) => { self.get_fut = None; - let dbconf = self.conf.database.clone(); + let dbconf = self.node.node_config.cluster.database.clone(); let config = serde_json::to_value(&"TimedOut")?; let fut = async move { let dbc = database_connect(&dbconf).await?; @@ -220,6 +222,7 @@ impl Stream for ConfigStream { match self.inp.poll_next_unpin(cx) { Ready(Some(Ok(item))) => { let conf = self.conf.clone(); + let database = self.node.node_config.cluster.database.clone(); let fut = async move { let channel = Channel { name: item, @@ -236,7 +239,7 @@ impl Stream for ConfigStream { range: NanoRange { beg, end }, expand: true, }; - let fut = super::channel_config(&q, &conf); + let fut = super::channel_config(&q, &conf, &database); let fut = tokio::time::timeout(Duration::from_millis(2000), fut); match fut.await { Ok(Ok(k)) => Ok(Res::Response(k)), diff --git a/archapp/src/archeng/indexfiles.rs b/archapp/src/archeng/indexfiles.rs index ae8587b..054a6f2 100644 --- a/archapp/src/archeng/indexfiles.rs +++ b/archapp/src/archeng/indexfiles.rs @@ -6,6 +6,7 @@ use err::{ErrStr, Error}; use futures_core::{Future, Stream}; use futures_util::stream::unfold; use netpod::log::*; +use netpod::NodeConfigCached; use netpod::{Channel, ChannelArchiver, Database}; use regex::Regex; use std::collections::BTreeMap; @@ -161,13 +162,15 @@ enum ScanIndexFilesSteps { } struct ScanIndexFiles { + node: NodeConfigCached, conf: ChannelArchiver, steps: ScanIndexFilesSteps, } impl ScanIndexFiles { - fn new(conf: ChannelArchiver) -> Self { + fn new(conf: ChannelArchiver, node: NodeConfigCached) -> Self { Self { + node, conf, steps: ScanIndexFilesSteps::Level0, } @@ -184,7 +187,7 @@ impl ScanIndexFiles { ScanIndexFilesSteps::Level1(paths) => { let paths = get_level_1(paths).await?; info!("collected {} level 1 paths", paths.len()); - let dbc = database_connect(&self.conf.database).await?; + let dbc = database_connect(&self.node.node_config.cluster.database).await?; for p in paths { let ps = p.to_string_lossy(); let rows = dbc @@ -238,8 +241,8 @@ impl UnfoldExec for ScanIndexFiles { } } -pub fn scan_index_files(conf: ChannelArchiver) -> impl Stream> { - unfold_stream(ScanIndexFiles::new(conf.clone())) +pub fn scan_index_files(conf: ChannelArchiver, node: NodeConfigCached) -> impl Stream> { + unfold_stream(ScanIndexFiles::new(conf.clone(), node)) /* enum UnfoldState { Running(ScanIndexFiles), @@ -302,13 +305,16 @@ enum ScanChannelsSteps { } struct ScanChannels { + node: NodeConfigCached, + #[allow(unused)] conf: ChannelArchiver, steps: ScanChannelsSteps, } impl ScanChannels { - fn new(conf: ChannelArchiver) -> Self { + fn new(node: NodeConfigCached, conf: ChannelArchiver) -> Self { Self { + node, conf, steps: ScanChannelsSteps::Start, } @@ -322,7 +328,7 @@ impl ScanChannels { Ok(Some((format!("Start"), self))) } SelectIndexFile => { - let dbc = database_connect(&self.conf.database).await?; + let dbc = database_connect(&self.node.node_config.cluster.database).await?; let sql = "select path from indexfiles where ts_last_channel_search < now() - interval '1 hour' limit 1"; let rows = dbc.query(sql, &[]).await.errstr()?; @@ -337,7 +343,7 @@ impl ScanChannels { ReadChannels(mut paths) => { // TODO stats let stats = &StatsChannel::dummy(); - let dbc = database_connect(&self.conf.database).await?; + let dbc = database_connect(&self.node.node_config.cluster.database).await?; if let Some(path) = paths.pop() { let rows = dbc .query("select rowid from indexfiles where path = $1", &[&path]) @@ -411,8 +417,8 @@ impl UnfoldExec for ScanChannels { } } -pub fn scan_channels(conf: ChannelArchiver) -> impl Stream> { - unfold_stream(ScanChannels::new(conf.clone())) +pub fn scan_channels(node: NodeConfigCached, conf: ChannelArchiver) -> impl Stream> { + unfold_stream(ScanChannels::new(node, conf.clone())) } #[derive(Debug)] diff --git a/archapp/src/archeng/pipe.rs b/archapp/src/archeng/pipe.rs index 8ff8341..c3374d8 100644 --- a/archapp/src/archeng/pipe.rs +++ b/archapp/src/archeng/pipe.rs @@ -8,14 +8,16 @@ use items::eventsitem::EventsItem; use items::plainevents::{PlainEvents, WavePlainEvents}; use items::waveevents::{WaveNBinner, WaveXBinner}; use items::{EventsNodeProcessor, Framable, LogItem, RangeCompletableItem, StreamItem}; +use netpod::log::*; use netpod::query::RawEventsQuery; -use netpod::{log::*, AggKind, Shape}; +use netpod::{AggKind, NodeConfigCached, Shape}; use netpod::{ChannelArchiver, ChannelConfigQuery}; use std::pin::Pin; use streams::rangefilter::RangeFilter; pub async fn make_event_pipe( evq: &RawEventsQuery, + node: NodeConfigCached, conf: ChannelArchiver, ) -> Result> + Send>>, Error> { debug!("make_event_pipe {:?}", evq); @@ -25,10 +27,14 @@ pub async fn make_event_pipe( range: evq.range.clone(), expand: evq.agg_kind.need_expand(), }; - crate::archeng::channel_config_from_db(&q, &conf).await? + crate::archeng::channel_config_from_db(&q, &conf, &node.node_config.cluster.database).await? }; debug!("Channel config: {:?}", channel_config); - let ixpaths = crate::archeng::indexfiles::index_file_path_list(evq.channel.clone(), conf.database.clone()).await?; + let ixpaths = crate::archeng::indexfiles::index_file_path_list( + evq.channel.clone(), + node.node_config.cluster.database.clone(), + ) + .await?; debug!("got categorized ixpaths: {:?}", ixpaths); let ixpath = if let Some(x) = ixpaths.first() { x.clone() diff --git a/archapp_wrap/src/lib.rs b/archapp_wrap/src/lib.rs index 2ad0f55..5e7d090 100644 --- a/archapp_wrap/src/lib.rs +++ b/archapp_wrap/src/lib.rs @@ -29,7 +29,7 @@ pub fn scan_files_insert( Box::pin(archapp::parse::scan_files_to_database( pairs, aa.data_base_paths.clone(), - aa.database.clone(), + node_config.node_config.cluster.database.clone(), )) } diff --git a/commonio/Cargo.toml b/commonio/Cargo.toml index 08e597c..fb22b91 100644 --- a/commonio/Cargo.toml +++ b/commonio/Cargo.toml @@ -8,19 +8,19 @@ edition = "2021" path = "src/commonio.rs" [dependencies] -tokio = { version = "1.7.1", features = ["io-util", "net", "time", "sync", "fs", "parking_lot"] } -tracing = "0.1.26" +tokio = { version = "=1.16.1", features = ["io-util", "net", "time", "sync", "fs", "parking_lot"] } +tracing = "0.1" futures-core = "0.3.15" futures-util = "0.3.15" -bytes = "1.0.1" -serde = "1.0.126" -serde_derive = "1.0.126" -serde_json = "1.0.64" -bincode = "1.3.3" -chrono = "0.4.19" -async-channel = "1.6" -parking_lot = "0.11.2" -crc32fast = "1.2.1" +bytes = "1" +serde = { version = "1", features = ["derive"] } +serde_derive = "1" +serde_json = "1" +bincode = "1.3" +chrono = "0.4" +async-channel = "1" +parking_lot = "0.11" +crc32fast = "1.2" err = { path = "../err" } taskrun = { path = "../taskrun" } netpod = { path = "../netpod" } diff --git a/commonio/src/commonio.rs b/commonio/src/commonio.rs index fa16397..d5cef72 100644 --- a/commonio/src/commonio.rs +++ b/commonio/src/commonio.rs @@ -9,7 +9,7 @@ use netpod::log::*; use netpod::{DiskStats, OpenStats, ReadExactStats, ReadStats, SeekStats}; use std::fmt; use std::io::{self, SeekFrom}; -use std::path::PathBuf; +use std::path::{Path, PathBuf}; use std::sync::atomic::{AtomicUsize, Ordering}; use std::time::Instant; use tokio::fs::{File, OpenOptions}; @@ -18,6 +18,13 @@ use tokio::io::{AsyncReadExt, AsyncSeekExt}; const LOG_IO: bool = true; const STATS_IO: bool = true; +pub async fn tokio_read(path: impl AsRef) -> Result, Error> { + let path = path.as_ref(); + tokio::fs::read(path) + .await + .map_err(|e| Error::with_msg_no_trace(format!("Can not open {path:?} {e:?}"))) +} + pub struct StatsChannel { chn: Sender>, } diff --git a/dbconn/src/scan.rs b/dbconn/src/scan.rs index 243dfa7..b3cbca9 100644 --- a/dbconn/src/scan.rs +++ b/dbconn/src/scan.rs @@ -269,7 +269,14 @@ impl Stream for UpdatedDbWithChannelNamesStream { msg: format!("Got ident {:?}", pself.ident), count: 43, }; - let s = FindChannelNamesFromConfigReadDir::new(&pself.node_config.node.data_base_path); + let base_path = &pself + .node_config + .node + .sf_databuffer + .as_ref() + .ok_or_else(|| Error::with_msg(format!("missing sf databuffer config in node")))? + .data_base_path; + let s = FindChannelNamesFromConfigReadDir::new(base_path); *pself.find = Some(s); Ready(Some(Ok(ret))) } @@ -347,7 +354,13 @@ pub async fn update_db_with_channel_names( dbc.query("begin", &[]).await.errconv()?; let dbc = Arc::new(dbc); let tx = Arc::new(tx); - find_channel_names_from_config(&node_config.node.data_base_path, |ch| { + let base_path = &node_config + .node + .sf_databuffer + .as_ref() + .ok_or_else(|| Error::with_msg(format!("missing sf databuffer config in node")))? + .data_base_path; + find_channel_names_from_config(base_path, |ch| { let ch = ch.to_owned(); let dbc = dbc.clone(); let c1 = c1.clone(); @@ -407,7 +420,14 @@ pub async fn update_db_with_channel_names( pub fn update_db_with_channel_names_3<'a>( node_config: &'a NodeConfigCached, ) -> impl Stream> + 'static { - futures_util::future::ready(node_config.node.data_base_path.clone()) + let base_path = &node_config + .node + .sf_databuffer + .as_ref() + .ok_or_else(|| Error::with_msg(format!("missing sf databuffer config in node"))) + .unwrap() + .data_base_path; + futures_util::future::ready(base_path.clone()) .then(|path| tokio::fs::read_dir(path)) .map(Result::unwrap) .map(|rd| { @@ -552,9 +572,13 @@ pub async fn update_db_with_channel_config( count_inserted: &mut usize, count_updated: &mut usize, ) -> Result { - let path = node_config + let base_path = &node_config .node - .data_base_path + .sf_databuffer + .as_ref() + .ok_or_else(|| Error::with_msg(format!("missing sf databuffer config in node")))? + .data_base_path; + let path = base_path .join("config") .join(channel) .join("latest") @@ -806,6 +830,12 @@ pub async fn update_db_with_channel_datafiles( channel: &str, dbc: Arc, ) -> Result<(), Error> { + let base_path = &node_config + .node + .sf_databuffer + .as_ref() + .ok_or_else(|| Error::with_msg(format!("missing sf databuffer config in node")))? + .data_base_path; let writer = DatafileDbWriter { node_id: node_disk_ident.rowid(), channel_id: channel_id, @@ -814,7 +844,7 @@ pub async fn update_db_with_channel_datafiles( }; let mut n_nothing = 0; for ks in &[2, 3, 4] { - match find_channel_datafiles_in_ks(&node_config.node.data_base_path, ks_prefix, *ks, channel, &writer).await { + match find_channel_datafiles_in_ks(base_path, ks_prefix, *ks, channel, &writer).await { /*Err(Error::ChannelDatadirNotFound { .. }) => { n_nothing += 1; }*/ diff --git a/dbconn/src/search.rs b/dbconn/src/search.rs index 75fce99..719be30 100644 --- a/dbconn/src/search.rs +++ b/dbconn/src/search.rs @@ -1,6 +1,8 @@ use crate::{create_connection, ErrConv}; use err::Error; -use netpod::{ChannelArchiver, ChannelSearchQuery, ChannelSearchResult, ChannelSearchSingleResult, NodeConfigCached}; +use netpod::{ + ChannelArchiver, ChannelSearchQuery, ChannelSearchResult, ChannelSearchSingleResult, Database, NodeConfigCached, +}; use serde_json::Value as JsVal; pub async fn search_channel_databuffer( @@ -78,7 +80,8 @@ pub async fn search_channel_databuffer( pub async fn search_channel_archeng( query: ChannelSearchQuery, backend: String, - conf: &ChannelArchiver, + _conf: &ChannelArchiver, + database: &Database, ) -> Result { // Channel archiver provides only channel name. Also, search criteria are currently ANDed. // Therefore search only if user only provides a name criterion. @@ -102,7 +105,7 @@ pub async fn search_channel_archeng( " order by c.name", " limit 100" )); - let cl = create_connection(&conf.database).await?; + let cl = create_connection(database).await?; let rows = cl.query(sql.as_str(), &[&query.name_regex]).await.errconv()?; let mut res = vec![]; for row in rows { @@ -188,8 +191,9 @@ pub async fn search_channel( query: ChannelSearchQuery, node_config: &NodeConfigCached, ) -> Result { + let database = &node_config.node_config.cluster.database; if let Some(conf) = node_config.node.channel_archiver.as_ref() { - search_channel_archeng(query, node_config.node.backend.clone(), conf).await + search_channel_archeng(query, node_config.node.backend.clone(), conf, database).await } else if let Some(_conf) = node_config.node.archiver_appliance.as_ref() { // TODO err::todoval() diff --git a/disk/src/aggtest.rs b/disk/src/aggtest.rs index e0b50a3..77650e1 100644 --- a/disk/src/aggtest.rs +++ b/disk/src/aggtest.rs @@ -1,7 +1,7 @@ use crate::eventblobs::EventChunkerMultifile; use crate::eventchunker::EventChunkerConf; -use netpod::timeunits::*; use netpod::{test_data_base_path_databuffer, FileIoBufferSize}; +use netpod::{timeunits::*, SfDatabuffer}; use netpod::{ByteOrder, ByteSize, Channel, ChannelConfig, NanoRange, Nanos, Node, ScalarType, Shape}; #[allow(unused_imports)] use tracing::{debug, error, info, trace, warn}; @@ -13,11 +13,13 @@ pub fn make_test_node(id: u32) -> Node { port: 8800 + id as u16, port_raw: 8800 + id as u16 + 100, // TODO use a common function to supply the tmp path. - data_base_path: test_data_base_path_databuffer().join(format!("node{:02}", id)), cache_base_path: test_data_base_path_databuffer().join(format!("node{:02}", id)), - ksprefix: "ks".into(), backend: "testbackend".into(), - splits: None, + sf_databuffer: Some(SfDatabuffer { + data_base_path: test_data_base_path_databuffer().join(format!("node{:02}", id)), + ksprefix: "ks".into(), + splits: None, + }), archiver_appliance: None, channel_archiver: None, } diff --git a/disk/src/gen.rs b/disk/src/gen.rs index 4de8729..9f0faf6 100644 --- a/disk/src/gen.rs +++ b/disk/src/gen.rs @@ -2,7 +2,7 @@ use crate::ChannelConfigExt; use bitshuffle::bitshuffle_compress; use bytes::{BufMut, BytesMut}; use err::Error; -use netpod::{timeunits::*, ByteOrder, Channel, ChannelConfig, GenVar, Node, Shape}; +use netpod::{timeunits::*, ByteOrder, Channel, ChannelConfig, GenVar, Node, SfDatabuffer, Shape}; use netpod::{Nanos, ScalarType}; use std::path::{Path, PathBuf}; use tokio::fs::{File, OpenOptions}; @@ -123,11 +123,13 @@ pub async fn gen_test_data() -> Result<(), Error> { listen: "0.0.0.0".into(), port: 7780 + i1 as u16, port_raw: 7780 + i1 as u16 + 100, - data_base_path: data_base_path.join(format!("node{:02}", i1)), cache_base_path: data_base_path.join(format!("node{:02}", i1)), - ksprefix: ksprefix.clone(), backend: "testbackend".into(), - splits: None, + sf_databuffer: Some(SfDatabuffer { + data_base_path: data_base_path.join(format!("node{:02}", i1)), + ksprefix: ksprefix.clone(), + splits: None, + }), archiver_appliance: None, channel_archiver: None, }; @@ -158,10 +160,11 @@ async fn gen_node(split: u32, node: &Node, ensemble: &Ensemble) -> Result<(), Er } async fn gen_channel(chn: &ChannelGenProps, split: u32, node: &Node, ensemble: &Ensemble) -> Result<(), Error> { - let config_path = node.data_base_path.join("config").join(&chn.config.channel.name); - let channel_path = node + let sfc = node.sf_databuffer.as_ref().unwrap(); + let config_path = sfc.data_base_path.join("config").join(&chn.config.channel.name); + let channel_path = sfc .data_base_path - .join(format!("{}_{}", node.ksprefix, chn.config.keyspace)) + .join(format!("{}_{}", sfc.ksprefix, chn.config.keyspace)) .join("byTime") .join(&chn.config.channel.name); tokio::fs::create_dir_all(&channel_path).await?; diff --git a/disk/src/paths.rs b/disk/src/paths.rs index 1a59053..5be86a7 100644 --- a/disk/src/paths.rs +++ b/disk/src/paths.rs @@ -6,8 +6,15 @@ use std::path::PathBuf; // TODO remove/replace this pub fn datapath(timebin: u64, config: &netpod::ChannelConfig, split: u32, node: &Node) -> PathBuf { - node.data_base_path - .join(format!("{}_{}", node.ksprefix, config.keyspace)) + node.sf_databuffer + .as_ref() + .unwrap() + .data_base_path + .join(format!( + "{}_{}", + node.sf_databuffer.as_ref().unwrap().ksprefix, + config.keyspace + )) .join("byTime") .join(config.channel.name.clone()) .join(format!("{:019}", timebin)) @@ -26,9 +33,10 @@ pub async fn datapaths_for_timebin( config: &netpod::ChannelConfig, node: &Node, ) -> Result, Error> { - let timebin_path = node + let sfc = node.sf_databuffer.as_ref().unwrap(); + let timebin_path = sfc .data_base_path - .join(format!("{}_{}", node.ksprefix, config.keyspace)) + .join(format!("{}_{}", sfc.ksprefix, config.keyspace)) .join("byTime") .join(config.channel.name.clone()) .join(format!("{:019}", timebin)); @@ -47,7 +55,7 @@ pub async fn datapaths_for_timebin( let vv = dn.chars().fold(0, |a, x| if x.is_digit(10) { a + 1 } else { a }); if vv == 10 { let split: u64 = dn.parse()?; - match &node.splits { + match &sfc.splits { Some(sps) => { if sps.contains(&split) { splits.push(split); @@ -61,9 +69,9 @@ pub async fn datapaths_for_timebin( } let mut ret = vec![]; for split in splits { - let path = node + let path = sfc .data_base_path - .join(format!("{}_{}", node.ksprefix, config.keyspace)) + .join(format!("{}_{}", sfc.ksprefix, config.keyspace)) .join("byTime") .join(config.channel.name.clone()) .join(format!("{:019}", timebin)) @@ -75,9 +83,10 @@ pub async fn datapaths_for_timebin( } pub fn channel_timebins_dir_path(channel_config: &ChannelConfig, node: &Node) -> Result { - let ret = node + let sfc = node.sf_databuffer.as_ref().unwrap(); + let ret = sfc .data_base_path - .join(format!("{}_{}", node.ksprefix, channel_config.keyspace)) + .join(format!("{}_{}", sfc.ksprefix, channel_config.keyspace)) .join("byTime") .join(&channel_config.channel.name); Ok(ret) @@ -103,9 +112,10 @@ pub fn index_path(ts: Nanos, channel_config: &ChannelConfig, split: u32, node: & } pub fn data_dir_path_tb(ks: u32, channel_name: &str, tb: u32, split: u32, node: &Node) -> Result { - let ret = node + let sfc = node.sf_databuffer.as_ref().unwrap(); + let ret = sfc .data_base_path - .join(format!("{}_{}", node.ksprefix, ks)) + .join(format!("{}_{}", sfc.ksprefix, ks)) .join("byTime") .join(channel_name) .join(format!("{:019}", tb)) diff --git a/httpret/src/channelarchiver.rs b/httpret/src/channelarchiver.rs index 2ed7a65..601d700 100644 --- a/httpret/src/channelarchiver.rs +++ b/httpret/src/channelarchiver.rs @@ -111,7 +111,7 @@ impl ScanIndexFiles { .ok_or(Error::with_msg_no_trace( "this node is not configured as channel archiver", ))?; - let s = archapp_wrap::archapp::archeng::indexfiles::scan_index_files(conf.clone()); + let s = archapp_wrap::archapp::archeng::indexfiles::scan_index_files(conf.clone(), node_config.clone()); let s = s.map_err(Error::from); let s = json_lines_stream(s); Ok(response(StatusCode::OK) @@ -151,7 +151,7 @@ impl ScanChannels { .ok_or(Error::with_msg_no_trace( "this node is not configured as channel archiver", ))?; - let s = archapp_wrap::archapp::archeng::indexfiles::scan_channels(conf.clone()); + let s = archapp_wrap::archapp::archeng::indexfiles::scan_channels(node_config.clone(), conf.clone()); let s = s.map_err(Error::from); let s = json_lines_stream(s); Ok(response(StatusCode::OK) @@ -184,15 +184,9 @@ impl ChannelNames { return Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?); } info!("{} handle uri: {:?}", Self::name(), req.uri()); - let conf = node_config - .node - .channel_archiver - .as_ref() - .ok_or(Error::with_msg_no_trace( - "this node is not configured as channel archiver", - ))?; + let database = &node_config.node_config.cluster.database; use archapp_wrap::archapp::archeng; - let stream = archeng::configs::ChannelNameStream::new(conf.database.clone()); + let stream = archeng::configs::ChannelNameStream::new(database.clone()); let stream = stream.map_err(Error::from); let stream = json_lines_stream(stream); Ok(response(StatusCode::OK) @@ -225,6 +219,7 @@ impl ScanConfigs { return Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?); } info!("{} handle uri: {:?}", Self::name(), req.uri()); + let database = &node_config.node_config.cluster.database; let conf = node_config .node .channel_archiver @@ -233,8 +228,8 @@ impl ScanConfigs { "this node is not configured as channel archiver", ))?; use archapp_wrap::archapp::archeng; - let stream = archeng::configs::ChannelNameStream::new(conf.database.clone()); - let stream = archeng::configs::ConfigStream::new(stream, conf.clone()); + let stream = archeng::configs::ChannelNameStream::new(database.clone()); + let stream = archeng::configs::ConfigStream::new(stream, node_config.clone(), conf.clone()); let stream = stream.map_err(Error::from); let stream = json_lines_stream(stream); Ok(response(StatusCode::OK) @@ -267,13 +262,7 @@ impl BlockRefStream { return Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?); } info!("{} handle uri: {:?}", Self::name(), req.uri()); - let conf = node_config - .node - .channel_archiver - .as_ref() - .ok_or(Error::with_msg_no_trace( - "this node is not configured as channel archiver", - ))?; + let database = &node_config.node_config.cluster.database; let range = NanoRange { beg: 0, end: u64::MAX }; let url = Url::parse(&format!("dummy:{}", req.uri()))?; let pairs = get_url_query_pairs(&url); @@ -284,7 +273,7 @@ impl BlockRefStream { //name: "ARIDI-PCT:CURRENT".into(), }; use archapp_wrap::archapp::archeng; - let ixpaths = archeng::indexfiles::index_file_path_list(channel.clone(), conf.database.clone()).await?; + let ixpaths = archeng::indexfiles::index_file_path_list(channel.clone(), database.clone()).await?; info!("got categorized ixpaths: {:?}", ixpaths); let ixpath = ixpaths.first().unwrap().clone(); let s = archeng::blockrefstream::blockref_stream(channel, range, true, ixpath); @@ -337,13 +326,7 @@ impl BlockStream { return Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?); } info!("{} handle uri: {:?}", Self::name(), req.uri()); - let conf = node_config - .node - .channel_archiver - .as_ref() - .ok_or(Error::with_msg_no_trace( - "this node is not configured as channel archiver", - ))?; + let database = &node_config.node_config.cluster.database; let range = NanoRange { beg: 0, end: u64::MAX }; let url = Url::parse(&format!("dummy:{}", req.uri()))?; let pairs = get_url_query_pairs(&url); @@ -354,7 +337,7 @@ impl BlockStream { name: channel_name, }; use archapp_wrap::archapp::archeng; - let ixpaths = archeng::indexfiles::index_file_path_list(channel.clone(), conf.database.clone()).await?; + let ixpaths = archeng::indexfiles::index_file_path_list(channel.clone(), database.clone()).await?; info!("got categorized ixpaths: {:?}", ixpaths); let ixpath = ixpaths.first().unwrap().clone(); let s = archeng::blockrefstream::blockref_stream(channel, range.clone(), true, ixpath); diff --git a/httpret/src/httpret.rs b/httpret/src/httpret.rs index 1157a12..d2d0fe5 100644 --- a/httpret/src/httpret.rs +++ b/httpret/src/httpret.rs @@ -782,7 +782,8 @@ pub async fn channel_config(req: Request, node_config: &NodeConfigCached) //let pairs = get_url_query_pairs(&url); let q = ChannelConfigQuery::from_url(&url)?; let conf = if let Some(conf) = &node_config.node.channel_archiver { - archapp_wrap::archapp::archeng::channel_config_from_db(&q, conf).await? + archapp_wrap::archapp::archeng::channel_config_from_db(&q, conf, &node_config.node_config.cluster.database) + .await? } else if let Some(conf) = &node_config.node.archiver_appliance { archapp_wrap::channel_config(&q, conf).await? } else { diff --git a/httpret/src/proxy.rs b/httpret/src/proxy.rs index 776a06d..14926ff 100644 --- a/httpret/src/proxy.rs +++ b/httpret/src/proxy.rs @@ -544,7 +544,7 @@ fn get_query_host_for_backend(backend: &str, proxy_config: &ProxyConfig) -> Resu } fn get_query_host_for_backend_2(backend: &str, proxy_config: &ProxyConfig) -> Result { - for back in &proxy_config.backends2 { + for back in &proxy_config.backends_pulse_map { if back.name == backend { return Ok(back.url.clone()); } diff --git a/httpret/src/pulsemap.rs b/httpret/src/pulsemap.rs index fceb473..362043b 100644 --- a/httpret/src/pulsemap.rs +++ b/httpret/src/pulsemap.rs @@ -78,10 +78,10 @@ fn timer_channel_names() -> Vec { async fn datafiles_for_channel(name: String, node_config: &NodeConfigCached) -> Result, Error> { let mut a = vec![]; - let n = &node_config.node; - let channel_path = n + let sfc = node_config.node.sf_databuffer.as_ref().unwrap(); + let channel_path = sfc .data_base_path - .join(format!("{}_2", n.ksprefix)) + .join(format!("{}_2", sfc.ksprefix)) .join("byTime") .join(&name); let mut rd = tokio::fs::read_dir(&channel_path).await?; diff --git a/netfetch/src/test.rs b/netfetch/src/test.rs index 3ff807a..9994b40 100644 --- a/netfetch/src/test.rs +++ b/netfetch/src/test.rs @@ -1,6 +1,6 @@ use err::Error; use futures_util::StreamExt; -use netpod::log::*; +use netpod::{log::*, SfDatabuffer}; use netpod::{Cluster, Database, Node, NodeConfig, NodeConfigCached}; use std::collections::BTreeMap; use std::iter::FromIterator; @@ -17,11 +17,13 @@ fn ca_connect_1() { port: 123, port_raw: 123, backend: "".into(), - data_base_path: "".into(), cache_base_path: "".into(), listen: "".into(), - ksprefix: "".into(), - splits: None, + sf_databuffer: Some(SfDatabuffer { + data_base_path: "".into(), + ksprefix: "".into(), + splits: None, + }), archiver_appliance: None, channel_archiver: None, }, diff --git a/netpod/src/netpod.rs b/netpod/src/netpod.rs index 656f164..8718eb0 100644 --- a/netpod/src/netpod.rs +++ b/netpod/src/netpod.rs @@ -200,16 +200,21 @@ impl ScalarType { } } +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct SfDatabuffer { + pub data_base_path: PathBuf, + pub ksprefix: String, + pub splits: Option>, +} + #[derive(Clone, Debug, Serialize, Deserialize)] pub struct ArchiverAppliance { pub data_base_paths: Vec, - pub database: Database, } #[derive(Clone, Debug, Serialize, Deserialize)] pub struct ChannelArchiver { pub data_base_paths: Vec, - pub database: Database, } #[derive(Clone, Debug, Serialize, Deserialize)] @@ -218,27 +223,28 @@ pub struct Node { pub listen: String, pub port: u16, pub port_raw: u16, - pub data_base_path: PathBuf, pub cache_base_path: PathBuf, - pub ksprefix: String, pub backend: String, - pub splits: Option>, + pub sf_databuffer: Option, pub archiver_appliance: Option, pub channel_archiver: Option, } impl Node { + // TODO needed? Could `sf_databuffer` be None? pub fn dummy() -> Self { Self { host: "dummy".into(), listen: "dummy".into(), port: 4444, port_raw: 4444, - data_base_path: PathBuf::new(), cache_base_path: PathBuf::new(), - ksprefix: "daqlocal".into(), backend: "dummybackend".into(), - splits: None, + sf_databuffer: Some(SfDatabuffer { + data_base_path: PathBuf::new(), + ksprefix: "daqlocal".into(), + splits: None, + }), archiver_appliance: None, channel_archiver: None, } @@ -1350,7 +1356,7 @@ pub struct ProxyConfig { pub port: u16, pub search_hosts: Vec, pub backends: Vec, - pub backends2: Vec, + pub backends_pulse_map: Vec, pub backends_search: Vec, pub api_0_search_hosts: Option>, pub api_0_search_backends: Option>, @@ -1491,11 +1497,13 @@ pub fn test_cluster() -> Cluster { listen: "0.0.0.0".into(), port: 6170 + id as u16, port_raw: 6170 + id as u16 + 100, - data_base_path: test_data_base_path_databuffer().join(format!("node{:02}", id)), cache_base_path: test_data_base_path_databuffer().join(format!("node{:02}", id)), - ksprefix: "ks".into(), backend: "testbackend".into(), - splits: None, + sf_databuffer: Some(SfDatabuffer { + data_base_path: test_data_base_path_databuffer().join(format!("node{:02}", id)), + ksprefix: "ks".into(), + splits: None, + }), archiver_appliance: None, channel_archiver: None, }) @@ -1503,7 +1511,7 @@ pub fn test_cluster() -> Cluster { Cluster { nodes, database: Database { - host: "localhost".into(), + host: "127.0.0.1".into(), name: "testingdaq".into(), user: "testingdaq".into(), pass: "testingdaq".into(), @@ -1522,27 +1530,19 @@ pub fn sls_test_cluster() -> Cluster { listen: "0.0.0.0".into(), port: 6190 + id as u16, port_raw: 6190 + id as u16 + 100, - data_base_path: "UNUSED".into(), cache_base_path: test_data_base_path_databuffer().join(format!("node{:02}", id)), - ksprefix: "UNUSED".into(), backend: "sls-archive".into(), - splits: None, + sf_databuffer: None, archiver_appliance: None, channel_archiver: Some(ChannelArchiver { data_base_paths: vec![test_data_base_path_channel_archiver_sls()], - database: Database { - host: "localhost".into(), - name: "testingdaq".into(), - user: "testingdaq".into(), - pass: "testingdaq".into(), - }, }), }) .collect(); Cluster { nodes, database: Database { - host: "localhost".into(), + host: "127.0.0.1".into(), name: "testingdaq".into(), user: "testingdaq".into(), pass: "testingdaq".into(), @@ -1561,27 +1561,19 @@ pub fn archapp_test_cluster() -> Cluster { listen: "0.0.0.0".into(), port: 6200 + id as u16, port_raw: 6200 + id as u16 + 100, - data_base_path: "UNUSED".into(), cache_base_path: test_data_base_path_databuffer().join(format!("node{:02}", id)), - ksprefix: "UNUSED".into(), backend: "sf-archive".into(), - splits: None, + sf_databuffer: None, channel_archiver: None, archiver_appliance: Some(ArchiverAppliance { data_base_paths: vec![test_data_base_path_archiver_appliance()], - database: Database { - host: "localhost".into(), - name: "testingdaq".into(), - user: "testingdaq".into(), - pass: "testingdaq".into(), - }, }), }) .collect(); Cluster { nodes, database: Database { - host: "localhost".into(), + host: "127.0.0.1".into(), name: "testingdaq".into(), user: "testingdaq".into(), pass: "testingdaq".into(), diff --git a/nodenet/src/conn.rs b/nodenet/src/conn.rs index 71778f0..a44dc6e 100644 --- a/nodenet/src/conn.rs +++ b/nodenet/src/conn.rs @@ -119,7 +119,7 @@ async fn events_conn_handler_inner_try( let mut p1: Pin> + Send>> = if let Some(aa) = &node_config.node.channel_archiver { - match archapp_wrap::archapp::archeng::pipe::make_event_pipe(&evq, aa.clone()).await { + match archapp_wrap::archapp::archeng::pipe::make_event_pipe(&evq, node_config.clone(), aa.clone()).await { Ok(j) => j, Err(e) => return Err((e, netout))?, } diff --git a/parse/src/channelconfig.rs b/parse/src/channelconfig.rs index e7036cf..db50e06 100644 --- a/parse/src/channelconfig.rs +++ b/parse/src/channelconfig.rs @@ -313,6 +313,9 @@ pub async fn channel_config(q: &ChannelConfigQuery, node: &Node) -> Result Result { let path = node + .sf_databuffer + .as_ref() + .ok_or_else(|| Error::with_msg(format!("missing sf databuffer config in node")))? .data_base_path .join("config") .join(&channel.name)