From f4c1539c377970eb5ee0ccec60b4aedaae4fd681 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Mon, 26 Jun 2023 16:31:59 +0200 Subject: [PATCH] Factor config quorum decision --- disk/src/disk.rs | 1 + netpod/src/netpod.rs | 24 ++++++++----- nodenet/src/configquorum.rs | 69 ++++++++++++++++++++++++++++--------- 3 files changed, 69 insertions(+), 25 deletions(-) diff --git a/disk/src/disk.rs b/disk/src/disk.rs index c897836..4dccc89 100644 --- a/disk/src/disk.rs +++ b/disk/src/disk.rs @@ -61,6 +61,7 @@ use tokio::io::ReadBuf; use tokio::sync::mpsc; // TODO move to databuffer-specific crate +// TODO duplicate of SfChFetchInfo? #[derive(Clone, Debug, Serialize, Deserialize)] pub struct SfDbChConf { pub channel: SfDbChannel, diff --git a/netpod/src/netpod.rs b/netpod/src/netpod.rs index bf4ec5a..e22e876 100644 --- a/netpod/src/netpod.rs +++ b/netpod/src/netpod.rs @@ -479,6 +479,12 @@ impl Node { prometheus_api_bind: None, } } + + // TODO should a node know how to reach itself? Because, depending on network + // topology (proxies etc.) the way to reach a node depends on the tuple `(node, client)`. + pub fn baseurl(&self) -> Url { + format!("http://{}:{}/api/4/", self.host, self.port).parse().unwrap() + } } #[derive(Clone, Debug, Serialize, Deserialize)] @@ -765,9 +771,7 @@ impl AppendToUrl for DaqbufSeries { if self.name().len() > 0 { g.append_pair("channelName", &self.name); } - if let series = self.series { - g.append_pair("seriesId", &series.to_string()); - } + g.append_pair("seriesId", &self.series.to_string()); } } @@ -803,7 +807,7 @@ impl From for u64 { } } -#[derive(Clone, Debug, Serialize, Deserialize)] +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, PartialOrd, Eq, Ord)] pub enum ByteOrder { Little, Big, @@ -1174,7 +1178,7 @@ where pub ix: [T; 2], } -#[derive(Clone, PartialEq, PartialOrd)] +#[derive(Clone, PartialEq, PartialOrd, Eq, Ord)] pub struct DtNano(u64); impl DtNano { @@ -1182,6 +1186,10 @@ impl DtNano { Self(ns) } + pub fn from_ms(ns: u64) -> Self { + Self(MS * ns) + } + pub fn ns(&self) -> u64 { self.0 } @@ -2621,7 +2629,7 @@ pub struct ChannelInfo { pub msg: serde_json::Value, } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, PartialOrd, Eq, Ord)] pub struct ChConf { backend: String, series: u64, @@ -2668,7 +2676,7 @@ impl ChConf { // Includes the necessary information to know where to localize datafiles for sf-databuffer // and what (approximate) types to expect. -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, PartialOrd, Eq, Ord)] pub struct SfChFetchInfo { backend: String, name: String, @@ -2747,7 +2755,7 @@ impl SfChFetchInfo { } } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, PartialOrd, Eq, Ord)] pub enum ChannelTypeConfigGen { Scylla(ChConf), SfDatabuffer(SfChFetchInfo), diff --git a/nodenet/src/configquorum.rs b/nodenet/src/configquorum.rs index 8044da7..cc260bc 100644 --- a/nodenet/src/configquorum.rs +++ b/nodenet/src/configquorum.rs @@ -1,45 +1,78 @@ use crate::channelconfig::http_get_channel_config; -use disk::SfDbChConf; use err::Error; -use httpclient::url::Url; -use netpod::log::*; use netpod::range::evrange::SeriesRange; +use netpod::ChConf; use netpod::ChannelConfigQuery; +use netpod::ChannelConfigResponse; use netpod::ChannelTypeConfigGen; +use netpod::DtNano; use netpod::NodeConfigCached; use netpod::SfChFetchInfo; use netpod::SfDbChannel; +use std::collections::BTreeMap; + +fn decide_sf_ch_config_quorum(inp: Vec) -> Result, Error> { + let mut histo = BTreeMap::new(); + for item in inp { + let item = match item { + ChannelConfigResponse::SfDatabuffer(k) => ChannelTypeConfigGen::SfDatabuffer(SfChFetchInfo::new( + k.backend, + k.name, + k.keyspace, + DtNano::from_ms(k.timebinsize), + k.byte_order, + k.scalar_type, + k.shape, + )), + ChannelConfigResponse::Daqbuf(k) => { + ChannelTypeConfigGen::Scylla(ChConf::new(k.backend, k.series, k.scalar_type, k.shape, k.name)) + } + }; + if histo.contains_key(&item) { + *histo.get_mut(&item).unwrap() += 1; + } else { + histo.insert(item, 0u32); + } + } + let mut v: Vec<_> = histo.into_iter().collect(); + v.sort_unstable_by_key(|x| x.1); + match v.pop() { + Some((x, _)) => Ok(Some(x)), + None => Ok(None), + } +} async fn find_sf_ch_config_quorum( channel: SfDbChannel, range: SeriesRange, ncc: &NodeConfigCached, -) -> Result { +) -> Result, Error> { let range = match range { SeriesRange::TimeRange(x) => x, SeriesRange::PulseRange(_) => return Err(Error::with_msg_no_trace("expect TimeRange")), }; - type _A = SfDbChannel; - type _B = SfDbChConf; - let mut ress = Vec::new(); + let mut all = Vec::new(); for node in &ncc.node_config.cluster.nodes { // TODO add a baseurl function to struct Node - let baseurl: Url = format!("http://{}:{}/api/4/", node.host, node.port).parse()?; let qu = ChannelConfigQuery { channel: channel.clone(), range: range.clone(), // TODO expand: false, }; - let res = http_get_channel_config(qu, baseurl.clone()).await?; - info!("GOT: {res:?}"); - ress.push(res); + let res = http_get_channel_config(qu, node.baseurl()).await?; + all.push(res); + } + let qu = decide_sf_ch_config_quorum(all)?; + match qu { + Some(item) => match item { + ChannelTypeConfigGen::Scylla(_) => Err(Error::with_msg_no_trace( + "find_sf_ch_config_quorum not a sf-databuffer config", + )), + ChannelTypeConfigGen::SfDatabuffer(item) => Ok(Some(item)), + }, + None => Ok(None), } - // TODO find most likely values. - - // TODO create new endpoint which only returns the most matching config entry - // for some given channel and time range. - todo!() } pub async fn find_config_basics_quorum( @@ -48,7 +81,9 @@ pub async fn find_config_basics_quorum( ncc: &NodeConfigCached, ) -> Result { if let Some(_cfg) = &ncc.node.sf_databuffer { - let ret: SfChFetchInfo = find_sf_ch_config_quorum(channel, range, ncc).await?; + let ret: SfChFetchInfo = find_sf_ch_config_quorum(channel, range, ncc) + .await? + .ok_or_else(|| Error::with_msg_no_trace("no config found at all"))?; Ok(ChannelTypeConfigGen::SfDatabuffer(ret)) } else if let Some(_cfg) = &ncc.node_config.cluster.scylla { let ret = dbconn::channelconfig::chconf_from_scylla_type_backend(&channel, ncc)