diff --git a/daqbuffer/src/bin/daqbuffer.rs b/daqbuffer/src/bin/daqbuffer.rs index 8e66e1f..87c40b2 100644 --- a/daqbuffer/src/bin/daqbuffer.rs +++ b/daqbuffer/src/bin/daqbuffer.rs @@ -136,11 +136,7 @@ fn simple_fetch() { let t1 = chrono::Utc::now(); let query = AggQuerySingleChannel { channel_config: SfDbChConf { - channel: SfDbChannel { - backend: "sf-databuffer".into(), - name: "S10BC01-DBAM070:BAM_CH1_NORM".into(), - series: None, - }, + channel: SfDbChannel::from_name("sf-databuffer", "S10BC01-DBAM070:BAM_CH1_NORM"), keyspace: 3, time_bin_size: TsNano(DAY), array: true, diff --git a/daqbufp2/src/client.rs b/daqbufp2/src/client.rs index 2e6efb2..425672c 100644 --- a/daqbufp2/src/client.rs +++ b/daqbufp2/src/client.rs @@ -61,11 +61,7 @@ pub async fn get_binned( info!("end {}", end_date); info!("-------"); let t1 = Utc::now(); - let channel = SfDbChannel { - backend: channel_backend.clone(), - name: channel_name.into(), - series: None, - }; + let channel = SfDbChannel::from_name(channel_backend, channel_name); let range = NanoRange::from_date_time(beg_date, end_date).into(); // TODO this was before fixed using AggKind::DimXBins1 let mut query = BinnedQuery::new(channel, range, bin_count).for_time_weighted_scalar(); diff --git a/daqbufp2/src/test/api1/data_api_python.rs b/daqbufp2/src/test/api1/data_api_python.rs index fd069ff..4275e96 100644 --- a/daqbufp2/src/test/api1/data_api_python.rs +++ b/daqbufp2/src/test/api1/data_api_python.rs @@ -67,11 +67,7 @@ fn api3_hdf_dim0_00() -> Result<(), Error> { let rh = require_test_hosts_running()?; let cluster = &rh.cluster; let jsv = fetch_data_api_python_blob( - vec![SfDbChannel { - backend: TEST_BACKEND.into(), - name: "test-gen-i32-dim0-v00".into(), - series: None, - }], + vec![SfDbChannel::from_name(TEST_BACKEND, "test-gen-i32-dim0-v00")], "1970-01-01T00:20:04.000Z", "1970-01-01T00:21:10.000Z", cluster, diff --git a/daqbufp2/src/test/api4/binnedjson.rs b/daqbufp2/src/test/api4/binnedjson.rs index 9e4477f..bf9f931 
100644 --- a/daqbufp2/src/test/api4/binnedjson.rs +++ b/daqbufp2/src/test/api4/binnedjson.rs @@ -1,11 +1,9 @@ use crate::err::ErrConv; use crate::nodes::require_test_hosts_running; -use crate::test::api4::common::fetch_binned_json; use chrono::Utc; use err::Error; use http::StatusCode; use hyper::Body; -use items_0::test::f32_cmp_near; use items_0::test::f32_iter_cmp_near; use items_0::test::f64_iter_cmp_near; use items_0::WithLen; @@ -29,11 +27,7 @@ fn make_query>( end_date: &str, bin_count_min: u32, ) -> Result { - let channel = SfDbChannel { - backend: TEST_BACKEND.into(), - name: name.into(), - series: None, - }; + let channel = SfDbChannel::from_name(TEST_BACKEND, name); let beg_date = beg_date.parse()?; let end_date = end_date.parse()?; let range = NanoRange::from_date_time(beg_date, end_date).into(); @@ -47,11 +41,7 @@ fn binned_d0_json_00() -> Result<(), Error> { let rh = require_test_hosts_running()?; let cluster = &rh.cluster; let jsv = get_binned_json( - SfDbChannel { - backend: TEST_BACKEND.into(), - name: "test-gen-i32-dim0-v01".into(), - series: None, - }, + SfDbChannel::from_name(TEST_BACKEND, "test-gen-i32-dim0-v01"), "1970-01-01T00:20:04.000Z", "1970-01-01T00:20:37.000Z", 6, @@ -104,11 +94,7 @@ fn binned_d0_json_01a() -> Result<(), Error> { let rh = require_test_hosts_running()?; let cluster = &rh.cluster; let jsv = get_binned_json( - SfDbChannel { - backend: TEST_BACKEND.into(), - name: "test-gen-i32-dim0-v01".into(), - series: None, - }, + SfDbChannel::from_name(TEST_BACKEND, "test-gen-i32-dim0-v01"), "1970-01-01T00:20:10.000Z", "1970-01-01T00:40:30.000Z", 10, @@ -162,11 +148,7 @@ fn binned_d0_json_01b() -> Result<(), Error> { let rh = require_test_hosts_running()?; let cluster = &rh.cluster; let jsv = get_binned_json( - SfDbChannel { - backend: TEST_BACKEND.into(), - name: "test-gen-i32-dim0-v01".into(), - series: None, - }, + SfDbChannel::from_name(TEST_BACKEND, "test-gen-i32-dim0-v01"), "1970-01-01T00:20:10.000Z", "1970-01-01T01:20:30.000Z", 
10, @@ -220,11 +202,7 @@ fn binned_d0_json_02() -> Result<(), Error> { let rh = require_test_hosts_running()?; let cluster = &rh.cluster; let jsv = get_binned_json( - SfDbChannel { - backend: TEST_BACKEND.into(), - name: "test-gen-f64-dim1-v00".into(), - series: None, - }, + SfDbChannel::from_name(TEST_BACKEND, "test-gen-f64-dim1-v00"), "1970-01-01T00:20:00Z", "1970-01-01T00:20:10Z", //"1970-01-01T01:20:45.000Z", @@ -279,11 +257,7 @@ fn binned_d0_json_03() -> Result<(), Error> { let rh = require_test_hosts_running()?; let cluster = &rh.cluster; let jsv = get_binned_json( - SfDbChannel { - backend: TEST_BACKEND.into(), - name: "test-gen-f64-dim1-v00".into(), - series: None, - }, + SfDbChannel::from_name(TEST_BACKEND, "test-gen-f64-dim1-v00"), "1970-01-01T00:20:10.000Z", "1970-01-01T01:20:20.000Z", 2, @@ -312,11 +286,7 @@ fn binned_d0_json_04() -> Result<(), Error> { let rh = require_test_hosts_running()?; let cluster = &rh.cluster; let jsv = get_binned_json( - SfDbChannel { - backend: TEST_BACKEND.into(), - name: "test-gen-i32-dim0-v01".into(), - series: None, - }, + SfDbChannel::from_name(TEST_BACKEND, "test-gen-i32-dim0-v01"), "1970-01-01T00:20:10.000Z", "1970-01-01T04:20:30.000Z", 20, diff --git a/daqbufp2/src/test/api4/eventsjson.rs b/daqbufp2/src/test/api4/eventsjson.rs index 8ab48e6..bbb9432 100644 --- a/daqbufp2/src/test/api4/eventsjson.rs +++ b/daqbufp2/src/test/api4/eventsjson.rs @@ -21,11 +21,7 @@ use url::Url; const TEST_BACKEND: &str = "testbackend-00"; fn make_query>(name: S, beg_date: &str, end_date: &str) -> Result { - let channel = SfDbChannel { - backend: TEST_BACKEND.into(), - name: name.into(), - series: None, - }; + let channel = SfDbChannel::from_name(TEST_BACKEND, name); let beg_date = beg_date.parse()?; let end_date = end_date.parse()?; let range = NanoRange::from_date_time(beg_date, end_date); @@ -59,11 +55,7 @@ fn events_plain_json_02_range_incomplete() -> Result<(), Error> { let rh = require_test_hosts_running()?; let cluster = &rh.cluster; 
let jsv = events_plain_json( - SfDbChannel { - backend: TEST_BACKEND.into(), - name: "test-gen-i32-dim0-v01".into(), - series: None, - }, + SfDbChannel::from_name(TEST_BACKEND, "test-gen-i32-dim0-v01"), "1970-01-03T23:59:55.000Z", "1970-01-04T00:00:01.000Z", cluster, diff --git a/daqbufp2/src/test/api4/pulseiddiff.rs b/daqbufp2/src/test/api4/pulseiddiff.rs index 8976469..4d6fa0b 100644 --- a/daqbufp2/src/test/api4/pulseiddiff.rs +++ b/daqbufp2/src/test/api4/pulseiddiff.rs @@ -12,11 +12,7 @@ use query::api4::events::PlainEventsQuery; const BACKEND: &str = "testbackend-00"; pub fn make_query>(name: S, beg_date: &str, end_date: &str) -> Result { - let channel = SfDbChannel { - backend: BACKEND.into(), - name: name.into(), - series: None, - }; + let channel = SfDbChannel::from_name(BACKEND, name); let beg_date = beg_date.parse()?; let end_date = end_date.parse()?; let range = NanoRange::from_date_time(beg_date, end_date); diff --git a/daqbufp2/src/test/timeweightedjson.rs b/daqbufp2/src/test/timeweightedjson.rs index 603a0bb..c5fca81 100644 --- a/daqbufp2/src/test/timeweightedjson.rs +++ b/daqbufp2/src/test/timeweightedjson.rs @@ -39,11 +39,7 @@ async fn get_json_common( let beg_date: DateTime = beg_date.parse()?; let end_date: DateTime = end_date.parse()?; let channel_backend = TEST_BACKEND; - let channel = SfDbChannel { - backend: channel_backend.into(), - name: channel_name.into(), - series: None, - }; + let channel = SfDbChannel::from_name(channel_backend, channel_name); let range = NanoRange::from_date_time(beg_date, end_date).into(); let mut query = BinnedQuery::new(channel, range, bin_count).for_time_weighted_scalar(); query.set_timeout(Duration::from_millis(40000)); diff --git a/dbconn/src/query.rs b/dbconn/src/query.rs index 3ae10a3..6ca2662 100644 --- a/dbconn/src/query.rs +++ b/dbconn/src/query.rs @@ -32,11 +32,8 @@ pub async fn sf_databuffer_fetch_channel_by_series( if let Some(row) = rows.pop() { info!("sf_databuffer_fetch_channel_by_series got a row 
{row:?}"); let name: String = row.get(0); - let channel = SfDbChannel { - series: channel.series, - backend: ncc.node_config.cluster.backend.clone(), - name, - }; + let channel = + SfDbChannel::from_full(ncc.node_config.cluster.backend.clone(), channel.series(), name); info!("sf_databuffer_fetch_channel_by_series return {channel:?}"); Ok(channel) } else { diff --git a/disk/src/aggtest.rs b/disk/src/aggtest.rs index 60d3a60..35bff8e 100644 --- a/disk/src/aggtest.rs +++ b/disk/src/aggtest.rs @@ -48,11 +48,7 @@ async fn agg_x_dim_0_inner() { let node = make_test_node(0); let query = AggQuerySingleChannel { channel_config: SfDbChConf { - channel: SfDbChannel { - backend: "sf-databuffer".into(), - name: "S10BC01-DBAM070:EOM1_T1".into(), - series: None, - }, + channel: SfDbChannel::from_name("sf-databuffer", "S10BC01-DBAM070:EOM1_T1"), keyspace: 2, time_bin_size: TsNano(DAY), array: false, @@ -105,11 +101,7 @@ async fn agg_x_dim_1_inner() { let node = make_test_node(0); let query = AggQuerySingleChannel { channel_config: SfDbChConf { - channel: SfDbChannel { - backend: "ks".into(), - name: "wave1".into(), - series: None, - }, + channel: SfDbChannel::from_name("ks", "wave1"), keyspace: 3, time_bin_size: TsNano(DAY), array: true, diff --git a/disk/src/dataopen.rs b/disk/src/dataopen.rs index 3ab33fc..fba34b7 100644 --- a/disk/src/dataopen.rs +++ b/disk/src/dataopen.rs @@ -821,11 +821,7 @@ mod test { beg: DAY + HOUR * 5, end: DAY + HOUR * 8, }; - let chn = netpod::SfDbChannel { - backend: BACKEND.into(), - name: "scalar-i32-be".into(), - series: None, - }; + let chn = netpod::SfDbChannel::from_name(BACKEND, "scalar-i32-be"); // TODO read config from disk? Or expose the config from data generator? 
let channel_config = SfDbChConf { channel: chn, diff --git a/disk/src/gen.rs b/disk/src/gen.rs index d74475c..049275f 100644 --- a/disk/src/gen.rs +++ b/disk/src/gen.rs @@ -34,11 +34,7 @@ pub async fn gen_test_data() -> Result<(), Error> { { let chn = ChannelGenProps { config: SfDbChConf { - channel: SfDbChannel { - backend: backend.clone(), - name: "scalar-i32-be".into(), - series: None, - }, + channel: SfDbChannel::from_name(&backend, "scalar-i32-be"), keyspace: 2, time_bin_size: TsNano(DAY), scalar_type: ScalarType::I32, @@ -53,11 +49,7 @@ pub async fn gen_test_data() -> Result<(), Error> { ensemble.channels.push(chn); let chn = ChannelGenProps { config: SfDbChConf { - channel: SfDbChannel { - backend: backend.clone(), - name: "wave-f64-be-n21".into(), - series: None, - }, + channel: SfDbChannel::from_name(&backend, "wave-f64-be-n21"), keyspace: 3, time_bin_size: TsNano(DAY), array: true, @@ -72,11 +64,7 @@ pub async fn gen_test_data() -> Result<(), Error> { ensemble.channels.push(chn); let chn = ChannelGenProps { config: SfDbChConf { - channel: SfDbChannel { - backend: backend.clone(), - name: "wave-u16-le-n77".into(), - series: None, - }, + channel: SfDbChannel::from_name(&backend, "wave-u16-le-n77"), keyspace: 3, time_bin_size: TsNano(DAY), scalar_type: ScalarType::U16, @@ -91,11 +79,7 @@ pub async fn gen_test_data() -> Result<(), Error> { ensemble.channels.push(chn); let chn = ChannelGenProps { config: SfDbChConf { - channel: SfDbChannel { - backend: backend.clone(), - name: "tw-scalar-i32-be".into(), - series: None, - }, + channel: SfDbChannel::from_name(&backend, "tw-scalar-i32-be"), keyspace: 2, time_bin_size: TsNano(DAY), scalar_type: ScalarType::I32, @@ -110,11 +94,7 @@ pub async fn gen_test_data() -> Result<(), Error> { ensemble.channels.push(chn); let chn = ChannelGenProps { config: SfDbChConf { - channel: SfDbChannel { - backend: backend.clone(), - name: "const-regular-scalar-i32-be".into(), - series: None, - }, + channel: 
SfDbChannel::from_name(&backend, "const-regular-scalar-i32-be"), keyspace: 2, time_bin_size: TsNano(DAY), scalar_type: ScalarType::I32, diff --git a/dq/src/bin/dq.rs b/dq/src/bin/dq.rs index f439ecf..c951f66 100644 --- a/dq/src/bin/dq.rs +++ b/dq/src/bin/dq.rs @@ -81,11 +81,7 @@ pub fn main() -> Result<(), Error> { let inp = Box::pin(disk::file_content_stream(path.clone(), file, disk_io_tune)); let ce = &config.entries[0]; let channel_config = SfDbChConf { - channel: SfDbChannel { - backend: String::new(), - name: config.channel_name.clone(), - series: None, - }, + channel: SfDbChannel::from_name("", &config.channel_name), keyspace: ce.ks as u8, time_bin_size: ce.bs.clone(), scalar_type: ce.scalar_type.clone(), diff --git a/httpret/src/api1.rs b/httpret/src/api1.rs index 0d016ba..af26eaa 100644 --- a/httpret/src/api1.rs +++ b/httpret/src/api1.rs @@ -1046,11 +1046,7 @@ impl Api1EventsBinaryHandler { let chans = qu .channels() .iter() - .map(|ch| SfDbChannel { - backend: backend.into(), - name: ch.name().into(), - series: None, - }) + .map(|ch| SfDbChannel::from_name(backend, ch.name())) .collect(); // TODO use a better stream protocol with built-in error delivery. 
let status_id = super::status_board()?.new_status_id(); diff --git a/httpret/src/channelconfig.rs b/httpret/src/channelconfig.rs index ba381a7..b283c3c 100644 --- a/httpret/src/channelconfig.rs +++ b/httpret/src/channelconfig.rs @@ -100,11 +100,7 @@ impl ChannelConfigHandler { let conf = if let Some(_scyco) = &node_config.node_config.cluster.scylla { let c = nodenet::channelconfig::channel_config(q.range.clone(), q.channel.clone(), node_config).await?; ChannelConfigResponse { - channel: SfDbChannel { - series: c.series.clone(), - backend: q.channel.backend().into(), - name: c.name, - }, + channel: SfDbChannel::from_full(q.channel.backend(), c.series.clone(), &c.name), scalar_type: c.scalar_type, byte_order: None, shape: c.shape, @@ -392,11 +388,7 @@ impl ScyllaChannelsWithType { let mut list = Vec::new(); for row in res.rows_typed_or_empty::<(String, i64)>() { let (channel_name, series) = row.err_conv()?; - let ch = SfDbChannel { - backend: backend.into(), - name: channel_name, - series: Some(series as u64), - }; + let ch = SfDbChannel::from_full(backend, Some(series as u64), channel_name); list.push(ch); } let ret = ChannelListWithType { channels: list }; diff --git a/netpod/src/netpod.rs b/netpod/src/netpod.rs index 3bceeb7..145872e 100644 --- a/netpod/src/netpod.rs +++ b/netpod/src/netpod.rs @@ -601,7 +601,7 @@ pub struct NodeStatus { // the same channel-name is delivered via different methods. #[derive(Clone, Debug, Serialize, Deserialize)] pub struct SfDbChannel { - pub series: Option, + series: Option, // "backend" is currently used in the existing systems for multiple purposes: // it can indicate the facility (eg. sf-databuffer, hipa, ...) but also // some special subsystem (eg. sf-rf-databuffer). 
@@ -610,16 +610,36 @@ pub struct SfDbChannel { } impl SfDbChannel { + pub fn from_full<T: Into<String>, U: Into<String>>(backend: T, series: Option<u64>, name: U) -> Self { + Self { + backend: backend.into(), + series, + name: name.into(), + } + } + + pub fn from_name<T: Into<String>, U: Into<String>>(backend: T, name: U) -> Self { + Self { + backend: backend.into(), + series: None, + name: name.into(), + } + } + pub fn backend(&self) -> &str { &self.backend } + pub fn series(&self) -> Option<u64> { + self.series + } + pub fn name(&self) -> &str { &self.name } - pub fn series(&self) -> Option<u64> { - self.series + pub fn set_series(&mut self, series: u64) { + self.series = Some(series); } } @@ -680,6 +700,73 @@ impl ChannelTyped { } } +// Describes a Scylla-based "daqbuffer" style time series. +// The tuple `(backend, series)` is supposed to be unique. +// Contains also the name because it is so useful. +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct DaqbufSeries { + pub series: u64, + // "backend" is currently used in the existing systems for multiple purposes: + // it can indicate the facility (eg. sf-databuffer, hipa, ...) but also + // some special subsystem (eg. sf-rf-databuffer). + pub backend: String, + // This name is only for better user-facing messages. The (backend, series-id) is the identifier. + pub name: String, +} + +impl DaqbufSeries { + pub fn series(&self) -> u64 { + self.series + } + + pub fn backend(&self) -> &str { + &self.backend + } + + pub fn name(&self) -> &str { + &self.name + } +} + +impl FromUrl for DaqbufSeries { + fn from_url(url: &Url) -> Result<Self, Error> { + let pairs = get_url_query_pairs(url); + Self::from_pairs(&pairs) + } + + fn from_pairs(pairs: &BTreeMap<String, String>) -> Result<Self, Error> { + let ret = DaqbufSeries { + series: pairs + .get("seriesId") + .ok_or_else(|| Error::with_public_msg("missing seriesId")) + .map(|x| x.parse::<u64>())??, + backend: pairs + .get("backend") + .ok_or_else(|| Error::with_public_msg("missing backend"))? 
+ .into(), + name: pairs + .get("channelName") + .map(String::from) + .unwrap_or(String::new()) + .into(), + }; + Ok(ret) + } +} + +impl AppendToUrl for DaqbufSeries { + fn append_to_url(&self, url: &mut Url) { + let mut g = url.query_pairs_mut(); + g.append_pair("backend", &self.backend); + if self.name().len() > 0 { + g.append_pair("channelName", &self.name); + } + if let series = self.series { + g.append_pair("seriesId", &series.to_string()); + } + } +} + pub struct HostPort { pub host: String, pub port: u16, diff --git a/nodenet/src/channelconfig.rs b/nodenet/src/channelconfig.rs index 5bf43b2..a62d87f 100644 --- a/nodenet/src/channelconfig.rs +++ b/nodenet/src/channelconfig.rs @@ -85,11 +85,11 @@ pub async fn channel_config(range: NanoRange, channel: SfDbChannel, ncc: &NodeCo debug!("try to get ChConf for sf-databuffer type backend"); // TODO in the future we should not need this: let mut channel = sf_databuffer_fetch_channel_by_series(channel, ncc).await?; - if channel.series.is_none() { + if channel.series().is_none() { let pgclient = dbconn::create_connection(&ncc.node_config.cluster.database).await?; let pgclient = std::sync::Arc::new(pgclient); let series = dbconn::find_series_sf_databuffer(&channel, pgclient).await?; - channel.series = Some(series); + channel.set_series(series); } let channel = channel; debug!("channel_config AFTER {channel:?}"); @@ -97,7 +97,7 @@ pub async fn channel_config(range: NanoRange, channel: SfDbChannel, ncc: &NodeCo debug!("channel_config THEN {c1:?}"); let ret = ChConf { backend: c1.channel.backend, - series: channel.series, + series: channel.series(), name: c1.channel.name, scalar_type: c1.scalar_type, shape: c1.shape, diff --git a/nodenet/src/conn/test.rs b/nodenet/src/conn/test.rs index 61fa2e2..383302e 100644 --- a/nodenet/src/conn/test.rs +++ b/nodenet/src/conn/test.rs @@ -74,11 +74,7 @@ fn raw_data_00() { }, ix: 0, }; - let channel = SfDbChannel { - series: None, - backend: TEST_BACKEND.into(), - name: 
"scalar-i32".into(), - }; + let channel = SfDbChannel::from_name(TEST_BACKEND, "scalar-i32"); let range = NanoRange { beg: SEC, end: SEC * 10, diff --git a/query/src/api4/binned.rs b/query/src/api4/binned.rs index dcacfb8..768f604 100644 --- a/query/src/api4/binned.rs +++ b/query/src/api4/binned.rs @@ -112,7 +112,7 @@ impl BinnedQuery { } pub fn set_series_id(&mut self, series: u64) { - self.channel.series = Some(series); + self.channel.set_series(series); } pub fn channel_mut(&mut self) -> &mut SfDbChannel { diff --git a/query/src/api4/events.rs b/query/src/api4/events.rs index c297d82..a0147c9 100644 --- a/query/src/api4/events.rs +++ b/query/src/api4/events.rs @@ -127,7 +127,7 @@ impl PlainEventsQuery { } pub fn set_series_id(&mut self, series: u64) { - self.channel.series = Some(series); + self.channel.set_series(series); } pub fn set_do_test_main_error(&mut self, k: bool) { diff --git a/scyllaconn/src/config.rs b/scyllaconn/src/config.rs index e2b22a9..debd35a 100644 --- a/scyllaconn/src/config.rs +++ b/scyllaconn/src/config.rs @@ -19,11 +19,7 @@ pub async fn config_from_scylla(chq: ChannelConfigQuery, scy: Arc) - let cols = row.into_typed::<(i64, i32, Vec)>().err_conv()?; let scalar_type = ScalarType::from_scylla_i32(cols.1)?; let shape = Shape::from_scylla_shape_dims(&cols.2)?; - let channel = SfDbChannel { - series: Some(cols.0 as _), - backend: chq.channel.backend().into(), - name: chq.channel.name().into(), - }; + let channel = SfDbChannel::from_full(chq.channel.backend(), Some(cols.0 as _), chq.channel.name()); let res = ChannelConfigResponse { channel, scalar_type,