Check for requested backend

This commit is contained in:
Dominik Werder
2021-05-06 12:17:25 +02:00
parent d3e92d85f0
commit 2f4d2ccea9
7 changed files with 69 additions and 31 deletions

81
disk/src/cache/pbv.rs vendored
View File

@@ -8,12 +8,12 @@ use crate::streamlog::Streamlog;
use bytes::Bytes;
use err::Error;
use futures_core::Stream;
use futures_util::pin_mut;
use futures_util::{FutureExt, StreamExt};
use netpod::log::*;
use netpod::streamext::SCC;
use netpod::{BinnedRange, NodeConfigCached, PreBinnedPatchIterator, PreBinnedPatchRange};
use std::future::{ready, Future};
use std::path::PathBuf;
use std::pin::Pin;
use std::task::{Context, Poll};
@@ -53,6 +53,8 @@ pub struct PreBinnedValueStream {
node_config: NodeConfigCached,
open_check_local_file: Option<Pin<Box<dyn Future<Output = Result<tokio::fs::File, std::io::Error>> + Send>>>,
fut2: Option<Pin<Box<dyn Stream<Item = Result<PreBinnedItem, Error>> + Send>>>,
read_from_cache: bool,
cache_written: bool,
data_complete: bool,
range_complete_observed: bool,
range_complete_emitted: bool,
@@ -71,6 +73,8 @@ impl PreBinnedValueStream {
node_config: node_config.clone(),
open_check_local_file: None,
fut2: None,
read_from_cache: false,
cache_written: false,
data_complete: false,
range_complete_observed: false,
range_complete_emitted: false,
@@ -196,9 +200,9 @@ impl Stream for PreBinnedValueStream {
} else if let Some(item) = self.streamlog.pop() {
Ready(Some(Ok(PreBinnedItem::Log(item))))
} else if let Some(fut) = &mut self.write_fut {
pin_mut!(fut);
match fut.poll(cx) {
match fut.poll_unpin(cx) {
Ready(item) => {
self.cache_written = true;
self.write_fut = None;
match item {
Ok(()) => {
@@ -220,6 +224,7 @@ impl Stream for PreBinnedValueStream {
match item {
Ok(item) => {
self.data_complete = true;
self.range_complete_observed = true;
Ready(Some(Ok(item)))
}
Err(e) => {
@@ -230,33 +235,46 @@ impl Stream for PreBinnedValueStream {
}
Pending => Pending,
}
} else if self.range_complete_emitted {
self.completed = true;
Ready(None)
} else if self.data_complete {
if self.range_complete_observed {
if self.range_complete_emitted {
self.completed = true;
Ready(None)
} else {
let msg = format!(
"Write cache file\n{:?}\nN: {}\n\n\n",
self.query.patch,
self.values.ts1s.len()
);
self.streamlog.append(Level::INFO, msg);
let values = std::mem::replace(&mut self.values, MinMaxAvgScalarBinBatch::empty());
let fut = super::write_pb_cache_min_max_avg_scalar(
values,
self.query.patch.clone(),
self.query.agg_kind.clone(),
self.query.channel.clone(),
self.node_config.clone(),
);
self.write_fut = Some(Box::pin(fut));
if self.cache_written {
if self.range_complete_observed {
self.range_complete_emitted = true;
Ready(Some(Ok(PreBinnedItem::RangeComplete)))
} else {
self.completed = true;
Ready(None)
}
} else if self.read_from_cache {
self.cache_written = true;
continue 'outer;
} else {
self.completed = true;
Ready(None)
match self.query.cache_usage {
super::CacheUsage::Use | super::CacheUsage::Recreate => {
let msg = format!(
"Write cache file\n{:?}\nN: {}\n\n\n",
self.query.patch,
self.values.ts1s.len()
);
self.streamlog.append(Level::INFO, msg);
let values = std::mem::replace(&mut self.values, MinMaxAvgScalarBinBatch::empty());
let fut = super::write_pb_cache_min_max_avg_scalar(
values,
self.query.patch.clone(),
self.query.agg_kind.clone(),
self.query.channel.clone(),
self.node_config.clone(),
);
self.write_fut = Some(Box::pin(fut));
continue 'outer;
}
_ => {
self.cache_written = true;
continue 'outer;
}
}
}
} else if let Some(fut) = self.fut2.as_mut() {
match fut.poll_next_unpin(cx) {
@@ -295,6 +313,7 @@ impl Stream for PreBinnedValueStream {
self.open_check_local_file = None;
match item {
Ok(file) => {
self.read_from_cache = true;
let fut = super::read_pbv(file);
self.read_cache_fut = Some(Box::pin(fut));
continue 'outer;
@@ -323,17 +342,17 @@ impl Stream for PreBinnedValueStream {
Pending => Pending,
}
} else {
#[allow(unused_imports)]
use std::os::unix::fs::OpenOptionsExt;
let mut opts = std::fs::OpenOptions::new();
opts.read(true);
let cfd = CacheFileDesc {
channel: self.query.channel.clone(),
patch: self.query.patch.clone(),
agg_kind: self.query.agg_kind.clone(),
};
let path = cfd.path(&self.node_config);
let fut = async { tokio::fs::OpenOptions::from(opts).open(path).await };
use super::CacheUsage;
let path = match self.query.cache_usage {
CacheUsage::Use => cfd.path(&self.node_config),
_ => PathBuf::from("DOESNOTEXIST"),
};
let fut = async { tokio::fs::OpenOptions::new().read(true).open(path).await };
self.open_check_local_file = Some(Box::pin(fut));
continue 'outer;
};