Factor config quorum decision
This commit is contained in:
@@ -61,6 +61,7 @@ use tokio::io::ReadBuf;
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
// TODO move to databuffer-specific crate
|
||||
// TODO duplicate of SfChFetchInfo?
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
pub struct SfDbChConf {
|
||||
pub channel: SfDbChannel,
|
||||
|
||||
@@ -479,6 +479,12 @@ impl Node {
|
||||
prometheus_api_bind: None,
|
||||
}
|
||||
}
|
||||
|
||||
// TODO should a node know how to reach itself? Because, depending on network
|
||||
// topology (proxies etc.) the way to reach a node depends on the tuple `(node, client)`.
|
||||
pub fn baseurl(&self) -> Url {
|
||||
format!("http://{}:{}/api/4/", self.host, self.port).parse().unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
@@ -765,9 +771,7 @@ impl AppendToUrl for DaqbufSeries {
|
||||
if self.name().len() > 0 {
|
||||
g.append_pair("channelName", &self.name);
|
||||
}
|
||||
if let series = self.series {
|
||||
g.append_pair("seriesId", &series.to_string());
|
||||
}
|
||||
g.append_pair("seriesId", &self.series.to_string());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -803,7 +807,7 @@ impl From<FilePos> for u64 {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, PartialOrd, Eq, Ord)]
|
||||
pub enum ByteOrder {
|
||||
Little,
|
||||
Big,
|
||||
@@ -1174,7 +1178,7 @@ where
|
||||
pub ix: [T; 2],
|
||||
}
|
||||
|
||||
#[derive(Clone, PartialEq, PartialOrd)]
|
||||
#[derive(Clone, PartialEq, PartialOrd, Eq, Ord)]
|
||||
pub struct DtNano(u64);
|
||||
|
||||
impl DtNano {
|
||||
@@ -1182,6 +1186,10 @@ impl DtNano {
|
||||
Self(ns)
|
||||
}
|
||||
|
||||
pub fn from_ms(ns: u64) -> Self {
|
||||
Self(MS * ns)
|
||||
}
|
||||
|
||||
pub fn ns(&self) -> u64 {
|
||||
self.0
|
||||
}
|
||||
@@ -2621,7 +2629,7 @@ pub struct ChannelInfo {
|
||||
pub msg: serde_json::Value,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, PartialOrd, Eq, Ord)]
|
||||
pub struct ChConf {
|
||||
backend: String,
|
||||
series: u64,
|
||||
@@ -2668,7 +2676,7 @@ impl ChConf {
|
||||
|
||||
// Includes the necessary information to know where to localize datafiles for sf-databuffer
|
||||
// and what (approximate) types to expect.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, PartialOrd, Eq, Ord)]
|
||||
pub struct SfChFetchInfo {
|
||||
backend: String,
|
||||
name: String,
|
||||
@@ -2747,7 +2755,7 @@ impl SfChFetchInfo {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, PartialOrd, Eq, Ord)]
|
||||
pub enum ChannelTypeConfigGen {
|
||||
Scylla(ChConf),
|
||||
SfDatabuffer(SfChFetchInfo),
|
||||
|
||||
@@ -1,45 +1,78 @@
|
||||
use crate::channelconfig::http_get_channel_config;
|
||||
use disk::SfDbChConf;
|
||||
use err::Error;
|
||||
use httpclient::url::Url;
|
||||
use netpod::log::*;
|
||||
use netpod::range::evrange::SeriesRange;
|
||||
use netpod::ChConf;
|
||||
use netpod::ChannelConfigQuery;
|
||||
use netpod::ChannelConfigResponse;
|
||||
use netpod::ChannelTypeConfigGen;
|
||||
use netpod::DtNano;
|
||||
use netpod::NodeConfigCached;
|
||||
use netpod::SfChFetchInfo;
|
||||
use netpod::SfDbChannel;
|
||||
use std::collections::BTreeMap;
|
||||
|
||||
fn decide_sf_ch_config_quorum(inp: Vec<ChannelConfigResponse>) -> Result<Option<ChannelTypeConfigGen>, Error> {
|
||||
let mut histo = BTreeMap::new();
|
||||
for item in inp {
|
||||
let item = match item {
|
||||
ChannelConfigResponse::SfDatabuffer(k) => ChannelTypeConfigGen::SfDatabuffer(SfChFetchInfo::new(
|
||||
k.backend,
|
||||
k.name,
|
||||
k.keyspace,
|
||||
DtNano::from_ms(k.timebinsize),
|
||||
k.byte_order,
|
||||
k.scalar_type,
|
||||
k.shape,
|
||||
)),
|
||||
ChannelConfigResponse::Daqbuf(k) => {
|
||||
ChannelTypeConfigGen::Scylla(ChConf::new(k.backend, k.series, k.scalar_type, k.shape, k.name))
|
||||
}
|
||||
};
|
||||
if histo.contains_key(&item) {
|
||||
*histo.get_mut(&item).unwrap() += 1;
|
||||
} else {
|
||||
histo.insert(item, 0u32);
|
||||
}
|
||||
}
|
||||
let mut v: Vec<_> = histo.into_iter().collect();
|
||||
v.sort_unstable_by_key(|x| x.1);
|
||||
match v.pop() {
|
||||
Some((x, _)) => Ok(Some(x)),
|
||||
None => Ok(None),
|
||||
}
|
||||
}
|
||||
|
||||
async fn find_sf_ch_config_quorum(
|
||||
channel: SfDbChannel,
|
||||
range: SeriesRange,
|
||||
ncc: &NodeConfigCached,
|
||||
) -> Result<SfChFetchInfo, Error> {
|
||||
) -> Result<Option<SfChFetchInfo>, Error> {
|
||||
let range = match range {
|
||||
SeriesRange::TimeRange(x) => x,
|
||||
SeriesRange::PulseRange(_) => return Err(Error::with_msg_no_trace("expect TimeRange")),
|
||||
};
|
||||
type _A = SfDbChannel;
|
||||
type _B = SfDbChConf;
|
||||
let mut ress = Vec::new();
|
||||
let mut all = Vec::new();
|
||||
for node in &ncc.node_config.cluster.nodes {
|
||||
// TODO add a baseurl function to struct Node
|
||||
let baseurl: Url = format!("http://{}:{}/api/4/", node.host, node.port).parse()?;
|
||||
let qu = ChannelConfigQuery {
|
||||
channel: channel.clone(),
|
||||
range: range.clone(),
|
||||
// TODO
|
||||
expand: false,
|
||||
};
|
||||
let res = http_get_channel_config(qu, baseurl.clone()).await?;
|
||||
info!("GOT: {res:?}");
|
||||
ress.push(res);
|
||||
let res = http_get_channel_config(qu, node.baseurl()).await?;
|
||||
all.push(res);
|
||||
}
|
||||
let qu = decide_sf_ch_config_quorum(all)?;
|
||||
match qu {
|
||||
Some(item) => match item {
|
||||
ChannelTypeConfigGen::Scylla(_) => Err(Error::with_msg_no_trace(
|
||||
"find_sf_ch_config_quorum not a sf-databuffer config",
|
||||
)),
|
||||
ChannelTypeConfigGen::SfDatabuffer(item) => Ok(Some(item)),
|
||||
},
|
||||
None => Ok(None),
|
||||
}
|
||||
// TODO find most likely values.
|
||||
|
||||
// TODO create new endpoint which only returns the most matching config entry
|
||||
// for some given channel and time range.
|
||||
todo!()
|
||||
}
|
||||
|
||||
pub async fn find_config_basics_quorum(
|
||||
@@ -48,7 +81,9 @@ pub async fn find_config_basics_quorum(
|
||||
ncc: &NodeConfigCached,
|
||||
) -> Result<ChannelTypeConfigGen, Error> {
|
||||
if let Some(_cfg) = &ncc.node.sf_databuffer {
|
||||
let ret: SfChFetchInfo = find_sf_ch_config_quorum(channel, range, ncc).await?;
|
||||
let ret: SfChFetchInfo = find_sf_ch_config_quorum(channel, range, ncc)
|
||||
.await?
|
||||
.ok_or_else(|| Error::with_msg_no_trace("no config found at all"))?;
|
||||
Ok(ChannelTypeConfigGen::SfDatabuffer(ret))
|
||||
} else if let Some(_cfg) = &ncc.node_config.cluster.scylla {
|
||||
let ret = dbconn::channelconfig::chconf_from_scylla_type_backend(&channel, ncc)
|
||||
|
||||
Reference in New Issue
Block a user