diff --git a/disk/Cargo.toml b/disk/Cargo.toml index 75627ef..bfe2405 100644 --- a/disk/Cargo.toml +++ b/disk/Cargo.toml @@ -14,6 +14,7 @@ hyper = { version = "0.14", features = ["http1", "http2", "client", "server", "t async-channel = "1.6" bytes = "1.0.1" bincode = "1.3.3" +crc32fast = "1.2.1" arrayref = "0.3.6" byteorder = "1.4.3" futures-core = "0.3.14" diff --git a/disk/src/agg.rs b/disk/src/agg.rs index 1b7d3c8..0027ac6 100644 --- a/disk/src/agg.rs +++ b/disk/src/agg.rs @@ -2,7 +2,6 @@ Aggregation and binning support. */ -use crate::raw::Frameable; use crate::EventFull; use bytes::{BufMut, Bytes, BytesMut}; use err::Error; @@ -305,8 +304,9 @@ impl AggregatableTdim for MinMaxAvgScalarEventBatch { } } -impl Frameable for MinMaxAvgScalarEventBatch { - fn serialized(&self) -> Bytes { +impl MinMaxAvgScalarEventBatch { + #[allow(dead_code)] + fn old_serialized(&self) -> Bytes { let n1 = self.tss.len(); let mut g = BytesMut::with_capacity(4 + n1 * (8 + 3 * 4)); g.put_u32_le(n1 as u32); @@ -523,8 +523,9 @@ impl MinMaxAvgScalarBinBatch { } } -impl Frameable for MinMaxAvgScalarBinBatch { - fn serialized(&self) -> Bytes { +impl MinMaxAvgScalarBinBatch { + #[allow(dead_code)] + fn old_serialized(&self) -> Bytes { let n1 = self.ts1s.len(); let mut g = BytesMut::with_capacity(4 + n1 * (3 * 8 + 3 * 4)); g.put_u32_le(n1 as u32); @@ -936,7 +937,6 @@ where type Item = Result<::OutputValue, Error>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - trace!("IntoBinnedTDefaultStream poll_next"); use Poll::*; if self.completed { panic!("MergedFromRemotes ✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗ poll_next on completed"); @@ -950,12 +950,11 @@ where trace!("IntoBinnedTDefaultStream curbin out of spec, END"); Ready(None) } else if let Some(k) = self.left.take() { - trace!("IntoBinnedTDefaultStream GIVE LEFT"); + trace!("IntoBinnedTDefaultStream USE LEFTOVER"); k } else if self.inp_completed { Ready(None) } else { - trace!("IntoBinnedTDefaultStream POLL OUR INPUT"); let inp_poll_span = span!(Level::TRACE, "into_t_inp_poll"); inp_poll_span.in_scope(|| self.inp.poll_next_unpin(cx)) }; diff --git a/disk/src/cache.rs b/disk/src/cache.rs index 005392d..4b056e8 100644 --- a/disk/src/cache.rs +++ b/disk/src/cache.rs @@ -9,7 +9,7 @@ use futures_util::{pin_mut, FutureExt, StreamExt, TryStreamExt}; use hyper::Response; use netpod::{ AggKind, BinSpecDimT, Channel, Cluster, NanoRange, NodeConfig, PreBinnedPatchCoord, PreBinnedPatchIterator, - PreBinnedPatchRange, ToNanos, + PreBinnedPatchRange, RetStreamExt, ToNanos, }; use serde::{Deserialize, Serialize}; use std::future::{ready, Future}; @@ -145,7 +145,7 @@ impl Stream for BinnedBytesForHttpStream { } match self.inp.poll_next_unpin(cx) { Ready(Some(item)) => { - match bincode::serialize(&item) { + match bincode::serialize::(&item) { Ok(enc) => { // TODO optimize this... const HEAD: usize = super::raw::INMEM_FRAME_HEAD; @@ -699,7 +699,6 @@ impl BinnedStream { agg_kind: AggKind, node_config: Arc, ) -> Self { - use netpod::RetStreamExt; warn!("BinnedStream will open a PreBinnedValueStream"); let inp = futures_util::stream::iter(patch_it) .map(move |coord| { diff --git a/disk/src/raw.rs b/disk/src/raw.rs index a2c7e9c..0e2dcf2 100644 --- a/disk/src/raw.rs +++ b/disk/src/raw.rs @@ -43,7 +43,7 @@ pub async fn x_processed_stream_from_node( // TODO this incorrect magic MUST bubble up into the final result and be reported. - netout.write_u32_le(INMEM_FRAME_MAGIC - 1).await?; + netout.write_u32_le(INMEM_FRAME_MAGIC).await?; netout.write_u32_le(qjs.len() as u32).await?; netout.write_u32_le(0).await?; netout.write_all(&qjs).await?; @@ -52,12 +52,9 @@ pub async fn x_processed_stream_from_node( netout.write_u32_le(0).await?; netout.flush().await?; netout.forget(); - debug!("x_processed_stream_from_node WRITTEN"); let frames = InMemoryFrameAsyncReadStream::new(netin); let s2 = MinMaxAvgScalarEventBatchStreamFromFrames::new(frames); - debug!("x_processed_stream_from_node HAVE STREAM INSTANCE"); let s3: Pin> + Send>> = Box::pin(s2); - debug!("x_processed_stream_from_node RETURN"); Ok(s3) } @@ -94,13 +91,19 @@ where "MinMaxAvgScalarEventBatchStreamFromFrames got full frame buf {}", buf.len() ); - //let item = MinMaxAvgScalarEventBatch::from_full_frame(&buf); match bincode::deserialize::(buf.as_ref()) { Ok(item) => match item { Ok(item) => Ready(Some(Ok(item))), Err(e) => Ready(Some(Err(e))), }, - Err(e) => Ready(Some(Err(e.into()))), + Err(e) => { + trace!( + "MinMaxAvgScalarEventBatchStreamFromFrames ~~~~~~~~ ERROR on frame payload {} {}", + buf.len(), + crchex(buf) + ); + Ready(Some(Err(e.into()))) + } } } Ready(Some(Err(e))) => Ready(Some(Err(e))), @@ -303,13 +306,6 @@ async fn local_unpacked_test() { let _stream = crate::EventBlobsComplete::new(&query, query.channel_config.clone(), node).into_dim_1_f32_stream(); } -/** -Can be serialized as a length-delimited frame. -*/ -pub trait Frameable { - fn serialized(&self) -> Bytes; -} - pub async fn raw_service(node_config: Arc) -> Result<(), Error> { let addr = format!("{}:{}", node_config.node.listen, node_config.node.port_raw); let lis = tokio::net::TcpListener::bind(addr).await?; @@ -335,14 +331,18 @@ async fn raw_conn_handler_inner(stream: TcpStream, addr: SocketAddr) -> Result<( match raw_conn_handler_inner_try(stream, addr).await { Ok(_) => (), Err(mut ce) => { - let ret: RawConnOut = Err(ce.err); - let enc = bincode::serialize(&ret)?; + error!("raw_conn_handler_inner CAUGHT ERROR AND TRY TO SEND OVER TCP"); + let enc = bincode::serialize::(&Err(ce.err))?; // TODO optimize - let mut buf = BytesMut::with_capacity(enc.len() + 32); + let mut buf = BytesMut::with_capacity(enc.len() + INMEM_FRAME_HEAD); buf.put_u32_le(INMEM_FRAME_MAGIC); buf.put_u32_le(enc.len() as u32); buf.put_u32_le(0); buf.put(enc.as_ref()); + trace!( + "raw_conn_handler_inner ~~~~~~~~~~~~ EMIT FRAME PAYLOAD CRC {}", + crchex(&enc) + ); match ce.netout.write(&buf).await { Ok(_) => (), Err(e) => return Err(e)?, @@ -357,14 +357,8 @@ struct ConnErr { netout: OwnedWriteHalf, } -impl From<(Error, OwnedWriteHalf)> for ConnErr { - fn from((err, netout): (Error, OwnedWriteHalf)) -> Self { - Self { err, netout } - } -} - -impl From<(std::io::Error, OwnedWriteHalf)> for ConnErr { - fn from((err, netout): (std::io::Error, OwnedWriteHalf)) -> Self { +impl> From<(E, OwnedWriteHalf)> for ConnErr { + fn from((err, netout): (E, OwnedWriteHalf)) -> Self { Self { err: err.into(), netout, @@ -406,17 +400,28 @@ async fn raw_conn_handler_inner_try(stream: TcpStream, addr: SocketAddr) -> Resu batch.maxs.push(8.4); batch.avgs.push(9.5); batch.avgs.push(9.6); - let mut s1 = futures_util::stream::iter(vec![batch]); + let mut s1 = futures_util::stream::iter(vec![batch]).map(Result::Ok); while let Some(item) = s1.next().await { - let fr = item.serialized(); - let mut buf = BytesMut::with_capacity(fr.len() + 32); - buf.put_u32_le(INMEM_FRAME_MAGIC); - buf.put_u32_le(fr.len() as u32); - buf.put_u32_le(0); - buf.put(fr.as_ref()); - match netout.write(&buf).await { - Ok(_) => {} - Err(e) => return Err((e, netout))?, + match bincode::serialize::(&item) { + Ok(enc) => { + let mut buf = BytesMut::with_capacity(enc.len() + INMEM_FRAME_HEAD); + buf.put_u32_le(INMEM_FRAME_MAGIC); + buf.put_u32_le(enc.len() as u32); + buf.put_u32_le(0); + buf.put(enc.as_ref()); + trace!( + "raw_conn_handler_inner_try ~~~~~~~~~~~~ EMIT FRAME PAYLOAD {} {}", + enc.len(), + crchex(enc) + ); + match netout.write(&buf).await { + Ok(_) => {} + Err(e) => return Err((e, netout))?, + } + } + Err(e) => { + return Err((e, netout))?; + } } } let mut buf = BytesMut::with_capacity(32); @@ -433,3 +438,13 @@ async fn raw_conn_handler_inner_try(stream: TcpStream, addr: SocketAddr) -> Resu } Ok(()) } + +pub fn crchex(t: T) -> String +where + T: AsRef<[u8]>, +{ + let mut h = crc32fast::Hasher::new(); + h.update(t.as_ref()); + let crc = h.finalize(); + format!("{:08x}", crc) +} diff --git a/err/src/lib.rs b/err/src/lib.rs index e93bc91..feca841 100644 --- a/err/src/lib.rs +++ b/err/src/lib.rs @@ -3,7 +3,7 @@ Error handling and reporting. */ use nom::error::ErrorKind; -use serde::{Deserialize, Serialize}; +use serde::{Deserialize, Serialize, Serializer}; use std::fmt::Debug; use std::net::AddrParseError; use std::num::ParseIntError; @@ -18,55 +18,69 @@ pub struct Error { msg: String, #[serde(skip)] trace: Option, + trace_str: Option, +} + +#[allow(dead_code)] +fn ser_trace(_: &Option, _: S) -> Result +where + S: Serializer, +{ + todoval() } impl Error { pub fn with_msg>(s: S) -> Self { Self { msg: s.into(), - trace: Some(backtrace::Backtrace::new()), + trace: None, + trace_str: Some(fmt_backtrace(&backtrace::Backtrace::new())), } } } -impl std::fmt::Debug for Error { - fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result { - use std::io::Write; - let mut buf = vec![]; - match &self.trace { - Some(trace) => { - for fr in trace.frames() { - for sy in fr.symbols() { - let is_ours = match sy.filename() { - None => false, - Some(s) => s.to_str().unwrap().contains("dev/daqbuffer"), - }; - let name = match sy.name() { - Some(k) => k.to_string(), - _ => "[err]".into(), - }; - let filename = match sy.filename() { - Some(k) => match k.to_str() { - Some(k) => k, - _ => "[err]", - }, - _ => "[err]", - }; - let lineno = match sy.lineno() { - Some(k) => k, - _ => 0, - }; - if is_ours { - write!(&mut buf, "\n {}\n {} {}", name, filename, lineno).unwrap(); - } - } - } - } - None => { - write!(&mut buf, "NO_TRACE").unwrap(); +fn fmt_backtrace(trace: &backtrace::Backtrace) -> String { + use std::io::Write; + let mut buf = vec![]; + for fr in trace.frames() { + for sy in fr.symbols() { + let is_ours = match sy.filename() { + None => false, + Some(s) => s.to_str().unwrap().contains("dev/daqbuffer"), + }; + let name = match sy.name() { + Some(k) => k.to_string(), + _ => "[err]".into(), + }; + let filename = match sy.filename() { + Some(k) => match k.to_str() { + Some(k) => k, + _ => "[err]", + }, + _ => "[err]", + }; + let lineno = match sy.lineno() { + Some(k) => k, + _ => 0, + }; + if is_ours { + write!(&mut buf, "\n {}\n {} {}", name, filename, lineno).unwrap(); } } - write!(fmt, "Error {}\nTrace:{}", self.msg, String::from_utf8(buf).unwrap()) + } + String::from_utf8(buf).unwrap() +} + +impl std::fmt::Debug for Error { + fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result { + let trace_str = if let Some(trace) = &self.trace { + fmt_backtrace(trace) + } else if let Some(s) = &self.trace_str { + s.into() + } else { + "NOTRACE".into() + }; + write!(fmt, "Error {}\nTrace:\n{}", self.msg, trace_str) } }