Reproduce multiple RangeComplete, improve cache write
This commit is contained in:
@@ -426,9 +426,6 @@ impl ReadableFromFile for MinMaxAvgScalarBinBatch {
|
||||
Ok(ReadPbv::new(file))
|
||||
}
|
||||
fn from_buf(buf: &[u8]) -> Result<Self, Error> {
|
||||
let mut h = crc32fast::Hasher::new();
|
||||
h.update(&buf);
|
||||
info!("try to deserialize from buf len {} crc {}", buf.len(), h.finalize());
|
||||
let dec: MinMaxAvgScalarBinBatch = serde_cbor::from_slice(&buf)?;
|
||||
Ok(dec)
|
||||
}
|
||||
|
||||
@@ -602,34 +602,38 @@ where
|
||||
};
|
||||
let path = cfd.path(&node_config);
|
||||
let enc = serde_cbor::to_vec(&values)?;
|
||||
let mut h = crc32fast::Hasher::new();
|
||||
h.update(&enc);
|
||||
info!(
|
||||
"Writing cache file len {} crc {}\n{:?}\npath: {:?}",
|
||||
enc.len(),
|
||||
h.finalize(),
|
||||
cfd,
|
||||
path
|
||||
);
|
||||
tokio::fs::create_dir_all(path.parent().unwrap()).await?;
|
||||
let now = Utc::now();
|
||||
let mut h = crc32fast::Hasher::new();
|
||||
h.update(&now.timestamp_nanos().to_le_bytes());
|
||||
let r = h.finalize();
|
||||
let tmp_path =
|
||||
path.parent()
|
||||
.unwrap()
|
||||
.join(format!("{}.tmp.{:08x}", path.file_name().unwrap().to_str().unwrap(), r));
|
||||
let res = tokio::task::spawn_blocking({
|
||||
let path = path.clone();
|
||||
let tmp_path = tmp_path.clone();
|
||||
move || {
|
||||
use fs2::FileExt;
|
||||
use io::Write;
|
||||
// TODO write to random tmp file first and then move into place.
|
||||
info!("try to write tmp at {:?}", tmp_path);
|
||||
let mut f = std::fs::OpenOptions::new()
|
||||
.create(true)
|
||||
.truncate(true)
|
||||
.create_new(true)
|
||||
.write(true)
|
||||
.open(&path)?;
|
||||
f.lock_exclusive()?;
|
||||
.open(&tmp_path)?;
|
||||
if false {
|
||||
f.lock_exclusive()?;
|
||||
}
|
||||
f.write_all(&enc)?;
|
||||
f.unlock()?;
|
||||
if false {
|
||||
f.unlock()?;
|
||||
}
|
||||
f.flush()?;
|
||||
Ok::<_, Error>(enc.len())
|
||||
}
|
||||
})
|
||||
.await??;
|
||||
tokio::fs::rename(&tmp_path, &path).await?;
|
||||
let ret = WrittenPbCache { bytes: res as u64 };
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
@@ -2,7 +2,7 @@ use crate::spawn_test_hosts;
|
||||
use bytes::BytesMut;
|
||||
use chrono::{DateTime, Utc};
|
||||
use disk::agg::scalarbinbatch::MinMaxAvgScalarBinBatch;
|
||||
use disk::agg::streams::StreamItem;
|
||||
use disk::agg::streams::{Bins, StatsItem, StreamItem};
|
||||
use disk::binned::RangeCompletableItem;
|
||||
use disk::cache::BinnedQuery;
|
||||
use disk::frame::inmem::InMemoryFrameAsyncReadStream;
|
||||
@@ -52,33 +52,36 @@ fn get_binned_binary() {
|
||||
async fn get_binned_binary_inner() -> Result<(), Error> {
|
||||
let cluster = test_cluster();
|
||||
let _hosts = spawn_test_hosts(cluster.clone());
|
||||
get_binned_channel(
|
||||
"wave-f64-be-n21",
|
||||
"1970-01-01T00:20:10.000Z",
|
||||
"1970-01-01T00:20:30.000Z",
|
||||
2,
|
||||
&cluster,
|
||||
)
|
||||
.await?;
|
||||
if true {
|
||||
return Ok(());
|
||||
if false {
|
||||
get_binned_channel(
|
||||
"wave-f64-be-n21",
|
||||
"1970-01-01T00:20:10.000Z",
|
||||
"1970-01-01T00:20:30.000Z",
|
||||
2,
|
||||
&cluster,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
if false {
|
||||
get_binned_channel(
|
||||
"wave-u16-le-n77",
|
||||
"1970-01-01T01:11:00.000Z",
|
||||
"1970-01-01T01:40:00.000Z",
|
||||
7,
|
||||
&cluster,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
if true {
|
||||
get_binned_channel(
|
||||
"wave-u16-le-n77",
|
||||
"1970-01-01T01:42:00.000Z",
|
||||
"1970-01-01T03:55:00.000Z",
|
||||
2,
|
||||
&cluster,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
get_binned_channel(
|
||||
"wave-u16-le-n77",
|
||||
"1970-01-01T01:11:00.000Z",
|
||||
"1970-01-01T01:40:00.000Z",
|
||||
7,
|
||||
&cluster,
|
||||
)
|
||||
.await?;
|
||||
get_binned_channel(
|
||||
"wave-u16-le-n77",
|
||||
"1970-01-01T01:42:00.000Z",
|
||||
"1970-01-01T03:55:00.000Z",
|
||||
2,
|
||||
&cluster,
|
||||
)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -124,20 +127,43 @@ where
|
||||
let t2 = chrono::Utc::now();
|
||||
let ms = t2.signed_duration_since(t1).num_milliseconds() as u64;
|
||||
info!("get_cached_0 DONE bin_count {} time {} ms", res.bin_count, ms);
|
||||
Ok(())
|
||||
if !res.is_valid() {
|
||||
error!("result is not valid:\n{:?}", res);
|
||||
Err(Error::with_msg("result not valid"))
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct BinnedResponse {
|
||||
bin_count: usize,
|
||||
bin_count: u64,
|
||||
err_item_count: u64,
|
||||
data_item_count: u64,
|
||||
bytes_read: u64,
|
||||
range_complete_count: u64,
|
||||
log_item_count: u64,
|
||||
stats_item_count: u64,
|
||||
}
|
||||
|
||||
impl BinnedResponse {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
bin_count: 0,
|
||||
err_item_count: 0,
|
||||
data_item_count: 0,
|
||||
bytes_read: 0,
|
||||
range_complete_count: 0,
|
||||
log_item_count: 0,
|
||||
stats_item_count: 0,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn is_valid(&self) -> bool {
|
||||
if self.range_complete_count > 1 {
|
||||
false
|
||||
} else {
|
||||
true
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -189,29 +215,39 @@ where
|
||||
};
|
||||
ready(g)
|
||||
})
|
||||
.fold(Ok(BinnedResponse::new()), |a, k| {
|
||||
let g = match a {
|
||||
Ok(a) => match k {
|
||||
Ok(StreamItem::DataItem(_item)) => {
|
||||
// TODO extract bin count from item
|
||||
//a.bin_count += k.ts1s.len();
|
||||
Ok(a)
|
||||
.fold(BinnedResponse::new(), |mut a, k| {
|
||||
let g = match k {
|
||||
Ok(StreamItem::Log(_item)) => {
|
||||
a.log_item_count += 1;
|
||||
a
|
||||
}
|
||||
Ok(StreamItem::Stats(item)) => match item {
|
||||
StatsItem::EventDataReadStats(item) => {
|
||||
a.bytes_read += item.parsed_bytes;
|
||||
a
|
||||
}
|
||||
Ok(StreamItem::Stats(_item)) => {
|
||||
// TODO adapt to new Stats type:
|
||||
//a.bytes_read += stats.parsed_bytes;
|
||||
Ok(a)
|
||||
}
|
||||
Ok(_) => Ok(a),
|
||||
Err(e) => Err(e),
|
||||
},
|
||||
Err(e) => Err(e),
|
||||
Ok(StreamItem::DataItem(item)) => match item {
|
||||
RangeCompletableItem::RangeComplete => {
|
||||
a.range_complete_count += 1;
|
||||
a
|
||||
}
|
||||
RangeCompletableItem::Data(item) => {
|
||||
a.data_item_count += 1;
|
||||
a.bin_count += item.bin_count() as u64;
|
||||
a
|
||||
}
|
||||
},
|
||||
Err(_e) => {
|
||||
a.err_item_count += 1;
|
||||
a
|
||||
}
|
||||
};
|
||||
ready(g)
|
||||
});
|
||||
let ret = s1.await;
|
||||
info!("BinnedResponse: {:?}", ret);
|
||||
ret
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
Reference in New Issue
Block a user