From 5f1b8d6a36e4d47c34a8645fe7eb2cda1d30355f Mon Sep 17 00:00:00 2001
From: Dominik Werder
Date: Thu, 27 May 2021 20:08:33 +0200
Subject: [PATCH] Reproduce multiple RangeComplete, improve cache write

---
 disk/src/binned.rs    |   3 -
 disk/src/cache.rs     |  36 ++++++------
 retrieval/src/test.rs | 126 +++++++++++++++++++++++++++---------------
 3 files changed, 101 insertions(+), 64 deletions(-)

diff --git a/disk/src/binned.rs b/disk/src/binned.rs
index 2333299..492360a 100644
--- a/disk/src/binned.rs
+++ b/disk/src/binned.rs
@@ -426,9 +426,6 @@ impl ReadableFromFile for MinMaxAvgScalarBinBatch {
         Ok(ReadPbv::new(file))
     }
     fn from_buf(buf: &[u8]) -> Result<Self, Error> {
-        let mut h = crc32fast::Hasher::new();
-        h.update(&buf);
-        info!("try to deserialize from buf len {} crc {}", buf.len(), h.finalize());
         let dec: MinMaxAvgScalarBinBatch = serde_cbor::from_slice(&buf)?;
         Ok(dec)
     }
diff --git a/disk/src/cache.rs b/disk/src/cache.rs
index ca4ea2d..88f62ed 100644
--- a/disk/src/cache.rs
+++ b/disk/src/cache.rs
@@ -602,34 +602,38 @@ where
     };
     let path = cfd.path(&node_config);
     let enc = serde_cbor::to_vec(&values)?;
-    let mut h = crc32fast::Hasher::new();
-    h.update(&enc);
-    info!(
-        "Writing cache file len {} crc {}\n{:?}\npath: {:?}",
-        enc.len(),
-        h.finalize(),
-        cfd,
-        path
-    );
     tokio::fs::create_dir_all(path.parent().unwrap()).await?;
+    let now = Utc::now();
+    let mut h = crc32fast::Hasher::new();
+    h.update(&now.timestamp_nanos().to_le_bytes());
+    let r = h.finalize();
+    let tmp_path =
+        path.parent()
+            .unwrap()
+            .join(format!("{}.tmp.{:08x}", path.file_name().unwrap().to_str().unwrap(), r));
     let res = tokio::task::spawn_blocking({
-        let path = path.clone();
+        let tmp_path = tmp_path.clone();
         move || {
             use fs2::FileExt;
             use io::Write;
-            // TODO write to random tmp file first and then move into place.
+ info!("try to write tmp at {:?}", tmp_path); let mut f = std::fs::OpenOptions::new() - .create(true) - .truncate(true) + .create_new(true) .write(true) - .open(&path)?; - f.lock_exclusive()?; + .open(&tmp_path)?; + if false { + f.lock_exclusive()?; + } f.write_all(&enc)?; - f.unlock()?; + if false { + f.unlock()?; + } + f.flush()?; Ok::<_, Error>(enc.len()) } }) .await??; + tokio::fs::rename(&tmp_path, &path).await?; let ret = WrittenPbCache { bytes: res as u64 }; Ok(ret) } diff --git a/retrieval/src/test.rs b/retrieval/src/test.rs index c809b8f..a1f100e 100644 --- a/retrieval/src/test.rs +++ b/retrieval/src/test.rs @@ -2,7 +2,7 @@ use crate::spawn_test_hosts; use bytes::BytesMut; use chrono::{DateTime, Utc}; use disk::agg::scalarbinbatch::MinMaxAvgScalarBinBatch; -use disk::agg::streams::StreamItem; +use disk::agg::streams::{Bins, StatsItem, StreamItem}; use disk::binned::RangeCompletableItem; use disk::cache::BinnedQuery; use disk::frame::inmem::InMemoryFrameAsyncReadStream; @@ -52,33 +52,36 @@ fn get_binned_binary() { async fn get_binned_binary_inner() -> Result<(), Error> { let cluster = test_cluster(); let _hosts = spawn_test_hosts(cluster.clone()); - get_binned_channel( - "wave-f64-be-n21", - "1970-01-01T00:20:10.000Z", - "1970-01-01T00:20:30.000Z", - 2, - &cluster, - ) - .await?; - if true { - return Ok(()); + if false { + get_binned_channel( + "wave-f64-be-n21", + "1970-01-01T00:20:10.000Z", + "1970-01-01T00:20:30.000Z", + 2, + &cluster, + ) + .await?; + } + if false { + get_binned_channel( + "wave-u16-le-n77", + "1970-01-01T01:11:00.000Z", + "1970-01-01T01:40:00.000Z", + 7, + &cluster, + ) + .await?; + } + if true { + get_binned_channel( + "wave-u16-le-n77", + "1970-01-01T01:42:00.000Z", + "1970-01-01T03:55:00.000Z", + 2, + &cluster, + ) + .await?; } - get_binned_channel( - "wave-u16-le-n77", - "1970-01-01T01:11:00.000Z", - "1970-01-01T01:40:00.000Z", - 7, - &cluster, - ) - .await?; - get_binned_channel( - "wave-u16-le-n77", - "1970-01-01T01:42:00.000Z", - "1970-01-01T03:55:00.000Z", - 2, - &cluster, - ) - .await?; Ok(()) } @@ -124,20 +127,43 @@ where let t2 = chrono::Utc::now(); let ms = t2.signed_duration_since(t1).num_milliseconds() as u64; info!("get_cached_0 DONE bin_count {} time {} ms", res.bin_count, ms); - Ok(()) + if !res.is_valid() { + error!("result is not valid:\n{:?}", res); + Err(Error::with_msg("result not valid")) + } else { + Ok(()) + } } #[derive(Debug)] pub struct BinnedResponse { - bin_count: usize, + bin_count: u64, + err_item_count: u64, + data_item_count: u64, bytes_read: u64, + range_complete_count: u64, + log_item_count: u64, + stats_item_count: u64, } impl BinnedResponse { pub fn new() -> Self { Self { bin_count: 0, + err_item_count: 0, + data_item_count: 0, bytes_read: 0, + range_complete_count: 0, + log_item_count: 0, + stats_item_count: 0, + } + } + + pub fn is_valid(&self) -> bool { + if self.range_complete_count > 1 { + false + } else { + true } } } @@ -189,29 +215,39 @@ where }; ready(g) }) - .fold(Ok(BinnedResponse::new()), |a, k| { - let g = match a { - Ok(a) => match k { - Ok(StreamItem::DataItem(_item)) => { - // TODO extract bin count from item - //a.bin_count += k.ts1s.len(); - Ok(a) + .fold(BinnedResponse::new(), |mut a, k| { + let g = match k { + Ok(StreamItem::Log(_item)) => { + a.log_item_count += 1; + a + } + Ok(StreamItem::Stats(item)) => match item { + StatsItem::EventDataReadStats(item) => { + a.bytes_read += item.parsed_bytes; + a } - Ok(StreamItem::Stats(_item)) => { - // TODO adapt to new Stats type: - //a.bytes_read += 
-                    Ok(a)
-                }
-                Ok(_) => Ok(a),
-                Err(e) => Err(e),
             },
-            Err(e) => Err(e),
+            Ok(StreamItem::DataItem(item)) => match item {
+                RangeCompletableItem::RangeComplete => {
+                    a.range_complete_count += 1;
+                    a
+                }
+                RangeCompletableItem::Data(item) => {
+                    a.data_item_count += 1;
+                    a.bin_count += item.bin_count() as u64;
+                    a
+                }
+            },
+            Err(_e) => {
+                a.err_item_count += 1;
+                a
+            }
         };
         ready(g)
     });
     let ret = s1.await;
     info!("BinnedResponse: {:?}", ret);
-    ret
+    Ok(ret)
 }

 #[test]
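
Two notes for reviewers follow; they restate the hunks above as standalone code. First, the cache.rs change is the classic write-to-unique-tmp-then-rename pattern. A minimal std-only sketch of that pattern (the function name is illustrative, and std::fs stands in for the patch's tokio::fs plus spawn_blocking; the patch derives its tmp suffix by crc32-hashing the current timestamp rather than truncating it):

use std::io::Write;
use std::path::Path;
use std::time::{SystemTime, UNIX_EPOCH};

// Write the encoded bytes to a uniquely named tmp file next to the target,
// then rename the tmp file into place.
fn write_cache_atomic(path: &Path, enc: &[u8]) -> std::io::Result<u64> {
    let dir = path.parent().expect("cache path has a parent");
    std::fs::create_dir_all(dir)?;
    // Per-writer suffix so concurrent writers never share a tmp file; the
    // patch gets the same effect from crc32(timestamp_nanos).
    let nanos = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_nanos();
    let tmp = dir.join(format!(
        "{}.tmp.{:08x}",
        path.file_name().unwrap().to_str().unwrap(),
        nanos as u32
    ));
    // create_new errors out instead of silently reusing a colliding tmp
    // name, mirroring the switch away from create(true) + truncate(true).
    let mut f = std::fs::OpenOptions::new().create_new(true).write(true).open(&tmp)?;
    f.write_all(enc)?;
    f.flush()?;
    // Atomic on POSIX when tmp and target share a filesystem.
    std::fs::rename(&tmp, path)?;
    Ok(enc.len() as u64)
}

fn main() -> std::io::Result<()> {
    let n = write_cache_atomic(Path::new("/tmp/cache-demo/0001.cache"), b"payload")?;
    println!("wrote {} bytes", n);
    Ok(())
}

The rename is the step that makes the write atomic for concurrent readers: they observe either the old cache file or the complete new one, never a torn write. That only holds because the tmp file is created in the same directory, hence on the same filesystem, as the final path; it also explains why the fs2 lock_exclusive/unlock calls could be parked behind if false.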
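Second, the test.rs change is easier to read without the cluster plumbing: the frame stream is folded into a plain counter struct, and is_valid rejects a response carrying more than one RangeComplete marker. A self-contained sketch under stand-in types (the real StreamItem, StatsItem, and RangeCompletableItem live in disk::agg::streams and disk::binned; futures-util and tokio are assumed):

use futures_util::stream::{self, StreamExt};

// Stand-ins for the items the binned response stream actually carries.
#[allow(dead_code)]
enum StreamItem<T> {
    Log,
    Stats,
    DataItem(T),
}

enum RangeCompletableItem<T> {
    RangeComplete,
    Data(T),
}

#[derive(Debug, Default)]
struct Counts {
    range_complete: u64,
    data_items: u64,
    errors: u64,
}

impl Counts {
    // A well-formed response carries at most one RangeComplete marker.
    fn is_valid(&self) -> bool {
        self.range_complete <= 1
    }
}

#[tokio::main]
async fn main() {
    // Simulate the bug this commit reproduces: two RangeComplete markers
    // arriving in a single response stream.
    let items: Vec<Result<StreamItem<RangeCompletableItem<u32>>, ()>> = vec![
        Ok(StreamItem::DataItem(RangeCompletableItem::Data(42))),
        Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)),
        Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)),
    ];
    let counts = stream::iter(items)
        .fold(Counts::default(), |mut a, k| async move {
            match k {
                Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)) => a.range_complete += 1,
                Ok(StreamItem::DataItem(RangeCompletableItem::Data(_))) => a.data_items += 1,
                Ok(_) => {}
                Err(()) => a.errors += 1,
            }
            a
        })
        .await;
    println!("{:?} valid={}", counts, counts.is_valid());
    assert!(!counts.is_valid());
}

With the duplicated RangeComplete items above, the final assert holds, which is exactly the condition the new is_valid check in the test is meant to catch.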