Relative CLI datetime, cache clear API, work on cache read

This commit is contained in:
Dominik Werder
2021-05-27 12:07:44 +02:00
parent b3b2b3e4f7
commit 7aecf59195
11 changed files with 409 additions and 232 deletions

View File

@@ -356,7 +356,7 @@ where
{
buf: Vec<u8>,
file: Option<File>,
_mark: std::marker::PhantomData<T>,
_marker: std::marker::PhantomData<T>,
}
impl<T> ReadPbv<T>
@@ -367,7 +367,7 @@ where
Self {
buf: vec![],
file: Some(file),
_mark: std::marker::PhantomData::default(),
_marker: std::marker::PhantomData::default(),
}
}
}
@@ -380,26 +380,29 @@ where
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
use Poll::*;
let mut buf = vec![];
let mut dst = ReadBuf::new(&mut buf);
let fp = self.file.as_mut().unwrap();
let f = Pin::new(fp);
match File::poll_read(f, cx, &mut dst) {
Ready(res) => match res {
Ok(_) => {
if dst.filled().len() > 0 {
self.buf.extend_from_slice(&mut buf);
Pending
} else {
match T::from_buf(&mut self.buf) {
Ok(item) => Ready(Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)))),
Err(e) => Ready(Err(e)),
'outer: loop {
// TODO make buffer size a parameter:
let mut buf = vec![0; 4096];
let mut dst = ReadBuf::new(&mut buf);
let fp = self.file.as_mut().unwrap();
let f = Pin::new(fp);
break match File::poll_read(f, cx, &mut dst) {
Ready(res) => match res {
Ok(_) => {
if dst.filled().len() > 0 {
self.buf.extend_from_slice(&mut buf);
continue 'outer;
} else {
match T::from_buf(&mut self.buf) {
Ok(item) => Ready(Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)))),
Err(e) => Ready(Err(e)),
}
}
}
}
Err(e) => Ready(Err(e.into())),
},
Pending => Pending,
Err(e) => Ready(Err(e.into())),
},
Pending => Pending,
};
}
}
}
@@ -415,6 +418,9 @@ impl ReadableFromFile for MinMaxAvgScalarBinBatch {
Ok(ReadPbv::new(file))
}
/// Deserializes a `MinMaxAvgScalarBinBatch` from a CBOR-encoded buffer.
///
/// Logs the buffer length and a CRC32 of the raw bytes before decoding so a
/// cache read can be correlated with the checksum logged when the file was
/// written. Returns the decode error if the buffer is not valid CBOR for
/// this type.
fn from_buf(buf: &[u8]) -> Result<Self, Error> {
let mut h = crc32fast::Hasher::new();
// `buf` is already a `&[u8]`; pass it directly instead of re-borrowing (`&buf`).
h.update(buf);
info!("try to deserialize from buf len {} crc {}", buf.len(), h.finalize());
let dec: MinMaxAvgScalarBinBatch = serde_cbor::from_slice(buf)?;
Ok(dec)
}

View File

@@ -9,22 +9,22 @@ use chrono::{DateTime, Utc};
use err::Error;
use futures_core::Stream;
use futures_util::{pin_mut, StreamExt};
use hyper::Response;
use hyper::{Body, Response};
use netpod::log::*;
use netpod::timeunits::SEC;
use netpod::{
AggKind, ByteSize, Channel, Cluster, NanoRange, NodeConfigCached, PerfOpts, PreBinnedPatchCoord, ToNanos,
};
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use std::collections::{BTreeMap, VecDeque};
use std::fmt::{Display, Formatter};
use std::future::Future;
use std::io;
use std::path::PathBuf;
use std::pin::Pin;
use std::task::{Context, Poll};
use tiny_keccak::Hasher;
use tokio::io::{AsyncRead, ReadBuf};
#[allow(unused_imports)]
use tracing::{debug, error, info, trace, warn};
pub mod pbv;
pub mod pbvfs;
@@ -275,19 +275,19 @@ where
node_config.ix, patch_node_ix
)))
} else {
let ret = super::cache::pbv::pre_binned_value_byte_stream_new(query, node_config, stream_kind);
let ret = crate::cache::pbv::pre_binned_value_byte_stream_new(query, node_config, stream_kind);
Ok(ret)
}
}
pub struct HttpBodyAsAsyncRead {
inp: Response<hyper::Body>,
inp: Response<Body>,
left: Bytes,
rp: usize,
}
impl HttpBodyAsAsyncRead {
pub fn new(inp: hyper::Response<hyper::Body>) -> Self {
pub fn new(inp: Response<Body>) -> Self {
Self {
inp,
left: Bytes::new(),
@@ -297,7 +297,7 @@ impl HttpBodyAsAsyncRead {
}
impl AsyncRead for HttpBodyAsAsyncRead {
fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context, buf: &mut ReadBuf) -> Poll<std::io::Result<()>> {
fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context, buf: &mut ReadBuf) -> Poll<io::Result<()>> {
use hyper::body::HttpBody;
use Poll::*;
if self.left.len() != 0 {
@@ -329,8 +329,8 @@ impl AsyncRead for HttpBodyAsAsyncRead {
Ready(Ok(()))
}
}
Ready(Some(Err(e))) => Ready(Err(std::io::Error::new(
std::io::ErrorKind::Other,
Ready(Some(Err(e))) => Ready(Err(io::Error::new(
io::ErrorKind::Other,
Error::with_msg(format!("Received by HttpBodyAsAsyncRead: {:?}", e)),
))),
Ready(None) => Ready(Ok(())),
@@ -551,13 +551,22 @@ where
};
let path = cfd.path(&node_config);
let enc = serde_cbor::to_vec(&values)?;
info!("Writing cache file size {}\n{:?}\npath: {:?}", enc.len(), cfd, path);
let mut h = crc32fast::Hasher::new();
h.update(&enc);
info!(
"Writing cache file len {} crc {}\n{:?}\npath: {:?}",
enc.len(),
h.finalize(),
cfd,
path
);
tokio::fs::create_dir_all(path.parent().unwrap()).await?;
let res = tokio::task::spawn_blocking({
let path = path.clone();
move || {
use fs2::FileExt;
use std::io::Write;
use io::Write;
// TODO write to random tmp file first and then move into place.
let mut f = std::fs::OpenOptions::new()
.create(true)
.truncate(true)
@@ -573,3 +582,83 @@ where
let ret = WrittenPbCache { bytes: res as u64 };
Ok(ret)
}
// Result of a cache-clear run: a human-readable log of every action taken
// (or, in dry-run mode, every action that would have been taken).
#[derive(Serialize)]
pub struct ClearCacheAllResult {
pub log: Vec<String>,
}
/// Removes everything below `<data_base_path>/cache` for this node.
///
/// Walks the cache tree iteratively (explicit work stack, no recursion).
/// Files are deleted as they are found; directories are collected front-first
/// so that deeper directories are removed before their parents in the second
/// pass. When `dry` is true nothing is deleted, but the log still records
/// what would have been removed.
///
/// Individual removal failures are logged and do not abort the run; an error
/// while reading a directory or stat-ing an entry aborts with `Err`.
pub async fn clear_cache_all(node_config: &NodeConfigCached, dry: bool) -> Result<ClearCacheAllResult, Error> {
    let mut log = vec![];
    log.push(format!("begin at {:?}", chrono::Utc::now()));
    if dry {
        log.push(format!("dry run"));
    }
    // (len, path) of every directory seen; pushed to the front so children
    // precede their parents when removed below.
    let mut dirs = VecDeque::new();
    let mut stack = VecDeque::new();
    stack.push_front(node_config.node.data_base_path.join("cache"));
    while let Some(path) = stack.pop_front() {
        let mut rd = tokio::fs::read_dir(path).await?;
        while let Some(entry) = rd.next_entry().await? {
            let path = entry.path();
            match path.to_str() {
                Some(_pathstr) => {
                    // Async stat that does not follow symlinks, so a symlinked
                    // directory is not traversed. (The previous blocking
                    // `path.symlink_metadata()` stalled the async executor.)
                    let meta = tokio::fs::symlink_metadata(&path).await?;
                    //log.push(format!("len {:7} pathstr {}", meta.len(), pathstr,));
                    let filename_str = path.file_name().unwrap().to_str().unwrap();
                    // Guard against the special entries themselves; compare for
                    // equality so legitimate names that merely end in a dot are
                    // not rejected (`ends_with` matched those too).
                    if filename_str == ".." || filename_str == "." {
                        log.push(format!("ERROR encountered . or .."));
                    } else {
                        if meta.is_dir() {
                            stack.push_front(path.clone());
                            dirs.push_front((meta.len(), path));
                        } else if meta.is_file() {
                            log.push(format!("remove file len {:7} {}", meta.len(), path.to_string_lossy()));
                            if !dry {
                                match tokio::fs::remove_file(&path).await {
                                    Ok(_) => {}
                                    Err(e) => {
                                        log.push(format!(
                                            "can not remove file {} {:?}",
                                            path.to_string_lossy(),
                                            e
                                        ));
                                    }
                                }
                            }
                        } else {
                            // Neither a regular file nor a directory (e.g. a
                            // symlink); left in place.
                            log.push(format!("not file, not dir"));
                        }
                    }
                }
                None => {
                    log.push(format!("Invalid utf-8 path encountered"));
                }
            }
        }
    }
    log.push(format!(
        "start to remove {} dirs at {:?}",
        dirs.len(),
        chrono::Utc::now()
    ));
    for (len, path) in dirs {
        log.push(format!("remove dir len {} {}", len, path.to_string_lossy()));
        if !dry {
            match tokio::fs::remove_dir(&path).await {
                Ok(_) => {}
                Err(e) => {
                    log.push(format!("can not remove dir {} {:?}", path.to_string_lossy(), e));
                }
            }
        }
    }
    log.push(format!("done at {:?}", chrono::Utc::now()));
    let ret = ClearCacheAllResult { log };
    Ok(ret)
}