From 168e5329748d3b6fc2f2afee202cc0c628c3a9f5 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Fri, 30 Apr 2021 19:44:44 +0200 Subject: [PATCH] Some fixes --- disk/src/cache/pbv.rs | 38 +++++++++++++++++-- disk/src/frame/inmem.rs | 75 ++++++++++++++++++++++--------------- disk/src/frame/makeframe.rs | 19 +++++++++- retrieval/src/test.rs | 15 ++++++-- 4 files changed, 110 insertions(+), 37 deletions(-) diff --git a/disk/src/cache/pbv.rs b/disk/src/cache/pbv.rs index 15f9f23..58a1e70 100644 --- a/disk/src/cache/pbv.rs +++ b/disk/src/cache/pbv.rs @@ -66,6 +66,8 @@ pub struct PreBinnedValueStream { node_config: NodeConfig, open_check_local_file: Option> + Send>>>, fut2: Option> + Send>>>, + errored: bool, + completed: bool, } impl PreBinnedValueStream { @@ -84,6 +86,8 @@ impl PreBinnedValueStream { node_config: node_config.clone(), open_check_local_file: None, fut2: None, + errored: false, + completed: false, } } @@ -156,20 +160,48 @@ impl Stream for PreBinnedValueStream { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; + if self.completed { + panic!("PreBinnedValueStream poll_next on completed"); + } + if self.errored { + self.completed = true; + return Ready(None); + } 'outer: loop { break if let Some(fut) = self.fut2.as_mut() { - fut.poll_next_unpin(cx) + match fut.poll_next_unpin(cx) { + Ready(Some(Ok(k))) => Ready(Some(Ok(k))), + Ready(Some(Err(e))) => { + self.errored = true; + Ready(Some(Err(e))) + } + Ready(None) => Ready(None), + Pending => Pending, + } } else if let Some(fut) = self.open_check_local_file.as_mut() { match fut.poll_unpin(cx) { - Ready(Ok(_file)) => err::todoval(), + Ready(Ok(_file)) => { + self.errored = true; + Ready(Some(Err(Error::with_msg(format!( + "TODO use the cached data from file" + ))))) + } Ready(Err(e)) => match e.kind() { std::io::ErrorKind::NotFound => { error!("TODO LOCAL CACHE FILE NOT FOUND"); self.try_setup_fetch_prebinned_higher_res(); - continue 'outer; + if self.fut2.is_none() { + self.errored = true; + Ready(Some(Err(Error::with_msg(format!( + "try_setup_fetch_prebinned_higher_res failed" + ))))) + } else { + continue 'outer; + } } _ => { error!("File I/O error: {:?}", e); + self.errored = true; Ready(Some(Err(e.into()))) } }, diff --git a/disk/src/frame/inmem.rs b/disk/src/frame/inmem.rs index fe14494..0b5e145 100644 --- a/disk/src/frame/inmem.rs +++ b/disk/src/frame/inmem.rs @@ -1,4 +1,4 @@ -use crate::frame::makeframe::{INMEM_FRAME_HEAD, INMEM_FRAME_MAGIC}; +use crate::frame::makeframe::{INMEM_FRAME_FOOT, INMEM_FRAME_HEAD, INMEM_FRAME_MAGIC}; use bytes::{BufMut, Bytes, BytesMut}; use err::Error; use futures_core::Stream; @@ -108,10 +108,9 @@ where buf.len(), wp ); - const HEAD: usize = INMEM_FRAME_HEAD; let mut buf = buf; let nb = wp; - if nb >= HEAD { + if nb >= INMEM_FRAME_HEAD { let magic = u32::from_le_bytes(*arrayref::array_ref![buf, 0, 4]); let encid = u32::from_le_bytes(*arrayref::array_ref![buf, 4, 4]); let tyid = u32::from_le_bytes(*arrayref::array_ref![buf, 8, 4]); @@ -129,22 +128,12 @@ where } trace!("tryparse len {}", len); if len == 0 { - if nb != HEAD { - return ( - Some(Some(Err(Error::with_msg(format!( - "InMemoryFrameAsyncReadStream tryparse unexpected amount left {}", - nb - ))))), - buf, - wp, - ); - } + info!("stop-frame with nb {}", nb); (Some(None), buf, wp) } else { if len > 1024 * 32 { warn!("InMemoryFrameAsyncReadStream big len received {}", len); - } - if len > 1024 * 1024 * 2 { + } else if len > 1024 * 1024 * 2 { error!("InMemoryFrameAsyncReadStream too long len {}", len); return ( Some(Some(Err(Error::with_msg(format!( @@ -155,27 +144,55 @@ where wp, ); } - if len == 0 && len > 1024 * 512 { - return ( - Some(Some(Err(Error::with_msg(format!( - "InMemoryFrameAsyncReadStream tryparse len {} self.inp_bytes_consumed {}", - len, self.inp_bytes_consumed - ))))), - buf, - wp, - ); - } - let nl = len as usize + HEAD; + let nl = len as usize + INMEM_FRAME_HEAD + INMEM_FRAME_FOOT; if self.bufcap < nl { // TODO count cases in production let n = 2 * nl; warn!("Adjust bufcap old {} new {}", self.bufcap, n); self.bufcap = n; } - if nb >= nl { + if nb < nl { + (None, buf, wp) + } else { use bytes::Buf; + let mut h = crc32fast::Hasher::new(); + h.update(&buf[..(nl - INMEM_FRAME_FOOT)]); + let frame_crc = h.finalize(); + let mut h = crc32fast::Hasher::new(); + h.update(&buf[INMEM_FRAME_HEAD..(nl - INMEM_FRAME_FOOT)]); + let payload_crc = h.finalize(); + let frame_crc_ind = + u32::from_le_bytes(*arrayref::array_ref![buf, INMEM_FRAME_HEAD + len as usize, 4]); + let payload_crc_ind = u32::from_le_bytes(*arrayref::array_ref![buf, 16, 4]); + let payload_crc_match = payload_crc_ind == payload_crc; + let frame_crc_match = frame_crc_ind == frame_crc; + if !payload_crc_match || !frame_crc_match { + return ( + Some(Some(Err(Error::with_msg(format!( + "InMemoryFrameAsyncReadStream tryparse crc mismatch {} {}", + payload_crc_match, frame_crc_match, + ))))), + buf, + wp, + ); + } let mut buf3 = buf.split_to(nl); - buf3.advance(HEAD); + buf3.advance(INMEM_FRAME_HEAD); + buf3.truncate(len as usize); + let mut h = crc32fast::Hasher::new(); + h.update(&buf3); + let payload_crc_2 = h.finalize(); + let payload_crc_2_match = payload_crc_2 == payload_crc_ind; + if !payload_crc_2_match { + return ( + Some(Some(Err(Error::with_msg(format!( + "InMemoryFrameAsyncReadStream tryparse crc mismatch {} {} {}", + payload_crc_match, frame_crc_match, payload_crc_2_match, + ))))), + buf, + wp, + ); + } self.inp_bytes_consumed += nl as u64; let ret = InMemoryFrame { len, @@ -184,8 +201,6 @@ where buf: buf3.freeze(), }; (Some(Some(Ok(ret))), buf, wp - nl) - } else { - (None, buf, wp) } } } else { diff --git a/disk/src/frame/makeframe.rs b/disk/src/frame/makeframe.rs index 034b87c..c9b5fdc 100644 --- a/disk/src/frame/makeframe.rs +++ b/disk/src/frame/makeframe.rs @@ -6,7 +6,8 @@ use bytes::{BufMut, BytesMut}; use err::Error; use serde::{Deserialize, Serialize}; -pub const INMEM_FRAME_HEAD: usize = 16; +pub const INMEM_FRAME_HEAD: usize = 20; +pub const INMEM_FRAME_FOOT: usize = 4; pub const INMEM_FRAME_MAGIC: u32 = 0xc6c3b73d; pub trait FrameType { @@ -34,13 +35,21 @@ where if enc.len() > u32::MAX as usize { return Err(Error::with_msg(format!("too long payload {}", enc.len()))); } + let mut h = crc32fast::Hasher::new(); + h.update(&enc); + let payload_crc = h.finalize(); let encid = 0x12121212; let mut buf = BytesMut::with_capacity(enc.len() + INMEM_FRAME_HEAD); buf.put_u32_le(INMEM_FRAME_MAGIC); buf.put_u32_le(encid); buf.put_u32_le(FT::FRAME_TYPE_ID); buf.put_u32_le(enc.len() as u32); + buf.put_u32_le(payload_crc); buf.put(enc.as_ref()); + let mut h = crc32fast::Hasher::new(); + h.update(&buf); + let frame_crc = h.finalize(); + buf.put_u32_le(frame_crc); Ok(buf) } Err(e) => Err(e)?, @@ -48,12 +57,20 @@ where } pub fn make_term_frame() -> BytesMut { + let mut h = crc32fast::Hasher::new(); + h.update(&[]); + let payload_crc = h.finalize(); let encid = 0x12121313; let mut buf = BytesMut::with_capacity(INMEM_FRAME_HEAD); buf.put_u32_le(INMEM_FRAME_MAGIC); buf.put_u32_le(encid); buf.put_u32_le(0x01); buf.put_u32_le(0); + buf.put_u32_le(payload_crc); + let mut h = crc32fast::Hasher::new(); + h.update(&buf); + let frame_crc = h.finalize(); + buf.put_u32_le(frame_crc); buf } diff --git a/retrieval/src/test.rs b/retrieval/src/test.rs index 17d4a55..0a4f50d 100644 --- a/retrieval/src/test.rs +++ b/retrieval/src/test.rs @@ -42,11 +42,14 @@ fn get_binned() { } async fn get_binned_0_inner() -> Result<(), Error> { + let cluster = test_cluster(); + let _hosts = spawn_test_hosts(cluster.clone()); get_binned_channel( "wave-f64-be-n21", "1970-01-01T00:20:10.000Z", "1970-01-01T00:20:51.000Z", 4, + &cluster, ) .await?; get_binned_channel( @@ -54,6 +57,7 @@ async fn get_binned_0_inner() -> Result<(), Error> { "1970-01-01T01:11:00.000Z", "1970-01-01T02:12:00.000Z", 4, + &cluster, ) .await?; get_binned_channel( @@ -61,19 +65,24 @@ async fn get_binned_0_inner() -> Result<(), Error> { "1970-01-01T01:42:00.000Z", "1970-01-01T03:55:00.000Z", 2, + &cluster, ) .await?; Ok(()) } -async fn get_binned_channel(channel_name: &str, beg_date: S, end_date: S, bin_count: u32) -> Result<(), Error> +async fn get_binned_channel( + channel_name: &str, + beg_date: S, + end_date: S, + bin_count: u32, + cluster: &Cluster, +) -> Result<(), Error> where S: AsRef, { let t1 = Utc::now(); - let cluster = test_cluster(); let node0 = &cluster.nodes[0]; - let _hosts = spawn_test_hosts(cluster.clone()); let beg_date: DateTime = beg_date.as_ref().parse()?; let end_date: DateTime = end_date.as_ref().parse()?; let channel_backend = "back";