From 0daa66a8001422c6136815b1bcae82caf6e2b5c8 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Mon, 19 Jun 2023 10:04:56 +0200 Subject: [PATCH] WIP refactor channel config usage --- .cargo/cargo-lock | 39 ++++---- dbconn/src/channelconfig.rs | 18 +--- httpret/src/api1.rs | 38 +++++--- httpret/src/channel_status.rs | 29 +++--- httpret/src/channelconfig.rs | 50 ++-------- netpod/src/netpod.rs | 129 ++++++++++++++++++++++--- nodenet/src/channelconfig.rs | 175 ++++++++++++++++++++-------------- nodenet/src/configquorum.rs | 8 +- nodenet/src/scylla.rs | 18 ++-- parse/src/channelconfig.rs | 17 ---- scyllaconn/src/config.rs | 50 ---------- scyllaconn/src/scyllaconn.rs | 1 - 12 files changed, 303 insertions(+), 269 deletions(-) delete mode 100644 scyllaconn/src/config.rs diff --git a/.cargo/cargo-lock b/.cargo/cargo-lock index 9e451ad..5dc0386 100644 --- a/.cargo/cargo-lock +++ b/.cargo/cargo-lock @@ -8,7 +8,7 @@ version = "0.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a76fd60b23679b7d19bd066031410fb7e458ccc5e958eb5c325888ce4baedc97" dependencies = [ - "gimli 0.27.2", + "gimli 0.27.3", ] [[package]] @@ -130,9 +130,9 @@ checksum = "6b4930d2cb77ce62f89ee5d5289b4ac049559b1c45539271f5ed4fdc7db34545" [[package]] name = "arrayvec" -version = "0.7.3" +version = "0.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8868f09ff8cea88b079da74ae569d9b8c62a23c68c746240b704ee6f7525c89c" +checksum = "96d30a06541fbafbc7f82ed10c06164cfbd2c401138f6addd8404629c4b16711" [[package]] name = "async-channel" @@ -381,9 +381,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.3.3" +version = "4.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ca8f255e4b8027970e78db75e78831229c9815fdbfa67eb1a1b777a62e24b4a0" +checksum = "80672091db20273a15cf9fdd4e47ed43b5091ec9841bf4c6145c9dfbbcae09ed" dependencies = [ "clap_builder", "clap_derive", @@ -392,9 +392,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.3.3" +version = "4.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "acd4f3c17c83b0ba34ffbc4f8bbd74f079413f747f84a6f89292f138057e36ab" +checksum = "c1458a1df40e1e2afebb7ab60ce55c1fa8f431146205aa5f4887e0b111c27636" dependencies = [ "anstream", "anstyle", @@ -527,9 +527,9 @@ dependencies = [ [[package]] name = "cpufeatures" -version = "0.2.7" +version = "0.2.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3e4c1eaa2012c47becbbad2ab175484c2a84d1185b566fb2cc5b8707343dfe58" +checksum = "03e69e28e9f7f77debdedbaafa2866e1de9ba56df55a8bd7cfc724c25a09987c" dependencies = [ "libc", ] @@ -1230,9 +1230,9 @@ dependencies = [ [[package]] name = "gimli" -version = "0.27.2" +version = "0.27.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ad0a93d233ebf96623465aad4046a8d3aa4da22d4f4beba5388838c8a434bbb4" +checksum = "b6c80984affa11d98d1b88b66ac8853f143217b399d3c74116778ff8fdb4ed2e" [[package]] name = "h2" @@ -2741,9 +2741,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.96" +version = "1.0.97" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "057d394a50403bcac12672b2b18fb387ab6d289d957dab67dd201875391e52f1" +checksum = "bdf3bf93142acad5821c99197022e170842cdbc1c30482b98750c688c640842a" dependencies = [ "itoa", "ryu", @@ -2765,9 +2765,9 @@ dependencies = [ [[package]] name = "sha2" -version = "0.10.6" +version = "0.10.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "82e6b795fe2e3b1e845bafcb27aa35405c4d47cdfc92af5fc8d3002f76cebdc0" +checksum = "479fb9d862239e610720565ca91403019f2f00410f1864c5aa7479b950a76ed8" dependencies = [ "cfg-if", "cpufeatures", @@ -3468,11 +3468,10 @@ checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" [[package]] name = "want" -version = "0.3.0" +version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1ce8a968cb1cd110d136ff8b819a556d6fb6d919363c61534f6860c7eb172ba0" +checksum = "bfa7760aed19e106de2c7c0b581b509f2f25d3dacaf737cb82ac61bc6d760b0e" dependencies = [ - "log", "try-lock", ] @@ -3898,9 +3897,9 @@ checksum = "1a515f5799fe4961cb532f983ce2b23082366b898e52ffbce459c86f67c8378a" [[package]] name = "winnow" -version = "0.4.6" +version = "0.4.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61de7bac303dc551fe038e2b3cef0f571087a47571ea6e79a87692ac99b99699" +checksum = "ca0ace3845f0d96209f0375e6d367e3eb87eb65d27d445bdc9f1843a26f39448" dependencies = [ "memchr", ] diff --git a/dbconn/src/channelconfig.rs b/dbconn/src/channelconfig.rs index 6c0bae0..a25f7d2 100644 --- a/dbconn/src/channelconfig.rs +++ b/dbconn/src/channelconfig.rs @@ -23,7 +23,7 @@ pub async fn chconf_from_scylla_type_backend(channel: &SfDbChannel, ncc: &NodeCo ncc.node_config.cluster.backend ); } - let backend = channel.backend().into(); + let backend = channel.backend(); let dbconf = &ncc.node_config.cluster.database; let pgclient = crate::create_connection(dbconf).await?; if let Some(series) = channel.series() { @@ -44,13 +44,7 @@ pub async fn chconf_from_scylla_type_backend(channel: &SfDbChannel, ncc: &NodeCo let scalar_type = ScalarType::from_dtype_index(row.get::<_, i32>(1) as u8)?; // TODO can I get a slice from psql driver? let shape = Shape::from_scylla_shape_dims(&row.get::<_, Vec>(2))?; - let ret = ChConf { - backend, - series: Some(series), - name, - scalar_type, - shape, - }; + let ret = ChConf::new(backend, series, scalar_type, shape, name); Ok(ret) } } else { @@ -77,13 +71,7 @@ pub async fn chconf_from_scylla_type_backend(channel: &SfDbChannel, ncc: &NodeCo let scalar_type = ScalarType::from_dtype_index(row.get::<_, i32>(2) as u8)?; // TODO can I get a slice from psql driver? let shape = Shape::from_scylla_shape_dims(&row.get::<_, Vec>(3))?; - let ret = ChConf { - backend, - series: Some(series), - name, - scalar_type, - shape, - }; + let ret = ChConf::new(backend, series, scalar_type, shape, name); Ok(ret) } } else { diff --git a/httpret/src/api1.rs b/httpret/src/api1.rs index 76b7e14..848b394 100644 --- a/httpret/src/api1.rs +++ b/httpret/src/api1.rs @@ -640,9 +640,13 @@ impl Api1ChannelHeader { } } -async fn find_ch_conf(channel: SfDbChannel, ncc: NodeConfigCached) -> Result { - //find_sf_channel_config_basics_quorum() - todo!() +async fn find_ch_conf( + range: NanoRange, + channel: SfDbChannel, + ncc: NodeConfigCached, +) -> Result { + let ret = nodenet::channelconfig::channel_config(range, channel, &ncc).await?; + Ok(ret) } pub struct DataApiPython3DataStream { @@ -651,8 +655,8 @@ pub struct DataApiPython3DataStream { current_channel: Option, node_config: NodeConfigCached, chan_stream: Option> + Send>>>, - config_fut: Option> + Send>>>, - ch_conf: Option, + config_fut: Option> + Send>>>, + ch_conf: Option, disk_io_tune: DiskIoTune, do_decompress: bool, #[allow(unused)] @@ -882,14 +886,22 @@ impl Stream for DataApiPython3DataStream { } } else if let Some(fut) = &mut self.config_fut { match fut.poll_unpin(cx) { - Ready(Ok(k)) => match self.handle_config_fut_ready(k) { - Ok(()) => continue, - Err(e) => { - self.config_fut = None; + Ready(Ok(k)) => match k { + ChannelTypeConfigGen::Scylla(_) => { + let e = Error::with_msg_no_trace("scylla"); + error!("{e}"); self.data_done = true; - error!("api1_binary_events error {:?}", e); Ready(Some(Err(e))) } + ChannelTypeConfigGen::SfDatabuffer(k) => match self.handle_config_fut_ready(k) { + Ok(()) => continue, + Err(e) => { + self.config_fut = None; + self.data_done = true; + error!("api1_binary_events error {:?}", e); + Ready(Some(Err(e))) + } + }, }, Ready(Err(e)) => { self.data_done = true; @@ -900,7 +912,11 @@ impl Stream for DataApiPython3DataStream { } else { if let Some(channel) = self.channels.pop_front() { self.current_channel = Some(channel.clone()); - self.config_fut = Some(Box::pin(find_ch_conf(channel, self.node_config.clone()))); + self.config_fut = Some(Box::pin(find_ch_conf( + self.range.clone(), + channel, + self.node_config.clone(), + ))); continue; } else { self.data_done = true; diff --git a/httpret/src/channel_status.rs b/httpret/src/channel_status.rs index c75735d..2116fb5 100644 --- a/httpret/src/channel_status.rs +++ b/httpret/src/channel_status.rs @@ -12,6 +12,7 @@ use items_2::channelevents::ChannelStatusEvent; use items_2::channelevents::ConnStatusEvent; use netpod::log::*; use netpod::query::ChannelStateEventsQuery; +use netpod::ChannelTypeConfigGen; use netpod::FromUrl; use netpod::NodeConfigCached; use netpod::ACCEPT_ALL; @@ -77,7 +78,6 @@ impl ConnectionStatusEvents { let _scy = scyllaconn::create_scy_session(scyco).await?; let chconf = nodenet::channelconfig::channel_config(q.range().clone(), q.channel().clone(), node_config).await?; - let _series = chconf.series; let _do_one_before_range = true; let ret = Vec::new(); if true { @@ -153,17 +153,22 @@ impl ChannelStatusEvents { let chconf = nodenet::channelconfig::channel_config(q.range().clone(), q.channel().clone(), node_config).await?; let do_one_before_range = true; - let mut stream = scyllaconn::status::StatusStreamScylla::new( - chconf.try_series().context("channel_status")?, - q.range().clone(), - do_one_before_range, - scy, - ); - let mut ret = Vec::new(); - while let Some(item) = stream.next().await { - let item = item?; - ret.push(item); + match chconf { + ChannelTypeConfigGen::Scylla(ch_conf) => { + let mut stream = scyllaconn::status::StatusStreamScylla::new( + ch_conf.series(), + q.range().clone(), + do_one_before_range, + scy, + ); + let mut ret = Vec::new(); + while let Some(item) = stream.next().await { + let item = item?; + ret.push(item); + } + Ok(ret) + } + ChannelTypeConfigGen::SfDatabuffer(k) => todo!(), } - Ok(ret) } } diff --git a/httpret/src/channelconfig.rs b/httpret/src/channelconfig.rs index b00c655..fa54b32 100644 --- a/httpret/src/channelconfig.rs +++ b/httpret/src/channelconfig.rs @@ -12,7 +12,6 @@ use netpod::get_url_query_pairs; use netpod::log::*; use netpod::query::prebinned::PreBinnedQuery; use netpod::timeunits::*; -use netpod::ChConf; use netpod::ChannelConfigQuery; use netpod::ChannelConfigResponse; use netpod::ChannelTypeConfigGen; @@ -38,24 +37,16 @@ pub async fn chconf_from_events_v1( q: &PlainEventsQuery, ncc: &NodeConfigCached, ) -> Result { - // let ret = nodenet::channelconfig::channel_config(q.range().try_into()?, q.channel().clone(), ncc).await?; let ret = nodenet::configquorum::find_config_basics_quorum(q.channel(), ncc).await?; Ok(ret) } -pub async fn chconf_from_prebinned(q: &PreBinnedQuery, _ncc: &NodeConfigCached) -> Result { - let ret = ChConf { - backend: q.channel().backend().into(), - series: q.channel().series().clone(), - name: q.channel().name().into(), - scalar_type: q.scalar_type().clone(), - shape: q.shape().clone(), - }; +pub async fn chconf_from_prebinned(q: &PreBinnedQuery, ncc: &NodeConfigCached) -> Result { + let ret = nodenet::configquorum::find_config_basics_quorum(q.channel(), ncc).await?; Ok(ret) } pub async fn ch_conf_from_binned(q: &BinnedQuery, ncc: &NodeConfigCached) -> Result { - // let ret = nodenet::channelconfig::channel_config(q.range().try_into()?, q.channel().clone(), ncc).await?; let ret = nodenet::configquorum::find_config_basics_quorum(q.channel(), ncc).await?; Ok(ret) } @@ -103,24 +94,11 @@ impl ChannelConfigHandler { let url = Url::parse(&format!("dummy:{}", req.uri()))?; let q = ChannelConfigQuery::from_url(&url)?; info!("channel_config for q {q:?}"); - let conf = if let Some(_scyco) = &node_config.node_config.cluster.scylla { - let c = nodenet::channelconfig::channel_config(q.range.clone(), q.channel.clone(), node_config).await?; - ChannelConfigResponse { - channel: SfDbChannel::from_full(q.channel.backend(), c.series.clone(), &c.name), - scalar_type: c.scalar_type, - byte_order: None, - shape: c.shape, - } - } else if let Some(_) = &node_config.node.channel_archiver { - return Err(Error::with_msg_no_trace("channel archiver not supported")); - } else if let Some(_) = &node_config.node.archiver_appliance { - return Err(Error::with_msg_no_trace("archiver appliance not supported")); - } else { - parse::channelconfig::channel_config(&q, node_config).await? - }; + let conf = nodenet::channelconfig::channel_config(q.range.clone(), q.channel.clone(), node_config).await?; + let res: ChannelConfigResponse = conf.into(); let ret = response(StatusCode::OK) .header(http::header::CONTENT_TYPE, APP_JSON) - .body(Body::from(serde_json::to_string(&conf)?))?; + .body(Body::from(serde_json::to_string(&res)?))?; Ok(ret) } } @@ -159,27 +137,15 @@ impl ChannelConfigsHandler { } } - async fn channel_configs( - &self, - req: Request, - node_config: &NodeConfigCached, - ) -> Result, Error> { + async fn channel_configs(&self, req: Request, ncc: &NodeConfigCached) -> Result, Error> { info!("channel_configs"); let url = Url::parse(&format!("dummy:{}", req.uri()))?; let q = ChannelConfigQuery::from_url(&url)?; info!("channel_configs for q {q:?}"); - let conf = if let Some(_) = &node_config.node_config.cluster.scylla { - return Err(Error::with_msg_no_trace("TODO")); - } else if let Some(_) = &node_config.node.channel_archiver { - return Err(Error::with_msg_no_trace("TODO")); - } else if let Some(_) = &node_config.node.archiver_appliance { - return Err(Error::with_msg_no_trace("TODO")); - } else { - disk::channelconfig::configs(q.channel, node_config).await? - }; + let ch_conf = nodenet::channelconfig::channel_config(q.range.clone(), q.channel, ncc).await?; let ret = response(StatusCode::OK) .header(http::header::CONTENT_TYPE, APP_JSON) - .body(Body::from(serde_json::to_string(&conf)?))?; + .body(Body::from(serde_json::to_string(&ch_conf)?))?; Ok(ret) } } diff --git a/netpod/src/netpod.rs b/netpod/src/netpod.rs index 88148a6..dc484c1 100644 --- a/netpod/src/netpod.rs +++ b/netpod/src/netpod.rs @@ -1187,9 +1187,17 @@ impl TsNano { Self(ns) } + pub fn from_ms(ns: u64) -> Self { + Self(MS * ns) + } + pub fn ns(&self) -> u64 { self.0 } + + pub fn ms(&self) -> u64 { + self.0 / MS + } } impl fmt::Debug for TsNano { @@ -2399,15 +2407,79 @@ impl AppendToUrl for ChannelConfigQuery { } #[derive(Debug, Serialize, Deserialize)] -pub struct ChannelConfigResponse { - #[serde(rename = "channel")] - pub channel: SfDbChannel, +#[serde(rename = "SfDatabuffer")] +pub struct SfChannelConfigResponse { + #[serde(rename = "backend")] + pub backend: String, + #[serde(rename = "name")] + pub name: String, + #[serde(rename = "keyspace")] + pub keyspace: u8, + #[serde(rename = "timeBinSize")] + pub timebinsize: u64, #[serde(rename = "scalarType")] pub scalar_type: ScalarType, - #[serde(rename = "byteOrder", default, skip_serializing_if = "Option::is_none")] - pub byte_order: Option, #[serde(rename = "shape")] pub shape: Shape, + #[serde(rename = "byteOrder")] + pub byte_order: ByteOrder, +} + +#[derive(Debug, Serialize, Deserialize)] +#[serde(rename = "Daqbuf")] +pub struct DaqbufChannelConfig { + #[serde(rename = "backend")] + pub backend: String, + #[serde(rename = "seriesId")] + pub series: u64, + #[serde(rename = "scalarType")] + pub scalar_type: ScalarType, + #[serde(rename = "shape")] + pub shape: Shape, + #[serde(rename = "name")] + pub name: String, +} + +#[derive(Debug, Serialize, Deserialize)] +#[serde(tag = "type")] +pub enum ChannelConfigResponse { + SfDatabuffer(SfChannelConfigResponse), + Daqbuf(DaqbufChannelConfig), +} + +impl From for ChannelConfigResponse { + fn from(value: SfChFetchInfo) -> Self { + Self::SfDatabuffer(SfChannelConfigResponse { + backend: value.backend().into(), + name: value.name().into(), + keyspace: value.ks(), + timebinsize: value.bs().ms(), + scalar_type: value.scalar_type().clone(), + shape: value.shape().clone(), + byte_order: value.byte_order().clone(), + }) + } +} + +impl From for ChannelConfigResponse { + fn from(value: ChConf) -> Self { + Self::Daqbuf(DaqbufChannelConfig { + backend: value.backend().into(), + series: value.series(), + scalar_type: value.scalar_type().clone(), + shape: value.shape().clone(), + name: value.name().into(), + }) + } +} + +impl From for ChannelConfigResponse { + fn from(value: ChannelTypeConfigGen) -> Self { + match value { + ChannelTypeConfigGen::Scylla(k) => k.into(), + ChannelTypeConfigGen::SfDatabuffer(k) => k.into(), + } + } } /** @@ -2424,17 +2496,46 @@ pub struct ChannelInfo { #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ChConf { - pub backend: String, - pub series: Option, - pub name: String, - pub scalar_type: ScalarType, - pub shape: Shape, + backend: String, + series: u64, + scalar_type: ScalarType, + shape: Shape, + name: String, } impl ChConf { - pub fn try_series(&self) -> Res2 { + pub fn new(backend: S1, series: u64, scalar_type: ScalarType, shape: Shape, name: S2) -> Self + where + S1: Into, + S2: Into, + { + Self { + backend: backend.into(), + series, + scalar_type, + shape, + name: name.into(), + } + } + + pub fn backend(&self) -> &str { + &self.backend + } + + pub fn series(&self) -> u64 { self.series - .ok_or_else(|| anyhow::anyhow!("ChConf without SeriesId {self:?}")) + } + + pub fn scalar_type(&self) -> &ScalarType { + &self.scalar_type + } + + pub fn shape(&self) -> &Shape { + &self.shape + } + + pub fn name(&self) -> &str { + &self.name } } @@ -2513,6 +2614,10 @@ impl SfChFetchInfo { pub fn shape(&self) -> &Shape { &self.shape } + + pub fn byte_order(&self) -> &ByteOrder { + &self.byte_order + } } #[derive(Debug, Clone, Serialize, Deserialize)] diff --git a/nodenet/src/channelconfig.rs b/nodenet/src/channelconfig.rs index b8a5913..e8822af 100644 --- a/nodenet/src/channelconfig.rs +++ b/nodenet/src/channelconfig.rs @@ -1,99 +1,128 @@ use err::Error; use netpod::log::*; use netpod::range::evrange::NanoRange; -use netpod::ChConf; +use netpod::timeunits::DAY; +use netpod::ByteOrder; +use netpod::ChannelTypeConfigGen; use netpod::NodeConfigCached; use netpod::ScalarType; +use netpod::SfChFetchInfo; use netpod::SfDbChannel; use netpod::Shape; +use netpod::TsNano; const TEST_BACKEND: &str = "testbackend-00"; -pub async fn channel_config(range: NanoRange, channel: SfDbChannel, ncc: &NodeConfigCached) -> Result { - if channel.backend() == TEST_BACKEND { - let backend = channel.backend().into(); - // TODO the series-ids here are just random. Need to integrate with better test setup. - let ret = if channel.name() == "scalar-i32-be" { - let ret = ChConf { - backend, - series: Some(2), - name: channel.name().into(), - scalar_type: ScalarType::I32, - shape: Shape::Scalar, - }; - Ok(ret) - } else if channel.name() == "wave-f64-be-n21" { - let ret = ChConf { - backend, - series: Some(3), - name: channel.name().into(), - scalar_type: ScalarType::F64, - shape: Shape::Wave(21), - }; - Ok(ret) - } else if channel.name() == "const-regular-scalar-i32-be" { - let ret = ChConf { - backend, - series: Some(4), - name: channel.name().into(), - scalar_type: ScalarType::I32, - shape: Shape::Scalar, - }; - Ok(ret) - } else if channel.name() == "test-gen-i32-dim0-v00" { - let ret = ChConf { - backend, - series: Some(5), - name: channel.name().into(), - scalar_type: ScalarType::I32, - shape: Shape::Scalar, - }; - Ok(ret) - } else if channel.name() == "test-gen-i32-dim0-v01" { - let ret = ChConf { - backend, - series: Some(6), - name: channel.name().into(), - scalar_type: ScalarType::I32, - shape: Shape::Scalar, - }; - Ok(ret) - } else if channel.name() == "test-gen-f64-dim1-v00" { - let ret = ChConf { - backend, - series: Some(7), - name: channel.name().into(), - scalar_type: ScalarType::F64, - shape: Shape::Wave(21), - }; - Ok(ret) - } else { - error!("no test information"); - Err(Error::with_msg_no_trace(format!("no test information")) - .add_public_msg("No channel config for test channel {:?}")) - }; +fn channel_config_test_backend(channel: SfDbChannel) -> Result { + let backend = channel.backend(); + let ret = if channel.name() == "scalar-i32-be" { + let ret = SfChFetchInfo::new( + backend, + channel.name(), + 2, + TsNano(DAY), + ByteOrder::Big, + ScalarType::I32, + Shape::Scalar, + ); ret + } else if channel.name() == "wave-f64-be-n21" { + let ret = SfChFetchInfo::new( + backend, + channel.name(), + 3, + TsNano(DAY), + ByteOrder::Big, + ScalarType::F64, + Shape::Wave(21), + ); + ret + } else if channel.name() == "const-regular-scalar-i32-be" { + let ret = SfChFetchInfo::new( + backend, + channel.name(), + 2, + TsNano(DAY), + ByteOrder::Big, + ScalarType::I32, + Shape::Scalar, + ); + ret + } else if channel.name() == "test-gen-i32-dim0-v00" { + let ret = SfChFetchInfo::new( + backend, + channel.name(), + 2, + TsNano(DAY), + ByteOrder::Big, + ScalarType::I32, + Shape::Scalar, + ); + ret + } else if channel.name() == "test-gen-i32-dim0-v01" { + let ret = SfChFetchInfo::new( + backend, + channel.name(), + 2, + TsNano(DAY), + ByteOrder::Big, + ScalarType::I32, + Shape::Scalar, + ); + ret + } else if channel.name() == "test-gen-f64-dim1-v00" { + let ret = SfChFetchInfo::new( + backend, + channel.name(), + 3, + TsNano(DAY), + ByteOrder::Big, + ScalarType::F64, + Shape::Wave(21), + ); + ret + } else { + error!("no test information"); + return Err(Error::with_msg_no_trace(format!("no test information")) + .add_public_msg("No channel config for test channel {:?}")); + }; + Ok(ChannelTypeConfigGen::SfDatabuffer(ret)) +} + +pub async fn channel_config( + range: NanoRange, + channel: SfDbChannel, + ncc: &NodeConfigCached, +) -> Result { + if channel.backend() == TEST_BACKEND { + channel_config_test_backend(channel) } else if ncc.node_config.cluster.scylla.is_some() { debug!("try to get ChConf for scylla type backend"); let ret = dbconn::channelconfig::chconf_from_scylla_type_backend(&channel, ncc) .await .map_err(Error::from)?; - Ok(ret) + Ok(ChannelTypeConfigGen::Scylla(ret)) } else if ncc.node.sf_databuffer.is_some() { debug!("channel_config channel {channel:?}"); let config = disk::channelconfig::channel_config_best_match(range, channel.clone(), ncc) .await? .ok_or_else(|| Error::with_msg_no_trace("config entry not found"))?; debug!("channel_config config {config:?}"); - let ret = ChConf { - backend: config.channel.backend().into(), - series: channel.series(), - name: config.channel.name().into(), - scalar_type: config.scalar_type, - shape: config.shape, - }; + let ret = SfChFetchInfo::new( + config.channel.backend(), + config.channel.name(), + config.keyspace, + config.time_bin_size, + config.byte_order, + config.scalar_type, + config.shape, + ); + let ret = ChannelTypeConfigGen::SfDatabuffer(ret); Ok(ret) } else { - err::todoval() + return Err( + Error::with_msg_no_trace(format!("no channel config for backend {}", channel.backend())) + .add_public_msg(format!("no channel config for backend {}", channel.backend())), + ); } } diff --git a/nodenet/src/configquorum.rs b/nodenet/src/configquorum.rs index 1b72ac4..72397d1 100644 --- a/nodenet/src/configquorum.rs +++ b/nodenet/src/configquorum.rs @@ -5,7 +5,7 @@ use netpod::NodeConfigCached; use netpod::SfChFetchInfo; use netpod::SfDbChannel; -async fn find_sf_channel_config_basics_quorum() -> Result { +async fn find_sf_ch_config_quorum() -> Result { type _A = SfDbChannel; type _B = SfDbChConf; // TODO create new endpoint which only returns the most matching config entry @@ -17,10 +17,10 @@ pub async fn find_config_basics_quorum( channel: &SfDbChannel, ncc: &NodeConfigCached, ) -> Result { - if let Some(cfg) = &ncc.node.sf_databuffer { - let ret: SfChFetchInfo = err::todoval(); + if let Some(_cfg) = &ncc.node.sf_databuffer { + let ret: SfChFetchInfo = find_sf_ch_config_quorum().await?; Ok(ChannelTypeConfigGen::SfDatabuffer(ret)) - } else if let Some(cfg) = &ncc.node_config.cluster.scylla { + } else if let Some(_cfg) = &ncc.node_config.cluster.scylla { let ret = dbconn::channelconfig::chconf_from_scylla_type_backend(&channel, ncc) .await .map_err(Error::from)?; diff --git a/nodenet/src/scylla.rs b/nodenet/src/scylla.rs index ed2b313..0da5b9e 100644 --- a/nodenet/src/scylla.rs +++ b/nodenet/src/scylla.rs @@ -1,12 +1,9 @@ -use err::anyhow::Context; use err::Error; use futures_util::Stream; use futures_util::StreamExt; use items_0::streamitem::RangeCompletableItem; use items_0::streamitem::Sitemty; use items_0::streamitem::StreamItem; -use items_0::Appendable; -use items_0::Empty; use items_2::channelevents::ChannelEvents; use netpod::log::*; use netpod::ChConf; @@ -19,19 +16,16 @@ pub async fn scylla_channel_event_stream( evq: PlainEventsQuery, chconf: ChConf, scyco: &ScyllaConfig, - node_config: &NodeConfigCached, + _ncc: &NodeConfigCached, ) -> Result> + Send>>, Error> { // TODO depends in general on the query // TODO why both in PlainEventsQuery and as separate parameter? Check other usages. let do_one_before_range = false; // TODO use better builder pattern with shortcuts for production and dev defaults - let f = crate::channelconfig::channel_config(evq.range().try_into()?, evq.channel().clone(), node_config) - .await - .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; let scy = scyllaconn::create_scy_session(scyco).await?; - let series = f.try_series().context("scylla_channel_event_stream")?; - let scalar_type = f.scalar_type; - let shape = f.shape; + let series = chconf.series(); + let scalar_type = chconf.scalar_type(); + let shape = chconf.shape(); let do_test_stream_error = false; let with_values = evq.need_value_data(); debug!("Make EventsStreamScylla for {series:?} {scalar_type:?} {shape:?}"); @@ -39,8 +33,8 @@ pub async fn scylla_channel_event_stream( series, evq.range().into(), do_one_before_range, - scalar_type, - shape, + scalar_type.clone(), + shape.clone(), with_values, scy, do_test_stream_error, diff --git a/parse/src/channelconfig.rs b/parse/src/channelconfig.rs index 11d293d..bc0ffd2 100644 --- a/parse/src/channelconfig.rs +++ b/parse/src/channelconfig.rs @@ -308,23 +308,6 @@ pub fn parse_config(inp: &[u8]) -> NRes { Ok((inp, ret)) } -pub async fn channel_config(q: &ChannelConfigQuery, ncc: &NodeConfigCached) -> Result { - let conf = read_local_config(q.channel.clone(), ncc.clone()).await?; - let entry_res = extract_matching_config_entry(&q.range, &conf)?; - let entry = match entry_res { - MatchingConfigEntry::None => return Err(Error::with_public_msg("no config entry found")), - MatchingConfigEntry::Single(entry) => entry, - MatchingConfigEntry::Multiple(_) => return Err(Error::with_public_msg("multiple config entries found")), - }; - let ret = ChannelConfigResponse { - channel: q.channel.clone(), - scalar_type: entry.scalar_type.clone(), - byte_order: Some(entry.byte_order.clone()), - shape: entry.to_shape()?, - }; - Ok(ret) -} - async fn read_local_config_real(channel: SfDbChannel, ncc: &NodeConfigCached) -> Result { let path = ncc .node diff --git a/scyllaconn/src/config.rs b/scyllaconn/src/config.rs deleted file mode 100644 index ccaaea5..0000000 --- a/scyllaconn/src/config.rs +++ /dev/null @@ -1,50 +0,0 @@ -use crate::errconv::ErrConv; -use err::Error; -use futures_util::StreamExt; -use netpod::log::*; -use netpod::ChannelConfigQuery; -use netpod::ChannelConfigResponse; -use netpod::ScalarType; -use netpod::SfDbChannel; -use netpod::Shape; -use scylla::Session as ScySession; -use std::sync::Arc; - -// TODO unused, table in postgres. -pub async fn config_from_scylla(chq: ChannelConfigQuery, scy: Arc) -> Result { - let cql = "select series, scalar_type, shape_dims from series_by_channel where facility = ? and channel_name = ?"; - let mut it = scy - .query_iter(cql, (chq.channel.backend(), chq.channel.name())) - .await - .err_conv()?; - let mut rows = Vec::new(); - while let Some(row) = it.next().await { - let row = row.err_conv()?; - let cols = row.into_typed::<(i64, i32, Vec)>().err_conv()?; - let scalar_type = ScalarType::from_scylla_i32(cols.1)?; - let shape = Shape::from_scylla_shape_dims(&cols.2)?; - let channel = SfDbChannel::from_full(chq.channel.backend(), Some(cols.0 as _), chq.channel.name()); - let res = ChannelConfigResponse { - channel, - scalar_type, - byte_order: None, - shape, - }; - info!("config_from_scylla: {res:?}"); - rows.push(res); - } - if rows.is_empty() { - return Err(Error::with_public_msg_no_trace(format!( - "can not find config for channel {:?}", - chq.channel - ))); - } else { - if rows.len() > 1 { - error!( - "Found multiple configurations for channel {:?} {:?}", - chq.channel, rows - ); - } - Ok(rows.pop().unwrap()) - } -} diff --git a/scyllaconn/src/scyllaconn.rs b/scyllaconn/src/scyllaconn.rs index c235355..5a9882a 100644 --- a/scyllaconn/src/scyllaconn.rs +++ b/scyllaconn/src/scyllaconn.rs @@ -1,5 +1,4 @@ pub mod bincache; -pub mod config; pub mod errconv; pub mod events; pub mod status;