From d3e92d85f033e11e56da215b6ee216160b80f54a Mon Sep 17 00:00:00 2001
From: Dominik Werder
Date: Thu, 6 May 2021 11:34:36 +0200
Subject: [PATCH] Seems like I can write and read cached data

---
 disk/src/agg/binnedt.rs | 19 +++++----
 disk/src/cache.rs       | 38 +++++++++++++----
 disk/src/cache/pbv.rs   | 91 ++++++++++++++++++++++++++++-------------
 retrieval/src/test.rs   |  2 +-
 4 files changed, 103 insertions(+), 47 deletions(-)

diff --git a/disk/src/agg/binnedt.rs b/disk/src/agg/binnedt.rs
index 9a37137..ed0a5aa 100644
--- a/disk/src/agg/binnedt.rs
+++ b/disk/src/agg/binnedt.rs
@@ -129,6 +129,9 @@ where
                         return Ready(None);
                     }
                 }
+            } else {
+                self.completed = true;
+                return Ready(None);
             }
         }
@@ -163,7 +166,6 @@ where
                        //info!("ENDS BEFORE");
                        continue 'outer;
                    } else if ag.starts_after(&k) {
-                       //info!("STARTS AFTER");
                        self.left = Some(Ready(Some(Ok(k))));
                        self.curbin += 1;
                        let range = self.spec.get_range(self.curbin);
@@ -172,17 +174,16 @@ where
                            .replace(I::aggregator_new_static(range.beg, range.end))
                            .unwrap()
                            .result();
-                       //Ready(Some(Ok(ret)))
                        self.tmp_agg_results = ret.into();
+                       if self.curbin as u64 >= self.spec.count {
+                           self.data_completed = true;
+                       }
                        continue 'outer;
                    } else {
-                       //info!("INGEST");
                        let mut k = k;
                        ag.ingest(&mut k);
-                       // if this input contains also data after the current bin, then I need to keep
-                       // it for the next round.
+                       let k = k;
                        if ag.ends_after(&k) {
-                           //info!("ENDS AFTER");
                            self.left = Some(Ready(Some(Ok(k))));
                            self.curbin += 1;
                            let range = self.spec.get_range(self.curbin);
@@ -191,12 +192,12 @@ where
                                .replace(I::aggregator_new_static(range.beg, range.end))
                                .unwrap()
                                .result();
-                           //Ready(Some(Ok(ret)))
                            self.tmp_agg_results = ret.into();
+                           if self.curbin as u64 >= self.spec.count {
+                               self.data_completed = true;
+                           }
                            continue 'outer;
                        } else {
-                           //info!("ENDS WITHIN");
                            continue 'outer;
                        }
                    }
@@ -208,7 +210,6 @@ where
                Ready(None) => {
                    self.inp_completed = true;
                    if self.curbin as u64 >= self.spec.count {
-                       warn!("IntoBinnedTDefaultStream curbin out of spec, END");
                        self.data_completed = true;
                        continue 'outer;
                    } else {
diff --git a/disk/src/cache.rs b/disk/src/cache.rs
index 0d5883c..501f6e3 100644
--- a/disk/src/cache.rs
+++ b/disk/src/cache.rs
@@ -2,6 +2,7 @@ use crate::agg::eventbatch::MinMaxAvgScalarEventBatchStreamItem;
 use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch;
 use crate::binnedstream::BinnedStream;
 use crate::cache::pbv::PreBinnedValueByteStream;
+use crate::cache::pbvfs::PreBinnedItem;
 use crate::channelconfig::{extract_matching_config_entry, read_local_config};
 use crate::frame::makeframe::make_frame;
 use crate::merge::MergedMinMaxAvgScalarStream;
@@ -23,8 +24,8 @@
 use std::path::PathBuf;
 use std::pin::Pin;
 use std::task::{Context, Poll};
 use tiny_keccak::Hasher;
-use tokio::fs::OpenOptions;
-use tokio::io::{AsyncRead, AsyncWriteExt, ReadBuf};
+use tokio::fs::File;
+use tokio::io::{AsyncRead, AsyncReadExt, ReadBuf};
 #[allow(unused_imports)]
 use tracing::{debug, error, info, trace, warn};
@@ -512,12 +513,31 @@ pub async fn write_pb_cache_min_max_avg_scalar(
     let enc = serde_cbor::to_vec(&values)?;
     info!("Encoded size: {}", enc.len());
     tokio::fs::create_dir_all(path.parent().unwrap()).await?;
-    let mut f = OpenOptions::new()
-        .truncate(true)
-        .create(true)
-        .write(true)
-        .open(&path)
-        .await?;
-    f.write_all(&enc).await?;
+    tokio::task::spawn_blocking({
+        let path = path.clone();
+        move || {
+            use fs2::FileExt;
+            use std::io::Write;
+            let mut f = std::fs::OpenOptions::new()
+                .create(true)
+                .truncate(true)
+                .write(true)
+                .open(&path)?;
+            f.lock_exclusive()?;
+            f.write_all(&enc)?;
+            f.unlock()?;
+            Ok::<_, Error>(())
+        }
+    })
+    .await??;
     Ok(())
 }
+
+pub async fn read_pbv(mut file: File) -> Result<PreBinnedItem, Error> {
+    let mut buf = vec![];
+    file.read_to_end(&mut buf).await?;
+    info!("Read cached file len {}", buf.len());
+    let dec: MinMaxAvgScalarBinBatch = serde_cbor::from_slice(&buf)?;
+    info!("Decoded cached file");
+    Ok(PreBinnedItem::Batch(dec))
+}
diff --git a/disk/src/cache/pbv.rs b/disk/src/cache/pbv.rs
index 875511f..906f7b4 100644
--- a/disk/src/cache/pbv.rs
+++ b/disk/src/cache/pbv.rs
@@ -1,7 +1,7 @@
 use crate::agg::binnedt::IntoBinnedT;
 use crate::agg::scalarbinbatch::{MinMaxAvgScalarBinBatch, MinMaxAvgScalarBinBatchStreamItem};
 use crate::cache::pbvfs::{PreBinnedItem, PreBinnedValueFetchedStream};
-use crate::cache::{MergedFromRemotes, PreBinnedQuery};
+use crate::cache::{CacheFileDesc, MergedFromRemotes, PreBinnedQuery};
 use crate::frame::makeframe::make_frame;
 use crate::raw::EventsQuery;
 use crate::streamlog::Streamlog;
@@ -61,6 +61,7 @@ pub struct PreBinnedValueStream {
     streamlog: Streamlog,
     values: MinMaxAvgScalarBinBatch,
     write_fut: Option<Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>>,
+    read_cache_fut: Option<Pin<Box<dyn Future<Output = Result<PreBinnedItem, Error>> + Send>>>,
 }
 
 impl PreBinnedValueStream {
@@ -78,6 +79,7 @@ impl PreBinnedValueStream {
             streamlog: Streamlog::new(node_config.ix as u32),
             values: MinMaxAvgScalarBinBatch::empty(),
             write_fut: None,
+            read_cache_fut: None,
         }
     }
 
@@ -196,14 +198,35 @@ impl Stream for PreBinnedValueStream {
            } else if let Some(fut) = &mut self.write_fut {
                pin_mut!(fut);
                match fut.poll(cx) {
-                   Ready(Ok(())) => {
+                   Ready(item) => {
                        self.write_fut = None;
-                       self.streamlog.append(Level::INFO, format!("cache file written"));
-                       continue 'outer;
+                       match item {
+                           Ok(()) => {
+                               self.streamlog.append(Level::INFO, format!("cache file written"));
+                               continue 'outer;
+                           }
+                           Err(e) => {
+                               self.errored = true;
+                               Ready(Some(Err(e)))
+                           }
+                       }
                    }
-                   Ready(Err(e)) => {
-                       self.errored = true;
-                       Ready(Some(Err(e)))
+                   Pending => Pending,
+               }
+           } else if let Some(fut) = &mut self.read_cache_fut {
+               match fut.poll_unpin(cx) {
+                   Ready(item) => {
+                       self.read_cache_fut = None;
+                       match item {
+                           Ok(item) => {
+                               self.data_complete = true;
+                               Ready(Some(Ok(item)))
+                           }
+                           Err(e) => {
+                               self.errored = true;
+                               Ready(Some(Err(e)))
+                           }
+                       }
                    }
                    Pending => Pending,
                }
@@ -268,29 +291,35 @@ impl Stream for PreBinnedValueStream {
                }
            } else if let Some(fut) = self.open_check_local_file.as_mut() {
                match fut.poll_unpin(cx) {
-                   Ready(Ok(_file)) => {
-                       let e = Err(Error::with_msg(format!("TODO use the cached data from file")));
-                       self.errored = true;
-                       Ready(Some(e))
-                   }
-                   Ready(Err(e)) => match e.kind() {
-                       std::io::ErrorKind::NotFound => {
-                           error!("TODO LOCAL CACHE FILE NOT FOUND");
-                           self.try_setup_fetch_prebinned_higher_res();
-                           if self.fut2.is_none() {
-                               let e = Err(Error::with_msg(format!("try_setup_fetch_prebinned_higher_res failed")));
-                               self.errored = true;
-                               Ready(Some(e))
-                           } else {
+                   Ready(item) => {
+                       self.open_check_local_file = None;
+                       match item {
+                           Ok(file) => {
+                               let fut = super::read_pbv(file);
+                               self.read_cache_fut = Some(Box::pin(fut));
                                continue 'outer;
                            }
+                           Err(e) => match e.kind() {
+                               std::io::ErrorKind::NotFound => {
+                                   self.try_setup_fetch_prebinned_higher_res();
+                                   if self.fut2.is_none() {
+                                       let e = Err(Error::with_msg(format!(
+                                           "try_setup_fetch_prebinned_higher_res failed"
+                                       )));
+                                       self.errored = true;
+                                       Ready(Some(e))
+                                   } else {
+                                       continue 'outer;
+                                   }
+                               }
+                               _ => {
+                                   error!("File I/O error: {:?}", e);
+                                   self.errored = true;
+                                   Ready(Some(Err(e.into())))
+                               }
+                           },
                        }
-                       _ => {
-                           error!("File I/O error: {:?}", e);
-                           self.errored = true;
-                           Ready(Some(Err(e.into())))
-                       }
-                   },
+                   }
                    Pending => Pending,
                }
            } else {
@@ -298,7 +327,13 @@ impl Stream for PreBinnedValueStream {
                use std::os::unix::fs::OpenOptionsExt;
                let mut opts = std::fs::OpenOptions::new();
                opts.read(true);
-               let fut = async { tokio::fs::OpenOptions::from(opts).open("/DOESNOTEXIST").await };
+               let cfd = CacheFileDesc {
+                   channel: self.query.channel.clone(),
+                   patch: self.query.patch.clone(),
+                   agg_kind: self.query.agg_kind.clone(),
+               };
+               let path = cfd.path(&self.node_config);
+               let fut = async { tokio::fs::OpenOptions::from(opts).open(path).await };
                self.open_check_local_file = Some(Box::pin(fut));
                continue 'outer;
            };
diff --git a/retrieval/src/test.rs b/retrieval/src/test.rs
index f6b30f7..3c63f5e 100644
--- a/retrieval/src/test.rs
+++ b/retrieval/src/test.rs
@@ -151,7 +151,7 @@ where
                    Some(Ok(MinMaxAvgScalarBinBatchStreamItem::Log(item)))
                }
                item => {
-                   info!("TEST GOT ITEM {:?}\n", item);
+                   info!("TEST GOT ITEM {:?}", item);
                    Some(Ok(item))
                }
            },
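
Below, for illustration only and not part of the patch: a minimal, self-contained sketch of the caching pattern this commit introduces. It CBOR-encodes a value, writes it under an exclusive fs2 file lock on Tokio's blocking pool, then reads it back asynchronously and decodes it. CachedBatch, write_cached, read_cached and the temp-directory path are hypothetical stand-ins for MinMaxAvgScalarBinBatch, write_pb_cache_min_max_avg_scalar and read_pbv; it assumes the tokio, serde, serde_cbor and fs2 crates as dependencies.

// Illustration only; hypothetical types and helpers, not the crate's own API.
use serde::{Deserialize, Serialize};

#[derive(Debug, PartialEq, Serialize, Deserialize)]
struct CachedBatch {
    ts1s: Vec<u64>,
    ts2s: Vec<u64>,
    avgs: Vec<f32>,
}

type Error = Box<dyn std::error::Error + Send + Sync>;

async fn write_cached(path: std::path::PathBuf, batch: &CachedBatch) -> Result<(), Error> {
    // Encode with CBOR, as the patched cache writer does for pre-binned values.
    let enc = serde_cbor::to_vec(batch)?;
    tokio::fs::create_dir_all(path.parent().unwrap()).await?;
    // fs2 locking and std::fs writes are blocking, so run them on the blocking pool.
    tokio::task::spawn_blocking(move || -> Result<(), Error> {
        use fs2::FileExt;
        use std::io::Write;
        let mut f = std::fs::OpenOptions::new()
            .create(true)
            .truncate(true)
            .write(true)
            .open(&path)?;
        f.lock_exclusive()?;
        f.write_all(&enc)?;
        f.unlock()?;
        Ok(())
    })
    .await??;
    Ok(())
}

async fn read_cached(path: std::path::PathBuf) -> Result<CachedBatch, Error> {
    use tokio::io::AsyncReadExt;
    // Read the whole cache file and decode the CBOR back into the batch type.
    let mut file = tokio::fs::File::open(path).await?;
    let mut buf = vec![];
    file.read_to_end(&mut buf).await?;
    Ok(serde_cbor::from_slice(&buf)?)
}

#[tokio::main]
async fn main() -> Result<(), Error> {
    let path = std::env::temp_dir().join("pbcache-sketch").join("patch-0.cbor");
    let batch = CachedBatch {
        ts1s: vec![0, 1_000_000],
        ts2s: vec![1_000_000, 2_000_000],
        avgs: vec![1.5, 2.5],
    };
    write_cached(path.clone(), &batch).await?;
    let back = read_cached(path).await?;
    assert_eq!(back, batch);
    println!("cache round trip ok: {:?}", back);
    Ok(())
}

The blocking write mirrors the design choice in the patch: the exclusive lock keeps concurrent writers of the same cache file from interleaving partial data, while the read path stays fully asynchronous.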