Create SfDbChannel via function

This commit is contained in:
Dominik Werder
2023-06-14 08:17:11 +02:00
parent 7c77b07db5
commit 4899d71022
20 changed files with 125 additions and 155 deletions

View File

@@ -136,11 +136,7 @@ fn simple_fetch() {
let t1 = chrono::Utc::now();
let query = AggQuerySingleChannel {
channel_config: SfDbChConf {
channel: SfDbChannel {
backend: "sf-databuffer".into(),
name: "S10BC01-DBAM070:BAM_CH1_NORM".into(),
series: None,
},
channel: SfDbChannel::from_name("sf-databuffer", "S10BC01-DBAM070:BAM_CH1_NORM"),
keyspace: 3,
time_bin_size: TsNano(DAY),
array: true,

View File

@@ -61,11 +61,7 @@ pub async fn get_binned(
info!("end {}", end_date);
info!("-------");
let t1 = Utc::now();
let channel = SfDbChannel {
backend: channel_backend.clone(),
name: channel_name.into(),
series: None,
};
let channel = SfDbChannel::from_name(channel_backend, channel_name);
let range = NanoRange::from_date_time(beg_date, end_date).into();
// TODO this was before fixed using AggKind::DimXBins1
let mut query = BinnedQuery::new(channel, range, bin_count).for_time_weighted_scalar();

View File

@@ -67,11 +67,7 @@ fn api3_hdf_dim0_00() -> Result<(), Error> {
let rh = require_test_hosts_running()?;
let cluster = &rh.cluster;
let jsv = fetch_data_api_python_blob(
vec![SfDbChannel {
backend: TEST_BACKEND.into(),
name: "test-gen-i32-dim0-v00".into(),
series: None,
}],
vec![SfDbChannel::from_name(TEST_BACKEND, "test-gen-i32-dim0-v00")],
"1970-01-01T00:20:04.000Z",
"1970-01-01T00:21:10.000Z",
cluster,

View File

@@ -1,11 +1,9 @@
use crate::err::ErrConv;
use crate::nodes::require_test_hosts_running;
use crate::test::api4::common::fetch_binned_json;
use chrono::Utc;
use err::Error;
use http::StatusCode;
use hyper::Body;
use items_0::test::f32_cmp_near;
use items_0::test::f32_iter_cmp_near;
use items_0::test::f64_iter_cmp_near;
use items_0::WithLen;
@@ -29,11 +27,7 @@ fn make_query<S: Into<String>>(
end_date: &str,
bin_count_min: u32,
) -> Result<BinnedQuery, Error> {
let channel = SfDbChannel {
backend: TEST_BACKEND.into(),
name: name.into(),
series: None,
};
let channel = SfDbChannel::from_name(TEST_BACKEND, name);
let beg_date = beg_date.parse()?;
let end_date = end_date.parse()?;
let range = NanoRange::from_date_time(beg_date, end_date).into();
@@ -47,11 +41,7 @@ fn binned_d0_json_00() -> Result<(), Error> {
let rh = require_test_hosts_running()?;
let cluster = &rh.cluster;
let jsv = get_binned_json(
SfDbChannel {
backend: TEST_BACKEND.into(),
name: "test-gen-i32-dim0-v01".into(),
series: None,
},
SfDbChannel::from_name(TEST_BACKEND, "test-gen-i32-dim0-v01"),
"1970-01-01T00:20:04.000Z",
"1970-01-01T00:20:37.000Z",
6,
@@ -104,11 +94,7 @@ fn binned_d0_json_01a() -> Result<(), Error> {
let rh = require_test_hosts_running()?;
let cluster = &rh.cluster;
let jsv = get_binned_json(
SfDbChannel {
backend: TEST_BACKEND.into(),
name: "test-gen-i32-dim0-v01".into(),
series: None,
},
SfDbChannel::from_name(TEST_BACKEND, "test-gen-i32-dim0-v01"),
"1970-01-01T00:20:10.000Z",
"1970-01-01T00:40:30.000Z",
10,
@@ -162,11 +148,7 @@ fn binned_d0_json_01b() -> Result<(), Error> {
let rh = require_test_hosts_running()?;
let cluster = &rh.cluster;
let jsv = get_binned_json(
SfDbChannel {
backend: TEST_BACKEND.into(),
name: "test-gen-i32-dim0-v01".into(),
series: None,
},
SfDbChannel::from_name(TEST_BACKEND, "test-gen-i32-dim0-v01"),
"1970-01-01T00:20:10.000Z",
"1970-01-01T01:20:30.000Z",
10,
@@ -220,11 +202,7 @@ fn binned_d0_json_02() -> Result<(), Error> {
let rh = require_test_hosts_running()?;
let cluster = &rh.cluster;
let jsv = get_binned_json(
SfDbChannel {
backend: TEST_BACKEND.into(),
name: "test-gen-f64-dim1-v00".into(),
series: None,
},
SfDbChannel::from_name(TEST_BACKEND, "test-gen-f64-dim1-v00"),
"1970-01-01T00:20:00Z",
"1970-01-01T00:20:10Z",
//"1970-01-01T01:20:45.000Z",
@@ -279,11 +257,7 @@ fn binned_d0_json_03() -> Result<(), Error> {
let rh = require_test_hosts_running()?;
let cluster = &rh.cluster;
let jsv = get_binned_json(
SfDbChannel {
backend: TEST_BACKEND.into(),
name: "test-gen-f64-dim1-v00".into(),
series: None,
},
SfDbChannel::from_name(TEST_BACKEND, "test-gen-f64-dim1-v00"),
"1970-01-01T00:20:10.000Z",
"1970-01-01T01:20:20.000Z",
2,
@@ -312,11 +286,7 @@ fn binned_d0_json_04() -> Result<(), Error> {
let rh = require_test_hosts_running()?;
let cluster = &rh.cluster;
let jsv = get_binned_json(
SfDbChannel {
backend: TEST_BACKEND.into(),
name: "test-gen-i32-dim0-v01".into(),
series: None,
},
SfDbChannel::from_name(TEST_BACKEND, "test-gen-i32-dim0-v01"),
"1970-01-01T00:20:10.000Z",
"1970-01-01T04:20:30.000Z",
20,

View File

@@ -21,11 +21,7 @@ use url::Url;
const TEST_BACKEND: &str = "testbackend-00";
fn make_query<S: Into<String>>(name: S, beg_date: &str, end_date: &str) -> Result<PlainEventsQuery, Error> {
let channel = SfDbChannel {
backend: TEST_BACKEND.into(),
name: name.into(),
series: None,
};
let channel = SfDbChannel::from_name(TEST_BACKEND, name);
let beg_date = beg_date.parse()?;
let end_date = end_date.parse()?;
let range = NanoRange::from_date_time(beg_date, end_date);
@@ -59,11 +55,7 @@ fn events_plain_json_02_range_incomplete() -> Result<(), Error> {
let rh = require_test_hosts_running()?;
let cluster = &rh.cluster;
let jsv = events_plain_json(
SfDbChannel {
backend: TEST_BACKEND.into(),
name: "test-gen-i32-dim0-v01".into(),
series: None,
},
SfDbChannel::from_name(TEST_BACKEND, "test-gen-i32-dim0-v01"),
"1970-01-03T23:59:55.000Z",
"1970-01-04T00:00:01.000Z",
cluster,

View File

@@ -12,11 +12,7 @@ use query::api4::events::PlainEventsQuery;
const BACKEND: &str = "testbackend-00";
pub fn make_query<S: Into<String>>(name: S, beg_date: &str, end_date: &str) -> Result<PlainEventsQuery, Error> {
let channel = SfDbChannel {
backend: BACKEND.into(),
name: name.into(),
series: None,
};
let channel = SfDbChannel::from_name(BACKEND, name);
let beg_date = beg_date.parse()?;
let end_date = end_date.parse()?;
let range = NanoRange::from_date_time(beg_date, end_date);

View File

@@ -39,11 +39,7 @@ async fn get_json_common(
let beg_date: DateTime<Utc> = beg_date.parse()?;
let end_date: DateTime<Utc> = end_date.parse()?;
let channel_backend = TEST_BACKEND;
let channel = SfDbChannel {
backend: channel_backend.into(),
name: channel_name.into(),
series: None,
};
let channel = SfDbChannel::from_name(channel_backend, channel_name);
let range = NanoRange::from_date_time(beg_date, end_date).into();
let mut query = BinnedQuery::new(channel, range, bin_count).for_time_weighted_scalar();
query.set_timeout(Duration::from_millis(40000));

View File

@@ -32,11 +32,8 @@ pub async fn sf_databuffer_fetch_channel_by_series(
if let Some(row) = rows.pop() {
info!("sf_databuffer_fetch_channel_by_series got a row {row:?}");
let name: String = row.get(0);
let channel = SfDbChannel {
series: channel.series,
backend: ncc.node_config.cluster.backend.clone(),
name,
};
let channel =
SfDbChannel::from_full(ncc.node_config.cluster.backend.clone(), channel.series(), name);
info!("sf_databuffer_fetch_channel_by_series return {channel:?}");
Ok(channel)
} else {

View File

@@ -48,11 +48,7 @@ async fn agg_x_dim_0_inner() {
let node = make_test_node(0);
let query = AggQuerySingleChannel {
channel_config: SfDbChConf {
channel: SfDbChannel {
backend: "sf-databuffer".into(),
name: "S10BC01-DBAM070:EOM1_T1".into(),
series: None,
},
channel: SfDbChannel::from_name("sf-databuffer", "S10BC01-DBAM070:EOM1_T1"),
keyspace: 2,
time_bin_size: TsNano(DAY),
array: false,
@@ -105,11 +101,7 @@ async fn agg_x_dim_1_inner() {
let node = make_test_node(0);
let query = AggQuerySingleChannel {
channel_config: SfDbChConf {
channel: SfDbChannel {
backend: "ks".into(),
name: "wave1".into(),
series: None,
},
channel: SfDbChannel::from_name("ks", "wave1"),
keyspace: 3,
time_bin_size: TsNano(DAY),
array: true,

View File

@@ -821,11 +821,7 @@ mod test {
beg: DAY + HOUR * 5,
end: DAY + HOUR * 8,
};
let chn = netpod::SfDbChannel {
backend: BACKEND.into(),
name: "scalar-i32-be".into(),
series: None,
};
let chn = netpod::SfDbChannel::from_name(BACKEND, "scalar-i32-be");
// TODO read config from disk? Or expose the config from data generator?
let channel_config = SfDbChConf {
channel: chn,

View File

@@ -34,11 +34,7 @@ pub async fn gen_test_data() -> Result<(), Error> {
{
let chn = ChannelGenProps {
config: SfDbChConf {
channel: SfDbChannel {
backend: backend.clone(),
name: "scalar-i32-be".into(),
series: None,
},
channel: SfDbChannel::from_name(&backend, "scalar-i32-be"),
keyspace: 2,
time_bin_size: TsNano(DAY),
scalar_type: ScalarType::I32,
@@ -53,11 +49,7 @@ pub async fn gen_test_data() -> Result<(), Error> {
ensemble.channels.push(chn);
let chn = ChannelGenProps {
config: SfDbChConf {
channel: SfDbChannel {
backend: backend.clone(),
name: "wave-f64-be-n21".into(),
series: None,
},
channel: SfDbChannel::from_name(&backend, "wave-f64-be-n21"),
keyspace: 3,
time_bin_size: TsNano(DAY),
array: true,
@@ -72,11 +64,7 @@ pub async fn gen_test_data() -> Result<(), Error> {
ensemble.channels.push(chn);
let chn = ChannelGenProps {
config: SfDbChConf {
channel: SfDbChannel {
backend: backend.clone(),
name: "wave-u16-le-n77".into(),
series: None,
},
channel: SfDbChannel::from_name(&backend, "wave-u16-le-n77"),
keyspace: 3,
time_bin_size: TsNano(DAY),
scalar_type: ScalarType::U16,
@@ -91,11 +79,7 @@ pub async fn gen_test_data() -> Result<(), Error> {
ensemble.channels.push(chn);
let chn = ChannelGenProps {
config: SfDbChConf {
channel: SfDbChannel {
backend: backend.clone(),
name: "tw-scalar-i32-be".into(),
series: None,
},
channel: SfDbChannel::from_name(&backend, "tw-scalar-i32-be"),
keyspace: 2,
time_bin_size: TsNano(DAY),
scalar_type: ScalarType::I32,
@@ -110,11 +94,7 @@ pub async fn gen_test_data() -> Result<(), Error> {
ensemble.channels.push(chn);
let chn = ChannelGenProps {
config: SfDbChConf {
channel: SfDbChannel {
backend: backend.clone(),
name: "const-regular-scalar-i32-be".into(),
series: None,
},
channel: SfDbChannel::from_name(&backend, "const-regular-scalar-i32-be"),
keyspace: 2,
time_bin_size: TsNano(DAY),
scalar_type: ScalarType::I32,

View File

@@ -81,11 +81,7 @@ pub fn main() -> Result<(), Error> {
let inp = Box::pin(disk::file_content_stream(path.clone(), file, disk_io_tune));
let ce = &config.entries[0];
let channel_config = SfDbChConf {
channel: SfDbChannel {
backend: String::new(),
name: config.channel_name.clone(),
series: None,
},
channel: SfDbChannel::from_name("", &config.channel_name),
keyspace: ce.ks as u8,
time_bin_size: ce.bs.clone(),
scalar_type: ce.scalar_type.clone(),

View File

@@ -1046,11 +1046,7 @@ impl Api1EventsBinaryHandler {
let chans = qu
.channels()
.iter()
.map(|ch| SfDbChannel {
backend: backend.into(),
name: ch.name().into(),
series: None,
})
.map(|ch| SfDbChannel::from_name(backend, ch.name()))
.collect();
// TODO use a better stream protocol with built-in error delivery.
let status_id = super::status_board()?.new_status_id();

View File

@@ -100,11 +100,7 @@ impl ChannelConfigHandler {
let conf = if let Some(_scyco) = &node_config.node_config.cluster.scylla {
let c = nodenet::channelconfig::channel_config(q.range.clone(), q.channel.clone(), node_config).await?;
ChannelConfigResponse {
channel: SfDbChannel {
series: c.series.clone(),
backend: q.channel.backend().into(),
name: c.name,
},
channel: SfDbChannel::from_full(q.channel.backend(), c.series.clone(), &c.name),
scalar_type: c.scalar_type,
byte_order: None,
shape: c.shape,
@@ -392,11 +388,7 @@ impl ScyllaChannelsWithType {
let mut list = Vec::new();
for row in res.rows_typed_or_empty::<(String, i64)>() {
let (channel_name, series) = row.err_conv()?;
let ch = SfDbChannel {
backend: backend.into(),
name: channel_name,
series: Some(series as u64),
};
let ch = SfDbChannel::from_full(backend, Some(series as u64), channel_name);
list.push(ch);
}
let ret = ChannelListWithType { channels: list };

View File

@@ -601,7 +601,7 @@ pub struct NodeStatus {
// the same channel-name is delivered via different methods.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct SfDbChannel {
pub series: Option<u64>,
series: Option<u64>,
// "backend" is currently used in the existing systems for multiple purposes:
// it can indicate the facility (eg. sf-databuffer, hipa, ...) but also
// some special subsystem (eg. sf-rf-databuffer).
@@ -610,16 +610,36 @@ pub struct SfDbChannel {
}
impl SfDbChannel {
pub fn from_full<T: Into<String>, U: Into<String>>(backend: T, series: Option<u64>, name: U) -> Self {
Self {
backend: backend.into(),
series,
name: name.into(),
}
}
pub fn from_name<T: Into<String>, U: Into<String>>(backend: T, name: U) -> Self {
Self {
backend: backend.into(),
series: None,
name: name.into(),
}
}
pub fn backend(&self) -> &str {
&self.backend
}
pub fn series(&self) -> Option<u64> {
self.series
}
pub fn name(&self) -> &str {
&self.name
}
pub fn series(&self) -> Option<u64> {
self.series
pub fn set_series(&mut self, series: u64) {
self.series = Some(series);
}
}
@@ -680,6 +700,73 @@ impl ChannelTyped {
}
}
// A Scylla-backed "daqbuffer" time series.
// Uniqueness is given by the pair `(backend, series)`; the channel name is
// carried along purely for convenience.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct DaqbufSeries {
    pub series: u64,
    // "backend" is currently used in the existing systems for multiple purposes:
    // it can indicate the facility (eg. sf-databuffer, hipa, ...) but also
    // some special subsystem (eg. sf-rf-databuffer).
    pub backend: String,
    // This name is only for better user-facing messages. The (backend, series-id) is the identifier.
    pub name: String,
}

impl DaqbufSeries {
    // Backend identifier this series belongs to.
    pub fn backend(&self) -> &str {
        self.backend.as_str()
    }

    // Human-readable channel name (informational only).
    pub fn name(&self) -> &str {
        self.name.as_str()
    }

    // Unique series id within the backend.
    pub fn series(&self) -> u64 {
        self.series
    }
}
impl FromUrl for DaqbufSeries {
fn from_url(url: &Url) -> Result<Self, Error> {
let pairs = get_url_query_pairs(url);
Self::from_pairs(&pairs)
}
fn from_pairs(pairs: &BTreeMap<String, String>) -> Result<Self, Error> {
let ret = DaqbufSeries {
series: pairs
.get("seriesId")
.ok_or_else(|| Error::with_public_msg("missing seriesId"))
.map(|x| x.parse::<u64>())??,
backend: pairs
.get("backend")
.ok_or_else(|| Error::with_public_msg("missing backend"))?
.into(),
name: pairs
.get("channelName")
.map(String::from)
.unwrap_or(String::new())
.into(),
};
Ok(ret)
}
}
impl AppendToUrl for DaqbufSeries {
    /// Serializes this series into `url`'s query string: always appends
    /// `backend` and `seriesId`; appends `channelName` only when non-empty.
    fn append_to_url(&self, url: &mut Url) {
        let mut g = url.query_pairs_mut();
        g.append_pair("backend", &self.backend);
        if !self.name.is_empty() {
            g.append_pair("channelName", &self.name);
        }
        // `series` is a plain u64 (not Option), so it is always present.
        // The previous `if let series = self.series` was an irrefutable
        // pattern left over from the Option<u64> refactor.
        g.append_pair("seriesId", &self.series.to_string());
    }
}
pub struct HostPort {
pub host: String,
pub port: u16,

View File

@@ -85,11 +85,11 @@ pub async fn channel_config(range: NanoRange, channel: SfDbChannel, ncc: &NodeCo
debug!("try to get ChConf for sf-databuffer type backend");
// TODO in the future we should not need this:
let mut channel = sf_databuffer_fetch_channel_by_series(channel, ncc).await?;
if channel.series.is_none() {
if channel.series().is_none() {
let pgclient = dbconn::create_connection(&ncc.node_config.cluster.database).await?;
let pgclient = std::sync::Arc::new(pgclient);
let series = dbconn::find_series_sf_databuffer(&channel, pgclient).await?;
channel.series = Some(series);
channel.set_series(series);
}
let channel = channel;
debug!("channel_config AFTER {channel:?}");
@@ -97,7 +97,7 @@ pub async fn channel_config(range: NanoRange, channel: SfDbChannel, ncc: &NodeCo
debug!("channel_config THEN {c1:?}");
let ret = ChConf {
backend: c1.channel.backend,
series: channel.series,
series: channel.series(),
name: c1.channel.name,
scalar_type: c1.scalar_type,
shape: c1.shape,

View File

@@ -74,11 +74,7 @@ fn raw_data_00() {
},
ix: 0,
};
let channel = SfDbChannel {
series: None,
backend: TEST_BACKEND.into(),
name: "scalar-i32".into(),
};
let channel = SfDbChannel::from_name(TEST_BACKEND, "scalar-i32");
let range = NanoRange {
beg: SEC,
end: SEC * 10,

View File

@@ -112,7 +112,7 @@ impl BinnedQuery {
}
pub fn set_series_id(&mut self, series: u64) {
self.channel.series = Some(series);
self.channel.set_series(series);
}
pub fn channel_mut(&mut self) -> &mut SfDbChannel {

View File

@@ -127,7 +127,7 @@ impl PlainEventsQuery {
}
pub fn set_series_id(&mut self, series: u64) {
self.channel.series = Some(series);
self.channel.set_series(series);
}
pub fn set_do_test_main_error(&mut self, k: bool) {

View File

@@ -19,11 +19,7 @@ pub async fn config_from_scylla(chq: ChannelConfigQuery, scy: Arc<ScySession>) -
let cols = row.into_typed::<(i64, i32, Vec<i32>)>().err_conv()?;
let scalar_type = ScalarType::from_scylla_i32(cols.1)?;
let shape = Shape::from_scylla_shape_dims(&cols.2)?;
let channel = SfDbChannel {
series: Some(cols.0 as _),
backend: chq.channel.backend().into(),
name: chq.channel.name().into(),
};
let channel = SfDbChannel::from_full(chq.channel.backend(), Some(cols.0 as _), chq.channel.name());
let res = ChannelConfigResponse {
channel,
scalar_type,