diff --git a/disk/src/cache.rs b/disk/src/cache.rs index 4b056e8..52777ea 100644 --- a/disk/src/cache.rs +++ b/disk/src/cache.rs @@ -1,7 +1,7 @@ use crate::agg::{IntoBinnedT, MinMaxAvgScalarBinBatch, MinMaxAvgScalarEventBatch}; use crate::merge::MergedMinMaxAvgScalarStream; -use crate::raw::{EventsQuery, InMemoryFrameAsyncReadStream}; -use bytes::{BufMut, Bytes, BytesMut}; +use crate::raw::{EventsQuery, FrameType, InMemoryFrameAsyncReadStream}; +use bytes::Bytes; use chrono::{DateTime, Utc}; use err::Error; use futures_core::Stream; @@ -144,24 +144,13 @@ impl Stream for BinnedBytesForHttpStream { return Ready(None); } match self.inp.poll_next_unpin(cx) { - Ready(Some(item)) => { - match bincode::serialize::(&item) { - Ok(enc) => { - // TODO optimize this... - const HEAD: usize = super::raw::INMEM_FRAME_HEAD; - let mut buf = BytesMut::with_capacity(enc.len() + HEAD); - buf.put_u32_le(super::raw::INMEM_FRAME_MAGIC); - buf.put_u32_le(enc.len() as u32); - buf.put_u32_le(0); - buf.put(enc.as_ref()); - Ready(Some(Ok(buf.freeze()))) - } - Err(e) => { - self.errored = true; - Ready(Some(Err(e.into()))) - } + Ready(Some(item)) => match super::raw::make_frame::(&item) { + Ok(buf) => Ready(Some(Ok(buf.freeze()))), + Err(e) => { + self.errored = true; + Ready(Some(Err(e.into()))) } - } + }, Ready(None) => { self.completed = true; Ready(None) @@ -241,24 +230,13 @@ impl Stream for PreBinnedValueByteStream { return Ready(None); } match self.inp.poll_next_unpin(cx) { - Ready(Some(item)) => { - // TODO optimize this - const HEAD: usize = super::raw::INMEM_FRAME_HEAD; - match bincode::serialize::(&item) { - Ok(enc) => { - let mut buf = BytesMut::with_capacity(enc.len() + HEAD); - buf.put_u32_le(super::raw::INMEM_FRAME_MAGIC); - buf.put_u32_le(enc.len() as u32); - buf.put_u32_le(0); - buf.put(enc.as_ref()); - Ready(Some(Ok(buf.freeze()))) - } - Err(e) => { - self.errored = true; - Ready(Some(Err(e.into()))) - } + Ready(Some(item)) => match super::raw::make_frame::(&item) { + Ok(buf) => Ready(Some(Ok(buf.freeze()))), + Err(e) => { + self.errored = true; + Ready(Some(Err(e.into()))) } - } + }, Ready(None) => Ready(None), Pending => Pending, } @@ -474,6 +452,8 @@ impl PreBinnedValueFetchedStream { } } +// TODO use a newtype here to use a different FRAME_TYPE_ID compared to +// impl FrameType for BinnedBytesForHttpStreamFrame pub type PreBinnedHttpFrame = Result; impl Stream for PreBinnedValueFetchedStream { @@ -486,10 +466,13 @@ impl Stream for PreBinnedValueFetchedStream { break if let Some(res) = self.res.as_mut() { pin_mut!(res); match res.poll_next(cx) { - Ready(Some(Ok(buf))) => match bincode::deserialize::(&buf) { - Ok(item) => Ready(Some(item)), - Err(e) => Ready(Some(Err(e.into()))), - }, + Ready(Some(Ok(frame))) => { + assert!(frame.tyid() == ::FRAME_TYPE_ID); + match bincode::deserialize::(frame.buf()) { + Ok(item) => Ready(Some(item)), + Err(e) => Ready(Some(Err(e.into()))), + } + } Ready(Some(Err(e))) => Ready(Some(Err(e.into()))), Ready(None) => Ready(None), Pending => Pending, diff --git a/disk/src/raw.rs b/disk/src/raw.rs index 58e4a50..cdab5ba 100644 --- a/disk/src/raw.rs +++ b/disk/src/raw.rs @@ -6,6 +6,7 @@ to request such data from nodes. */ use crate::agg::MinMaxAvgScalarEventBatch; +use crate::cache::BinnedBytesForHttpStreamFrame; use bytes::{BufMut, Bytes, BytesMut}; use err::Error; use futures_core::Stream; @@ -33,23 +34,20 @@ pub struct EventsQuery { pub agg_kind: AggKind, } +#[derive(Serialize, Deserialize)] +pub struct EventQueryJsonStringFrame(String); + pub async fn x_processed_stream_from_node( query: Arc, node: Arc, ) -> Result> + Send>>, Error> { let net = TcpStream::connect(format!("{}:{}", node.host, node.port_raw)).await?; - let qjs = serde_json::to_vec(query.as_ref())?; + let qjs = serde_json::to_string(query.as_ref())?; let (netin, mut netout) = net.into_split(); - - // TODO this incorrect magic MUST bubble up into the final result and be reported. - - netout.write_u32_le(INMEM_FRAME_MAGIC).await?; - netout.write_u32_le(qjs.len() as u32).await?; - netout.write_u32_le(0).await?; - netout.write_all(&qjs).await?; - netout.write_u32_le(INMEM_FRAME_MAGIC).await?; - netout.write_u32_le(0).await?; - netout.write_u32_le(0).await?; + let buf = make_frame(&EventQueryJsonStringFrame(qjs))?; + netout.write_all(&buf).await?; + let buf = make_term_frame(); + netout.write_all(&buf).await?; netout.flush().await?; netout.forget(); let frames = InMemoryFrameAsyncReadStream::new(netin); @@ -86,21 +84,22 @@ where let j = &mut self.inp; pin_mut!(j); break match j.poll_next(cx) { - Ready(Some(Ok(buf))) => { + Ready(Some(Ok(frame))) => { + type ExpectedType = RawConnOut; info!( "MinMaxAvgScalarEventBatchStreamFromFrames got full frame buf {}", - buf.len() + frame.buf().len() ); - match bincode::deserialize::(buf.as_ref()) { + assert!(frame.tyid() == ::FRAME_TYPE_ID); + match bincode::deserialize::(frame.buf()) { Ok(item) => match item { Ok(item) => Ready(Some(Ok(item))), Err(e) => Ready(Some(Err(e))), }, Err(e) => { trace!( - "MinMaxAvgScalarEventBatchStreamFromFrames ~~~~~~~~ ERROR on frame payload {} {}", - buf.len(), - crchex(buf) + "MinMaxAvgScalarEventBatchStreamFromFrames ~~~~~~~~ ERROR on frame payload {}", + frame.buf().len(), ); Ready(Some(Err(e.into()))) } @@ -114,7 +113,7 @@ where } } -pub const INMEM_FRAME_HEAD: usize = 12; +pub const INMEM_FRAME_HEAD: usize = 16; pub const INMEM_FRAME_MAGIC: u32 = 0xc6c3b73d; /** @@ -154,13 +153,14 @@ where } } - fn tryparse(&mut self) -> Option>> { + fn tryparse(&mut self) -> Option>> { const HEAD: usize = INMEM_FRAME_HEAD; let mut buf = std::mem::replace(&mut self.buf, BytesMut::new()); if self.wp >= HEAD { let magic = u32::from_le_bytes(*arrayref::array_ref![buf, 0, 4]); - let len = u32::from_le_bytes(*arrayref::array_ref![buf, 4, 4]); - let _tyid = u32::from_le_bytes(*arrayref::array_ref![buf, 8, 4]); + let encid = u32::from_le_bytes(*arrayref::array_ref![buf, 4, 4]); + let tyid = u32::from_le_bytes(*arrayref::array_ref![buf, 8, 4]); + let len = u32::from_le_bytes(*arrayref::array_ref![buf, 12, 4]); if magic != INMEM_FRAME_MAGIC { error!("InMemoryFrameAsyncReadStream tryparse incorrect magic: {}", magic); return Some(Some(Err(Error::with_msg(format!( @@ -206,7 +206,13 @@ where self.wp = self.wp - nl; self.buf = buf3; self.inp_bytes_consumed += nl as u64; - Some(Some(Ok(buf.freeze()))) + let ret = InMemoryFrame { + len, + tyid, + encid, + buf: buf.freeze(), + }; + Some(Some(Ok(ret))) } else { self.buf = buf; None @@ -219,11 +225,33 @@ where } } +pub struct InMemoryFrame { + encid: u32, + tyid: u32, + len: u32, + buf: Bytes, +} + +impl InMemoryFrame { + pub fn encid(&self) -> u32 { + self.encid + } + pub fn tyid(&self) -> u32 { + self.tyid + } + pub fn len(&self) -> u32 { + self.len + } + pub fn buf(&self) -> &Bytes { + &self.buf + } +} + impl Stream for InMemoryFrameAsyncReadStream where T: AsyncRead + Unpin, { - type Item = Result; + type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; @@ -295,15 +323,52 @@ where } } -// TODO build a stream from disk data to batched event data. -#[allow(dead_code)] -async fn local_unpacked_test() { - let query = err::todoval(); - let node = err::todoval(); - // TODO open and parse the channel config. - // TODO find the matching config entry. (bonus: fuse consecutive compatible entries) - use crate::agg::IntoDim1F32Stream; - let _stream = crate::EventBlobsComplete::new(&query, query.channel_config.clone(), node).into_dim_1_f32_stream(); +pub trait FrameType { + const FRAME_TYPE_ID: u32; +} + +impl FrameType for BinnedBytesForHttpStreamFrame { + const FRAME_TYPE_ID: u32 = 0x02; +} + +impl FrameType for EventQueryJsonStringFrame { + const FRAME_TYPE_ID: u32 = 0x03; +} + +impl FrameType for RawConnOut { + const FRAME_TYPE_ID: u32 = 0x04; +} + +pub fn make_frame(item: &FT) -> Result +where + FT: FrameType + Serialize, +{ + match bincode::serialize(item) { + Ok(enc) => { + if enc.len() > u32::MAX as usize { + return Err(Error::with_msg(format!("too long payload {}", enc.len()))); + } + let encid = 0x12121212; + let mut buf = BytesMut::with_capacity(enc.len() + INMEM_FRAME_HEAD); + buf.put_u32_le(INMEM_FRAME_MAGIC); + buf.put_u32_le(encid); + buf.put_u32_le(FT::FRAME_TYPE_ID); + buf.put_u32_le(enc.len() as u32); + buf.put(enc.as_ref()); + Ok(buf) + } + Err(e) => Err(e)?, + } +} + +pub fn make_term_frame() -> BytesMut { + let encid = 0x12121313; + let mut buf = BytesMut::with_capacity(INMEM_FRAME_HEAD); + buf.put_u32_le(INMEM_FRAME_MAGIC); + buf.put_u32_le(encid); + buf.put_u32_le(0x01); + buf.put_u32_le(0); + buf } pub async fn raw_service(node_config: Arc) -> Result<(), Error> { @@ -332,17 +397,7 @@ async fn raw_conn_handler_inner(stream: TcpStream, addr: SocketAddr) -> Result<( Ok(_) => (), Err(mut ce) => { error!("raw_conn_handler_inner CAUGHT ERROR AND TRY TO SEND OVER TCP"); - let enc = bincode::serialize::(&Err(ce.err))?; - // TODO optimize - let mut buf = BytesMut::with_capacity(enc.len() + INMEM_FRAME_HEAD); - buf.put_u32_le(INMEM_FRAME_MAGIC); - buf.put_u32_le(enc.len() as u32); - buf.put_u32_le(0); - buf.put(enc.as_ref()); - trace!( - "raw_conn_handler_inner ~~~~~~~~~~~~ EMIT FRAME PAYLOAD CRC {}", - crchex(&enc) - ); + let buf = make_frame::(&Err(ce.err))?; match ce.netout.write(&buf).await { Ok(_) => (), Err(e) => return Err(e)?, @@ -390,14 +445,12 @@ async fn raw_conn_handler_inner_try(stream: TcpStream, addr: SocketAddr) -> Resu error!("expect a command frame"); return Err((Error::with_msg("expect a command frame"), netout))?; } - let s1 = match String::from_utf8(frames[0].to_vec()) { + let qitem = match bincode::deserialize::(frames[0].buf()) { Ok(k) => k, - Err(e) => { - return Err((e, netout))?; - } + Err(e) => return Err((e, netout))?, }; - trace!("json {}", s1); - let res: Result = serde_json::from_str(&s1); + trace!("json: {}", qitem.0); + let res: Result = serde_json::from_str(&qitem.0); let evq = match res { Ok(k) => k, Err(e) => { @@ -420,32 +473,17 @@ async fn raw_conn_handler_inner_try(stream: TcpStream, addr: SocketAddr) -> Resu batch.avgs.push(9.6); let mut s1 = futures_util::stream::iter(vec![batch]).map(Result::Ok); while let Some(item) = s1.next().await { - match bincode::serialize::(&item) { - Ok(enc) => { - let mut buf = BytesMut::with_capacity(enc.len() + INMEM_FRAME_HEAD); - buf.put_u32_le(INMEM_FRAME_MAGIC); - buf.put_u32_le(enc.len() as u32); - buf.put_u32_le(0); - buf.put(enc.as_ref()); - trace!( - "raw_conn_handler_inner_try ~~~~~~~~~~~~ EMIT FRAME PAYLOAD {} {}", - enc.len(), - crchex(enc) - ); - match netout.write(&buf).await { - Ok(_) => {} - Err(e) => return Err((e, netout))?, - } - } + match make_frame::(&item) { + Ok(buf) => match netout.write(&buf).await { + Ok(_) => {} + Err(e) => return Err((e, netout))?, + }, Err(e) => { return Err((e, netout))?; } } } - let mut buf = BytesMut::with_capacity(32); - buf.put_u32_le(INMEM_FRAME_MAGIC); - buf.put_u32_le(0); - buf.put_u32_le(0); + let buf = make_term_frame(); match netout.write(&buf).await { Ok(_) => (), Err(e) => return Err((e, netout))?, diff --git a/retrieval/src/test.rs b/retrieval/src/test.rs index d32f63d..c1c925b 100644 --- a/retrieval/src/test.rs +++ b/retrieval/src/test.rs @@ -87,9 +87,10 @@ async fn get_cached_0_inner() -> Result<(), Error> { .map_err(|e| error!("TEST GOT ERROR {:?}", e)) .filter_map(|item| { let g = match item { - Ok(buf) => { - info!("TEST GOT FRAME len {}", buf.len()); - match bincode::deserialize::(&buf) { + Ok(frame) => { + type ExpectedType = disk::cache::BinnedBytesForHttpStreamFrame; + info!("TEST GOT FRAME len {}", frame.buf().len()); + match bincode::deserialize::(frame.buf()) { Ok(item) => match item { Ok(item) => { info!("TEST GOT ITEM");