Some fixes

This commit is contained in:
Dominik Werder
2021-04-30 19:44:44 +02:00
parent 40c27450c4
commit 168e532974
4 changed files with 110 additions and 37 deletions

38
disk/src/cache/pbv.rs vendored
View File

@@ -66,6 +66,8 @@ pub struct PreBinnedValueStream {
node_config: NodeConfig,
open_check_local_file: Option<Pin<Box<dyn Future<Output = Result<tokio::fs::File, std::io::Error>> + Send>>>,
fut2: Option<Pin<Box<dyn Stream<Item = Result<MinMaxAvgScalarBinBatch, Error>> + Send>>>,
errored: bool,
completed: bool,
}
impl PreBinnedValueStream {
@@ -84,6 +86,8 @@ impl PreBinnedValueStream {
node_config: node_config.clone(),
open_check_local_file: None,
fut2: None,
errored: false,
completed: false,
}
}
@@ -156,20 +160,48 @@ impl Stream for PreBinnedValueStream {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
if self.completed {
panic!("PreBinnedValueStream poll_next on completed");
}
if self.errored {
self.completed = true;
return Ready(None);
}
'outer: loop {
break if let Some(fut) = self.fut2.as_mut() {
fut.poll_next_unpin(cx)
match fut.poll_next_unpin(cx) {
Ready(Some(Ok(k))) => Ready(Some(Ok(k))),
Ready(Some(Err(e))) => {
self.errored = true;
Ready(Some(Err(e)))
}
Ready(None) => Ready(None),
Pending => Pending,
}
} else if let Some(fut) = self.open_check_local_file.as_mut() {
match fut.poll_unpin(cx) {
Ready(Ok(_file)) => err::todoval(),
Ready(Ok(_file)) => {
self.errored = true;
Ready(Some(Err(Error::with_msg(format!(
"TODO use the cached data from file"
)))))
}
Ready(Err(e)) => match e.kind() {
std::io::ErrorKind::NotFound => {
error!("TODO LOCAL CACHE FILE NOT FOUND");
self.try_setup_fetch_prebinned_higher_res();
continue 'outer;
if self.fut2.is_none() {
self.errored = true;
Ready(Some(Err(Error::with_msg(format!(
"try_setup_fetch_prebinned_higher_res failed"
)))))
} else {
continue 'outer;
}
}
_ => {
error!("File I/O error: {:?}", e);
self.errored = true;
Ready(Some(Err(e.into())))
}
},

View File

@@ -1,4 +1,4 @@
use crate::frame::makeframe::{INMEM_FRAME_HEAD, INMEM_FRAME_MAGIC};
use crate::frame::makeframe::{INMEM_FRAME_FOOT, INMEM_FRAME_HEAD, INMEM_FRAME_MAGIC};
use bytes::{BufMut, Bytes, BytesMut};
use err::Error;
use futures_core::Stream;
@@ -108,10 +108,9 @@ where
buf.len(),
wp
);
const HEAD: usize = INMEM_FRAME_HEAD;
let mut buf = buf;
let nb = wp;
if nb >= HEAD {
if nb >= INMEM_FRAME_HEAD {
let magic = u32::from_le_bytes(*arrayref::array_ref![buf, 0, 4]);
let encid = u32::from_le_bytes(*arrayref::array_ref![buf, 4, 4]);
let tyid = u32::from_le_bytes(*arrayref::array_ref![buf, 8, 4]);
@@ -129,22 +128,12 @@ where
}
trace!("tryparse len {}", len);
if len == 0 {
if nb != HEAD {
return (
Some(Some(Err(Error::with_msg(format!(
"InMemoryFrameAsyncReadStream tryparse unexpected amount left {}",
nb
))))),
buf,
wp,
);
}
info!("stop-frame with nb {}", nb);
(Some(None), buf, wp)
} else {
if len > 1024 * 32 {
warn!("InMemoryFrameAsyncReadStream big len received {}", len);
}
if len > 1024 * 1024 * 2 {
} else if len > 1024 * 1024 * 2 {
error!("InMemoryFrameAsyncReadStream too long len {}", len);
return (
Some(Some(Err(Error::with_msg(format!(
@@ -155,27 +144,55 @@ where
wp,
);
}
if len == 0 && len > 1024 * 512 {
return (
Some(Some(Err(Error::with_msg(format!(
"InMemoryFrameAsyncReadStream tryparse len {} self.inp_bytes_consumed {}",
len, self.inp_bytes_consumed
))))),
buf,
wp,
);
}
let nl = len as usize + HEAD;
let nl = len as usize + INMEM_FRAME_HEAD + INMEM_FRAME_FOOT;
if self.bufcap < nl {
// TODO count cases in production
let n = 2 * nl;
warn!("Adjust bufcap old {} new {}", self.bufcap, n);
self.bufcap = n;
}
if nb >= nl {
if nb < nl {
(None, buf, wp)
} else {
use bytes::Buf;
let mut h = crc32fast::Hasher::new();
h.update(&buf[..(nl - INMEM_FRAME_FOOT)]);
let frame_crc = h.finalize();
let mut h = crc32fast::Hasher::new();
h.update(&buf[INMEM_FRAME_HEAD..(nl - INMEM_FRAME_FOOT)]);
let payload_crc = h.finalize();
let frame_crc_ind =
u32::from_le_bytes(*arrayref::array_ref![buf, INMEM_FRAME_HEAD + len as usize, 4]);
let payload_crc_ind = u32::from_le_bytes(*arrayref::array_ref![buf, 16, 4]);
let payload_crc_match = payload_crc_ind == payload_crc;
let frame_crc_match = frame_crc_ind == frame_crc;
if !payload_crc_match || !frame_crc_match {
return (
Some(Some(Err(Error::with_msg(format!(
"InMemoryFrameAsyncReadStream tryparse crc mismatch {} {}",
payload_crc_match, frame_crc_match,
))))),
buf,
wp,
);
}
let mut buf3 = buf.split_to(nl);
buf3.advance(HEAD);
buf3.advance(INMEM_FRAME_HEAD);
buf3.truncate(len as usize);
let mut h = crc32fast::Hasher::new();
h.update(&buf3);
let payload_crc_2 = h.finalize();
let payload_crc_2_match = payload_crc_2 == payload_crc_ind;
if !payload_crc_2_match {
return (
Some(Some(Err(Error::with_msg(format!(
"InMemoryFrameAsyncReadStream tryparse crc mismatch {} {} {}",
payload_crc_match, frame_crc_match, payload_crc_2_match,
))))),
buf,
wp,
);
}
self.inp_bytes_consumed += nl as u64;
let ret = InMemoryFrame {
len,
@@ -184,8 +201,6 @@ where
buf: buf3.freeze(),
};
(Some(Some(Ok(ret))), buf, wp - nl)
} else {
(None, buf, wp)
}
}
} else {

View File

@@ -6,7 +6,8 @@ use bytes::{BufMut, BytesMut};
use err::Error;
use serde::{Deserialize, Serialize};
pub const INMEM_FRAME_HEAD: usize = 16;
pub const INMEM_FRAME_HEAD: usize = 20;
pub const INMEM_FRAME_FOOT: usize = 4;
pub const INMEM_FRAME_MAGIC: u32 = 0xc6c3b73d;
pub trait FrameType {
@@ -34,13 +35,21 @@ where
if enc.len() > u32::MAX as usize {
return Err(Error::with_msg(format!("too long payload {}", enc.len())));
}
let mut h = crc32fast::Hasher::new();
h.update(&enc);
let payload_crc = h.finalize();
let encid = 0x12121212;
let mut buf = BytesMut::with_capacity(enc.len() + INMEM_FRAME_HEAD);
buf.put_u32_le(INMEM_FRAME_MAGIC);
buf.put_u32_le(encid);
buf.put_u32_le(FT::FRAME_TYPE_ID);
buf.put_u32_le(enc.len() as u32);
buf.put_u32_le(payload_crc);
buf.put(enc.as_ref());
let mut h = crc32fast::Hasher::new();
h.update(&buf);
let frame_crc = h.finalize();
buf.put_u32_le(frame_crc);
Ok(buf)
}
Err(e) => Err(e)?,
@@ -48,12 +57,20 @@ where
}
pub fn make_term_frame() -> BytesMut {
let mut h = crc32fast::Hasher::new();
h.update(&[]);
let payload_crc = h.finalize();
let encid = 0x12121313;
let mut buf = BytesMut::with_capacity(INMEM_FRAME_HEAD);
buf.put_u32_le(INMEM_FRAME_MAGIC);
buf.put_u32_le(encid);
buf.put_u32_le(0x01);
buf.put_u32_le(0);
buf.put_u32_le(payload_crc);
let mut h = crc32fast::Hasher::new();
h.update(&buf);
let frame_crc = h.finalize();
buf.put_u32_le(frame_crc);
buf
}