From 2f4d2ccea9159efab5f2575358b43351b6614470 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Thu, 6 May 2021 12:17:25 +0200 Subject: [PATCH] Check for requested backend --- disk/src/aggtest.rs | 1 + disk/src/cache.rs | 14 ++++++ disk/src/cache/pbv.rs | 81 +++++++++++++++++++++------------- disk/src/gen.rs | 1 + netpod/src/lib.rs | 1 + retrieval/src/bin/retrieval.rs | 1 + retrieval/src/test.rs | 1 + 7 files changed, 69 insertions(+), 31 deletions(-) diff --git a/disk/src/aggtest.rs b/disk/src/aggtest.rs index d29fd0e..027e161 100644 --- a/disk/src/aggtest.rs +++ b/disk/src/aggtest.rs @@ -18,6 +18,7 @@ pub fn make_test_node(id: u32) -> Node { data_base_path: format!("../tmpdata/node{:02}", id).into(), split: id, ksprefix: "ks".into(), + backend: "testbackend".into(), } } diff --git a/disk/src/cache.rs b/disk/src/cache.rs index 501f6e3..6f31ed2 100644 --- a/disk/src/cache.rs +++ b/disk/src/cache.rs @@ -158,6 +158,13 @@ pub async fn binned_bytes_for_http( node_config: &NodeConfigCached, query: &BinnedQuery, ) -> Result { + if query.channel.backend != node_config.node.backend { + let err = Error::with_msg(format!( + "backend mismatch we {} requested {}", + node_config.node.backend, query.channel.backend + )); + return Err(err); + } let range = &query.range; let channel_config = read_local_config(&query.channel, &node_config.node).await?; let entry = extract_matching_config_entry(range, &channel_config); @@ -249,6 +256,13 @@ pub fn pre_binned_bytes_for_http( query: &PreBinnedQuery, ) -> Result { info!("pre_binned_bytes_for_http {:?} {:?}", query, node_config.node); + if query.channel.backend != node_config.node.backend { + let err = Error::with_msg(format!( + "backend mismatch we {} requested {}", + node_config.node.backend, query.channel.backend + )); + return Err(err); + } let patch_node_ix = node_ix_for_patch(&query.patch, &query.channel, &node_config.node_config.cluster); if node_config.ix as u32 != patch_node_ix { Err(Error::with_msg(format!( diff --git a/disk/src/cache/pbv.rs b/disk/src/cache/pbv.rs index 906f7b4..26d8df9 100644 --- a/disk/src/cache/pbv.rs +++ b/disk/src/cache/pbv.rs @@ -8,12 +8,12 @@ use crate::streamlog::Streamlog; use bytes::Bytes; use err::Error; use futures_core::Stream; -use futures_util::pin_mut; use futures_util::{FutureExt, StreamExt}; use netpod::log::*; use netpod::streamext::SCC; use netpod::{BinnedRange, NodeConfigCached, PreBinnedPatchIterator, PreBinnedPatchRange}; use std::future::{ready, Future}; +use std::path::PathBuf; use std::pin::Pin; use std::task::{Context, Poll}; @@ -53,6 +53,8 @@ pub struct PreBinnedValueStream { node_config: NodeConfigCached, open_check_local_file: Option> + Send>>>, fut2: Option> + Send>>>, + read_from_cache: bool, + cache_written: bool, data_complete: bool, range_complete_observed: bool, range_complete_emitted: bool, @@ -71,6 +73,8 @@ impl PreBinnedValueStream { node_config: node_config.clone(), open_check_local_file: None, fut2: None, + read_from_cache: false, + cache_written: false, data_complete: false, range_complete_observed: false, range_complete_emitted: false, @@ -196,9 +200,9 @@ impl Stream for PreBinnedValueStream { } else if let Some(item) = self.streamlog.pop() { Ready(Some(Ok(PreBinnedItem::Log(item)))) } else if let Some(fut) = &mut self.write_fut { - pin_mut!(fut); - match fut.poll(cx) { + match fut.poll_unpin(cx) { Ready(item) => { + self.cache_written = true; self.write_fut = None; match item { Ok(()) => { @@ -220,6 +224,7 @@ impl Stream for PreBinnedValueStream { match item { Ok(item) => { self.data_complete = true; + self.range_complete_observed = true; Ready(Some(Ok(item))) } Err(e) => { @@ -230,33 +235,46 @@ impl Stream for PreBinnedValueStream { } Pending => Pending, } + } else if self.range_complete_emitted { + self.completed = true; + Ready(None) } else if self.data_complete { - if self.range_complete_observed { - if self.range_complete_emitted { - self.completed = true; - Ready(None) - } else { - let msg = format!( - "Write cache file\n{:?}\nN: {}\n\n\n", - self.query.patch, - self.values.ts1s.len() - ); - self.streamlog.append(Level::INFO, msg); - let values = std::mem::replace(&mut self.values, MinMaxAvgScalarBinBatch::empty()); - let fut = super::write_pb_cache_min_max_avg_scalar( - values, - self.query.patch.clone(), - self.query.agg_kind.clone(), - self.query.channel.clone(), - self.node_config.clone(), - ); - self.write_fut = Some(Box::pin(fut)); + if self.cache_written { + if self.range_complete_observed { self.range_complete_emitted = true; Ready(Some(Ok(PreBinnedItem::RangeComplete))) + } else { + self.completed = true; + Ready(None) } + } else if self.read_from_cache { + self.cache_written = true; + continue 'outer; } else { - self.completed = true; - Ready(None) + match self.query.cache_usage { + super::CacheUsage::Use | super::CacheUsage::Recreate => { + let msg = format!( + "Write cache file\n{:?}\nN: {}\n\n\n", + self.query.patch, + self.values.ts1s.len() + ); + self.streamlog.append(Level::INFO, msg); + let values = std::mem::replace(&mut self.values, MinMaxAvgScalarBinBatch::empty()); + let fut = super::write_pb_cache_min_max_avg_scalar( + values, + self.query.patch.clone(), + self.query.agg_kind.clone(), + self.query.channel.clone(), + self.node_config.clone(), + ); + self.write_fut = Some(Box::pin(fut)); + continue 'outer; + } + _ => { + self.cache_written = true; + continue 'outer; + } + } } } else if let Some(fut) = self.fut2.as_mut() { match fut.poll_next_unpin(cx) { @@ -295,6 +313,7 @@ impl Stream for PreBinnedValueStream { self.open_check_local_file = None; match item { Ok(file) => { + self.read_from_cache = true; let fut = super::read_pbv(file); self.read_cache_fut = Some(Box::pin(fut)); continue 'outer; @@ -323,17 +342,17 @@ impl Stream for PreBinnedValueStream { Pending => Pending, } } else { - #[allow(unused_imports)] - use std::os::unix::fs::OpenOptionsExt; - let mut opts = std::fs::OpenOptions::new(); - opts.read(true); let cfd = CacheFileDesc { channel: self.query.channel.clone(), patch: self.query.patch.clone(), agg_kind: self.query.agg_kind.clone(), }; - let path = cfd.path(&self.node_config); - let fut = async { tokio::fs::OpenOptions::from(opts).open(path).await }; + use super::CacheUsage; + let path = match self.query.cache_usage { + CacheUsage::Use => cfd.path(&self.node_config), + _ => PathBuf::from("DOESNOTEXIST"), + }; + let fut = async { tokio::fs::OpenOptions::new().read(true).open(path).await }; self.open_check_local_file = Some(Box::pin(fut)); continue 'outer; }; diff --git a/disk/src/gen.rs b/disk/src/gen.rs index 3f22a4a..53e5152 100644 --- a/disk/src/gen.rs +++ b/disk/src/gen.rs @@ -62,6 +62,7 @@ pub async fn gen_test_data() -> Result<(), Error> { split: i1, data_base_path: data_base_path.join(format!("node{:02}", i1)), ksprefix: ksprefix.clone(), + backend: "testbackend".into(), }; ensemble.nodes.push(node); } diff --git a/netpod/src/lib.rs b/netpod/src/lib.rs index 249e0a8..37f4154 100644 --- a/netpod/src/lib.rs +++ b/netpod/src/lib.rs @@ -105,6 +105,7 @@ pub struct Node { pub split: u32, pub data_base_path: PathBuf, pub ksprefix: String, + pub backend: String, } #[derive(Clone, Debug, Serialize, Deserialize)] diff --git a/retrieval/src/bin/retrieval.rs b/retrieval/src/bin/retrieval.rs index 8772be9..bfbd2aa 100644 --- a/retrieval/src/bin/retrieval.rs +++ b/retrieval/src/bin/retrieval.rs @@ -60,6 +60,7 @@ fn simple_fetch() { data_base_path: err::todoval(), ksprefix: "daq_swissfel".into(), split: 0, + backend: "testbackend".into(), }; let query = netpod::AggQuerySingleChannel { channel_config: ChannelConfig { diff --git a/retrieval/src/test.rs b/retrieval/src/test.rs index 3c63f5e..cecabcf 100644 --- a/retrieval/src/test.rs +++ b/retrieval/src/test.rs @@ -24,6 +24,7 @@ fn test_cluster() -> Cluster { data_base_path: format!("../tmpdata/node{:02}", id).into(), ksprefix: "ks".into(), split: id, + backend: "testbackend".into(), }) .collect(); Cluster {