diff --git a/disk/src/agg.rs b/disk/src/agg.rs index 5328b39..fe85b67 100644 --- a/disk/src/agg.rs +++ b/disk/src/agg.rs @@ -14,7 +14,7 @@ use std::mem::size_of; use std::pin::Pin; use std::task::{Context, Poll}; #[allow(unused_imports)] -use tracing::{debug, error, info, trace, warn}; +use tracing::{debug, error, info, span, trace, warn, Level}; pub trait AggregatorTdim { type InputValue; @@ -218,8 +218,8 @@ impl MinMaxAvgScalarEventBatch { } pub fn from_full_frame(buf: &Bytes) -> Self { info!("construct MinMaxAvgScalarEventBatch from full frame len {}", buf.len()); + assert!(buf.len() >= 4); let mut g = MinMaxAvgScalarEventBatch::empty(); - let n1; unsafe { let ptr = (&buf[0] as *const u8) as *const [u8; 4]; @@ -442,13 +442,123 @@ impl MinMaxAvgScalarBinBatch { self.counts.push(g.count); self.mins.push(g.min); self.maxs.push(g.max); - self.counts.push(g.count); + self.avgs.push(g.avg); + } + pub fn from_full_frame(buf: &Bytes) -> Self { + info!("MinMaxAvgScalarBinBatch construct from full frame len {}", buf.len()); + assert!(buf.len() >= 4); + let mut g = MinMaxAvgScalarBinBatch::empty(); + let n1; + unsafe { + let ptr = (&buf[0] as *const u8) as *const [u8; 4]; + n1 = u32::from_le_bytes(*ptr); + trace!( + "MinMaxAvgScalarBinBatch construct --- +++ --- +++ --- +++ n1: {}", + n1 + ); + } + if n1 == 0 { + g + } else { + let n2 = n1 as usize; + g.ts1s.reserve(n2); + g.ts2s.reserve(n2); + g.counts.reserve(n2); + g.mins.reserve(n2); + g.maxs.reserve(n2); + g.avgs.reserve(n2); + unsafe { + // TODO Can I unsafely create ptrs and just assign them? + // TODO What are cases where I really need transmute? + g.ts1s.set_len(n2); + g.ts2s.set_len(n2); + g.counts.set_len(n2); + g.mins.set_len(n2); + g.maxs.set_len(n2); + g.avgs.set_len(n2); + let ptr0 = &buf[4] as *const u8; + { + let ptr1 = ptr0.add(0) as *const u64; + for i1 in 0..n2 { + g.ts1s[i1] = *ptr1.add(i1); + } + } + { + let ptr1 = ptr0.add((8) * n2) as *const u64; + for i1 in 0..n2 { + g.ts2s[i1] = *ptr1.add(i1); + } + } + { + let ptr1 = ptr0.add((8 + 8) * n2) as *const u64; + for i1 in 0..n2 { + g.counts[i1] = *ptr1.add(i1); + } + } + { + let ptr1 = ptr0.add((8 + 8 + 8) * n2) as *const f32; + for i1 in 0..n2 { + g.mins[i1] = *ptr1.add(i1); + } + } + { + let ptr1 = ptr0.add((8 + 8 + 8 + 4) * n2) as *const f32; + for i1 in 0..n2 { + g.maxs[i1] = *ptr1; + } + } + { + let ptr1 = ptr0.add((8 + 8 + 8 + 4 + 4) * n2) as *const f32; + for i1 in 0..n2 { + g.avgs[i1] = *ptr1; + } + } + } + info!("CONTENT {:?}", g); + g + } + } +} + +impl Frameable for MinMaxAvgScalarBinBatch { + fn serialized(&self) -> Bytes { + let n1 = self.ts1s.len(); + let mut g = BytesMut::with_capacity(4 + n1 * (3 * 8 + 3 * 4)); + g.put_u32_le(n1 as u32); + if n1 > 0 { + let ptr = &self.ts1s[0] as *const u64 as *const u8; + let a = unsafe { std::slice::from_raw_parts(ptr, size_of::() * n1) }; + g.put(a); + let ptr = &self.ts2s[0] as *const u64 as *const u8; + let a = unsafe { std::slice::from_raw_parts(ptr, size_of::() * n1) }; + g.put(a); + let ptr = &self.counts[0] as *const u64 as *const u8; + let a = unsafe { std::slice::from_raw_parts(ptr, size_of::() * n1) }; + g.put(a); + let ptr = &self.mins[0] as *const f32 as *const u8; + let a = unsafe { std::slice::from_raw_parts(ptr, size_of::() * n1) }; + g.put(a); + let ptr = &self.maxs[0] as *const f32 as *const u8; + let a = unsafe { std::slice::from_raw_parts(ptr, size_of::() * n1) }; + g.put(a); + let ptr = &self.avgs[0] as *const f32 as *const u8; + let a = unsafe { std::slice::from_raw_parts(ptr, size_of::() * n1) }; + g.put(a); + } + g.freeze() } } impl std::fmt::Debug for MinMaxAvgScalarBinBatch { fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result { - write!(fmt, "MinMaxAvgScalarBinBatch count {}", self.ts1s.len()) + write!( + fmt, + "MinMaxAvgScalarBinBatch count {} ts1s {:?} counts {:?} avgs {:?}", + self.ts1s.len(), + self.ts1s, + self.counts, + self.avgs + ) } } @@ -789,6 +899,9 @@ where spec: BinSpecDimT, curbin: u32, left: Option>>>, + errored: bool, + completed: bool, + inp_completed: bool, } impl IntoBinnedTDefaultStream @@ -804,6 +917,9 @@ where spec, curbin: 0, left: None, + errored: false, + completed: false, + inp_completed: false, } } } @@ -817,14 +933,27 @@ where type Item = Result<::OutputValue, Error>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + trace!("IntoBinnedTDefaultStream poll_next"); use Poll::*; + if self.errored { + self.completed = true; + return Ready(None); + } else if self.completed { + panic!("MergedFromRemotes ✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗ poll_next on completed"); + } 'outer: loop { let cur = if self.curbin as u64 >= self.spec.count { + trace!("IntoBinnedTDefaultStream curbin out of spec, END"); Ready(None) } else if let Some(k) = self.left.take() { + trace!("IntoBinnedTDefaultStream GIVE LEFT"); k + } else if self.inp_completed { + Ready(None) } else { - self.inp.poll_next_unpin(cx) + trace!("IntoBinnedTDefaultStream POLL OUR INPUT"); + let inp_poll_span = span!(Level::TRACE, "into_t_inp_poll"); + inp_poll_span.in_scope(|| self.inp.poll_next_unpin(cx)) }; break match cur { Ready(Some(Ok(k))) => { @@ -858,14 +987,22 @@ where } } } - Ready(Some(Err(e))) => Ready(Some(Err(e))), - Ready(None) => match self.aggtor.take() { - Some(ag) => Ready(Some(Ok(ag.result()))), - None => { - warn!("TODO add trailing bins"); - Ready(None) + Ready(Some(Err(e))) => { + error!("IntoBinnedTDefaultStream err from input"); + self.errored = true; + Ready(Some(Err(e))) + } + Ready(None) => { + self.inp_completed = true; + match self.aggtor.take() { + Some(ag) => Ready(Some(Ok(ag.result()))), + None => { + warn!("TODO add the trailing empty bins until requested range is complete"); + self.completed = true; + Ready(None) + } } - }, + } Pending => Pending, }; } diff --git a/disk/src/cache.rs b/disk/src/cache.rs index f169d8e..af0b8d8 100644 --- a/disk/src/cache.rs +++ b/disk/src/cache.rs @@ -1,11 +1,12 @@ use crate::agg::{IntoBinnedT, MinMaxAvgScalarBinBatch, MinMaxAvgScalarEventBatch}; use crate::merge::MergedMinMaxAvgScalarStream; -use crate::raw::EventsQuery; +use crate::raw::{EventsQuery, Frameable, InMemoryFrameAsyncReadStream}; use bytes::{BufMut, Bytes, BytesMut}; use chrono::{DateTime, Utc}; use err::Error; use futures_core::Stream; use futures_util::{pin_mut, FutureExt, StreamExt, TryStreamExt}; +use hyper::Response; use netpod::{ AggKind, BinSpecDimT, Channel, Cluster, NanoRange, NodeConfig, PreBinnedPatchCoord, PreBinnedPatchIterator, PreBinnedPatchRange, ToNanos, @@ -16,6 +17,7 @@ use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; use tiny_keccak::Hasher; +use tokio::io::{AsyncRead, ReadBuf}; #[allow(unused_imports)] use tracing::{debug, error, info, trace, warn}; @@ -30,8 +32,12 @@ pub struct Query { impl Query { pub fn from_request(req: &http::request::Parts) -> Result { let params = netpod::query_params(req.uri.query()); - let beg_date = params.get("beg_date").ok_or(Error::with_msg("missing beg_date"))?; - let end_date = params.get("end_date").ok_or(Error::with_msg("missing end_date"))?; + let beg_date = params + .get("beg_date") + .ok_or_else(|| Error::with_msg("missing beg_date"))?; + let end_date = params + .get("end_date") + .ok_or_else(|| Error::with_msg("missing end_date"))?; let ret = Query { range: NanoRange { beg: beg_date.parse::>()?.to_nanos(), @@ -39,7 +45,7 @@ impl Query { }, count: params .get("bin_count") - .ok_or(Error::with_msg("missing beg_date"))? + .ok_or_else(|| Error::with_msg("missing beg_date"))? .parse() .unwrap(), agg_kind: AggKind::DimXBins1, @@ -48,7 +54,6 @@ impl Query { name: params.get("channel_name").unwrap().into(), }, }; - info!("Query::from_request {:?}", ret); Ok(ret) } } @@ -102,7 +107,7 @@ pub async fn binned_bytes_for_http( } None => { // Merge raw data - error!("binned_bytes_for_http TODO merge raw data"); + error!("binned_bytes_for_http TODO merge raw data"); todo!() } } @@ -124,10 +129,17 @@ impl Stream for BinnedBytesForHttpStream { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; match self.inp.poll_next_unpin(cx) { - Ready(Some(Ok(_k))) => { - error!("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! BinnedBytesForHttpStream TODO serialize to bytes"); - let mut buf = BytesMut::with_capacity(250); - buf.put(&b"BinnedBytesForHttpStream TODO serialize to bytes\n"[..]); + Ready(Some(Ok(k))) => { + // TODO optimize this... + let mut buf = BytesMut::with_capacity(4); + buf.resize(4, 0); + let k = k.serialized(); + info!("BinnedBytesForHttpStream serialized slice has len {}", k.len()); + let n1 = k.len(); + buf.put_slice(&k); + assert!(buf.len() == k.len() + 4); + buf.as_mut().put_u32_le(n1 as u32); + info!("BinnedBytesForHttpStream emit buf len {}", buf.len()); Ready(Some(Ok(buf.freeze()))) } Ready(Some(Err(e))) => Ready(Some(Err(e))), @@ -179,6 +191,7 @@ pub fn pre_binned_bytes_for_http( pub struct PreBinnedValueByteStream { inp: PreBinnedValueStream, + left: Option, } impl PreBinnedValueByteStream { @@ -186,6 +199,7 @@ impl PreBinnedValueByteStream { warn!("PreBinnedValueByteStream"); Self { inp: PreBinnedValueStream::new(patch, channel, agg_kind, node_config), + left: None, } } } @@ -195,11 +209,17 @@ impl Stream for PreBinnedValueByteStream { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; + if let Some(buf) = self.left.take() { + return Ready(Some(Ok(buf))); + } match self.inp.poll_next_unpin(cx) { - Ready(Some(Ok(_k))) => { - error!("TODO convert item to Bytes"); - let buf = Bytes::new(); - Ready(Some(Ok(buf))) + Ready(Some(Ok(k))) => { + let buf = k.serialized(); + let n1 = buf.len(); + self.left = Some(buf); + let mut buf2 = BytesMut::with_capacity(4); + buf2.put_u32_le(n1 as u32); + Ready(Some(Ok(buf2.freeze()))) } Ready(Some(Err(e))) => Ready(Some(Err(e))), Ready(None) => Ready(None), @@ -296,14 +316,17 @@ impl PreBinnedValueStream { let s1 = MergedFromRemotes::new(evq, self.node_config.cluster.clone()); let s2 = s1 .map(|k| { - trace!("MergedFromRemotes emitted some item"); + if k.is_err() { + error!(".................. try_setup_fetch_prebinned_higher_res got ERROR"); + } else { + trace!("try_setup_fetch_prebinned_higher_res got some item from MergedFromRemotes"); + } k }) .into_binned_t(spec) .map_ok({ let mut a = MinMaxAvgScalarBinBatch::empty(); move |k| { - error!("try_setup_fetch_prebinned_higher_res TODO emit actual value"); a.push_single(&k); if a.len() > 0 { let z = std::mem::replace(&mut a, MinMaxAvgScalarBinBatch::empty()); @@ -313,11 +336,15 @@ impl PreBinnedValueStream { } } }) - .filter(|k| { + .filter_map(|k| { use std::future::ready; - ready(k.is_ok() && k.as_ref().unwrap().is_some()) - }) - .map_ok(Option::unwrap); + let g = match k { + Ok(Some(k)) => Some(Ok(k)), + Ok(None) => None, + Err(e) => Some(Err(e)), + }; + ready(g) + }); self.fut2 = Some(Box::pin(s2)); } } @@ -332,7 +359,6 @@ impl Stream for PreBinnedValueStream { use Poll::*; 'outer: loop { break if let Some(fut) = self.fut2.as_mut() { - info!("PreBinnedValueStream --------------------------------------------------------- fut2 poll"); fut.poll_next_unpin(cx) } else if let Some(fut) = self.open_check_local_file.as_mut() { match fut.poll_unpin(cx) { @@ -366,7 +392,7 @@ impl Stream for PreBinnedValueStream { pub struct PreBinnedValueFetchedStream { uri: http::Uri, resfut: Option, - res: Option>, + res: Option>, } impl PreBinnedValueFetchedStream { @@ -408,11 +434,10 @@ impl Stream for PreBinnedValueFetchedStream { 'outer: loop { break if let Some(res) = self.res.as_mut() { pin_mut!(res); - use hyper::body::HttpBody; - match res.poll_data(cx) { - Ready(Some(Ok(_))) => { - error!("TODO PreBinnedValueFetchedStream received value, now do something"); - Pending + match res.poll_next(cx) { + Ready(Some(Ok(buf))) => { + let item = MinMaxAvgScalarBinBatch::from_full_frame(&buf); + Ready(Some(Ok(item))) } Ready(Some(Err(e))) => Ready(Some(Err(e.into()))), Ready(None) => Ready(None), @@ -423,7 +448,9 @@ impl Stream for PreBinnedValueFetchedStream { Ready(res) => match res { Ok(res) => { info!("GOT result from SUB REQUEST: {:?}", res); - self.res = Some(res); + let s1 = HttpBodyAsAsyncRead::new(res); + let s2 = InMemoryFrameAsyncReadStream::new(s1); + self.res = Some(s2); continue 'outer; } Err(e) => { @@ -447,12 +474,74 @@ impl Stream for PreBinnedValueFetchedStream { } } +pub struct HttpBodyAsAsyncRead { + inp: Response, + left: Bytes, + rp: usize, +} + +impl HttpBodyAsAsyncRead { + pub fn new(inp: hyper::Response) -> Self { + Self { + inp, + left: Bytes::new(), + rp: 0, + } + } +} + +impl AsyncRead for HttpBodyAsAsyncRead { + fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context, buf: &mut ReadBuf) -> Poll> { + use hyper::body::HttpBody; + use Poll::*; + if self.left.len() != 0 { + let n1 = buf.remaining(); + let n2 = self.left.len() - self.rp; + if n2 <= n1 { + buf.put_slice(self.left[self.rp..].as_ref()); + self.left = Bytes::new(); + self.rp = 0; + Ready(Ok(())) + } else { + buf.put_slice(self.left[self.rp..(self.rp + n2)].as_ref()); + self.rp += n2; + Ready(Ok(())) + } + } else { + let f = &mut self.inp; + pin_mut!(f); + match f.poll_data(cx) { + Ready(Some(Ok(k))) => { + let n1 = buf.remaining(); + if k.len() <= n1 { + buf.put_slice(k.as_ref()); + Ready(Ok(())) + } else { + buf.put_slice(k[..n1].as_ref()); + self.left = k; + self.rp = n1; + Ready(Ok(())) + } + } + Ready(Some(Err(e))) => Ready(Err(std::io::Error::new( + std::io::ErrorKind::Other, + Error::with_msg(format!("Received by HttpBodyAsAsyncRead: {:?}", e)), + ))), + Ready(None) => Ready(Ok(())), + Pending => Pending, + } + } + } +} + type T001 = Pin> + Send>>; type T002 = Pin> + Send>>; pub struct MergedFromRemotes { tcp_establish_futs: Vec, nodein: Vec>, merged: Option, + completed: bool, + errored: bool, } impl MergedFromRemotes { @@ -468,6 +557,8 @@ impl MergedFromRemotes { tcp_establish_futs, nodein: (0..n).into_iter().map(|_| None).collect(), merged: None, + completed: false, + errored: false, } } } @@ -477,46 +568,44 @@ impl Stream for MergedFromRemotes { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - trace!("MergedFromRemotes MAIN POLL"); use Poll::*; + if self.errored { + warn!("MergedFromRemotes return None after Err"); + self.completed = true; + return Ready(None); + } else if self.completed { + panic!("MergedFromRemotes ✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗ poll_next on completed"); + } 'outer: loop { break if let Some(fut) = &mut self.merged { - debug!( - "MergedFromRemotes »»»»»»»»»»»»»»»»»»»»»»»»»»»»»»»»»»»»»»»»»»»»»»»»»»»»»»»»»»»» MergedFromRemotes POLL merged" - ); match fut.poll_next_unpin(cx) { - Ready(Some(Ok(k))) => { - info!("MergedFromRemotes »»»»»»»»»»»»»» Ready Some Ok"); - Ready(Some(Ok(k))) - } + Ready(Some(Ok(k))) => Ready(Some(Ok(k))), Ready(Some(Err(e))) => { - info!("MergedFromRemotes »»»»»»»»»»»»»» Ready Some Err"); + self.errored = true; Ready(Some(Err(e))) } Ready(None) => { - info!("MergedFromRemotes »»»»»»»»»»»»»» Ready None"); + self.completed = true; Ready(None) } - Pending => { - info!("MergedFromRemotes »»»»»»»»»»»»»» Pending"); - Pending - } + Pending => Pending, } } else { - trace!("MergedFromRemotes PHASE SETUP"); let mut pend = false; let mut c1 = 0; for i1 in 0..self.tcp_establish_futs.len() { if self.nodein[i1].is_none() { let f = &mut self.tcp_establish_futs[i1]; pin_mut!(f); - info!("MergedFromRemotes tcp_establish_futs POLLING INPUT ESTAB {}", i1); match f.poll(cx) { Ready(Ok(k)) => { info!("MergedFromRemotes tcp_establish_futs ESTABLISHED INPUT {}", i1); self.nodein[i1] = Some(k); } - Ready(Err(e)) => return Ready(Some(Err(e))), + Ready(Err(e)) => { + self.errored = true; + return Ready(Some(Err(e))); + } Pending => { pend = true; } diff --git a/disk/src/raw.rs b/disk/src/raw.rs index 181fef8..61a8828 100644 --- a/disk/src/raw.rs +++ b/disk/src/raw.rs @@ -113,14 +113,16 @@ where buf: BytesMut, wp: usize, tryparse: bool, - stopped: bool, + errored: bool, + completed: bool, + inp_bytes_consumed: u64, } impl InMemoryFrameAsyncReadStream where T: AsyncRead + Unpin, { - fn new(inp: T) -> Self { + pub fn new(inp: T) -> Self { // TODO make start cap adjustable let mut buf = BytesMut::with_capacity(1024); buf.resize(buf.capacity(), 0); @@ -129,22 +131,40 @@ where buf, wp: 0, tryparse: false, - stopped: false, + errored: false, + completed: false, + inp_bytes_consumed: 0, } } - fn tryparse(&mut self) -> Option> { - info!("InMemoryFrameAsyncReadStream tryparse"); + fn tryparse(&mut self) -> Option>> { let mut buf = std::mem::replace(&mut self.buf, BytesMut::new()); if self.wp >= 4 { let len = u32::from_le_bytes(*arrayref::array_ref![buf, 0, 4]); - info!("InMemoryFrameAsyncReadStream tryparse len: {}", len); if len == 0 { - warn!("InMemoryFrameAsyncReadStream tryparse ✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗ STOP FRAME"); - assert!(self.wp == 4); + warn!("InMemoryFrameAsyncReadStream tryparse ✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗ STOP FRAME self.wp {}", self.wp); + if self.wp != 4 { + return Some(Some(Err(Error::with_msg(format!( + "InMemoryFrameAsyncReadStream tryparse unexpected amount left {}", + self.wp + ))))); + } self.buf = buf; Some(None) } else { + if len > 1024 * 32 { + warn!("InMemoryFrameAsyncReadStream big len received {}", len); + } + if len > 1024 * 1024 * 2 { + error!( + "??????????????????????????? {} ????????????????????????????????????????????", + len + ); + return Some(Some(Err(Error::with_msg(format!( + "InMemoryFrameAsyncReadStream tryparse hug buffer len {} self.inp_bytes_consumed {}", + len, self.inp_bytes_consumed + ))))); + } assert!(len > 0 && len < 1024 * 512); let nl = len as usize + 4; if buf.capacity() < nl { @@ -153,7 +173,6 @@ where // nothing to do } if self.wp >= nl { - info!("InMemoryFrameAsyncReadStream tryparse Have whole frame"); let mut buf3 = BytesMut::with_capacity(buf.capacity()); // TODO make stats of copied bytes and warn if ratio is too bad. buf3.put(buf[nl..self.wp].as_ref()); @@ -163,15 +182,14 @@ where buf.advance(4); self.wp = self.wp - nl; self.buf = buf3; - Some(Some(buf.freeze())) + self.inp_bytes_consumed += buf.len() as u64 + 4; + Some(Some(Ok(buf.freeze()))) } else { - trace!("InMemoryFrameAsyncReadStream tryparse less than length + 4 bytes"); self.buf = buf; None } } } else { - trace!("InMemoryFrameAsyncReadStream tryparse less than 4 bytes"); self.buf = buf; None } @@ -185,12 +203,14 @@ where type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - info!("InMemoryFrameAsyncReadStream MAIN POLL"); use Poll::*; - assert!(!self.stopped); + if self.errored { + self.completed = true; + return Ready(None); + } + assert!(!self.completed); 'outer: loop { if self.tryparse { - info!("InMemoryFrameAsyncReadStream TRYPARSE"); break match self.tryparse() { None => { self.tryparse = false; @@ -198,13 +218,16 @@ where } Some(None) => { self.tryparse = false; - self.stopped = true; + self.completed = true; Ready(None) } - Some(Some(k)) => Ready(Some(Ok(k))), + Some(Some(Ok(k))) => Ready(Some(Ok(k))), + Some(Some(Err(e))) => { + self.errored = true; + Ready(Some(Err(e))) + } }; } else { - info!("InMemoryFrameAsyncReadStream PREPARE BUFFER FOR READING"); let mut buf0 = std::mem::replace(&mut self.buf, BytesMut::new()); if buf0.as_mut().len() != buf0.capacity() { error!("------- {} {}", buf0.as_mut().len(), buf0.capacity()); @@ -217,17 +240,11 @@ where let r1 = buf2.remaining(); let j = &mut self.inp; pin_mut!(j); - info!( - "InMemoryFrameAsyncReadStream POLL READ remaining {}", - buf2.remaining() - ); break match AsyncRead::poll_read(j, cx, &mut buf2) { Ready(Ok(())) => { - let n1 = buf2.filled().len(); - info!("InMemoryFrameAsyncReadStream read Ok n1 {}", n1); + let _n1 = buf2.filled().len(); let r2 = buf2.remaining(); if r2 == r1 { - info!("InMemoryFrameAsyncReadStream END OF INPUT"); if self.wp != 0 { error!("InMemoryFrameAsyncReadStream self.wp != 0 {}", self.wp); } @@ -236,7 +253,6 @@ where } else { let n = buf2.filled().len(); self.wp += n; - info!("InMemoryFrameAsyncReadStream read n {} wp {}", n, self.wp); self.buf = buf0; self.tryparse = true; continue 'outer; @@ -244,7 +260,6 @@ where } Ready(Err(e)) => Ready(Some(Err(e.into()))), Pending => { - info!("InMemoryFrameAsyncReadStream Pending"); self.buf = buf0; Pending } diff --git a/err/src/lib.rs b/err/src/lib.rs index fa1b1fb..2377d1f 100644 --- a/err/src/lib.rs +++ b/err/src/lib.rs @@ -57,7 +57,7 @@ impl std::fmt::Debug for Error { } } //write!(fmt, "Error {} backtrace:\n{:?}", self.msg, self.trace) - write!(fmt, "Error {} trace{}", self.msg, String::from_utf8(buf).unwrap()) + write!(fmt, "Error {}\nTrace:{}", self.msg, String::from_utf8(buf).unwrap()) } } diff --git a/retrieval/Cargo.toml b/retrieval/Cargo.toml index 34db2ce..0974388 100644 --- a/retrieval/Cargo.toml +++ b/retrieval/Cargo.toml @@ -10,6 +10,8 @@ hyper = "0.14" http = "0.2" tracing = "0.1.25" tracing-subscriber = "0.2.17" +futures-core = "0.3.14" +futures-util = "0.3.14" bytes = "1.0.1" #async-channel = "1" #dashmap = "3" @@ -23,3 +25,4 @@ err = { path = "../err" } taskrun = { path = "../taskrun" } netpod = { path = "../netpod" } httpret = { path = "../httpret" } +disk = { path = "../disk" } diff --git a/retrieval/src/test.rs b/retrieval/src/test.rs index c3bbdf5..9ea1a50 100644 --- a/retrieval/src/test.rs +++ b/retrieval/src/test.rs @@ -1,6 +1,8 @@ use crate::spawn_test_hosts; use chrono::Utc; +use disk::agg::MinMaxAvgScalarBinBatch; use err::Error; +use futures_util::TryStreamExt; use hyper::Body; use netpod::{Cluster, Node}; use std::sync::Arc; @@ -60,9 +62,10 @@ async fn get_cached_0_inner() -> Result<(), Error> { let client = hyper::Client::new(); let res = client.request(req).await?; info!("client response {:?}", res); - let mut res_body = res.into_body(); - use hyper::body::HttpBody; - let mut ntot = 0 as u64; + //let (res_head, mut res_body) = res.into_parts(); + let s1 = disk::cache::HttpBodyAsAsyncRead::new(res); + let s2 = disk::raw::InMemoryFrameAsyncReadStream::new(s1); + /*use hyper::body::HttpBody; loop { match res_body.data().await { Some(Ok(k)) => { @@ -77,14 +80,30 @@ async fn get_cached_0_inner() -> Result<(), Error> { break; } } - } + }*/ + use futures_util::StreamExt; + use std::future::ready; + let mut bin_count = 0; + let s3 = s2 + .map_err(|e| error!("TEST GOT ERROR {:?}", e)) + .map_ok(|k| { + info!("TEST GOT ITEM: {:?}", k); + let z = MinMaxAvgScalarBinBatch::from_full_frame(&k); + info!("TEST GOT BATCH: {:?}", z); + bin_count += 1; + z + }) + .for_each(|_| ready(())); + s3.await; let t2 = chrono::Utc::now(); + let ntot = 0; let ms = t2.signed_duration_since(t1).num_milliseconds() as u64; let throughput = ntot / 1024 * 1000 / ms; info!( - "get_cached_0 DONE total download {} MB throughput {:5} kB/s", + "get_cached_0 DONE total download {} MB throughput {:5} kB/s bin_count {}", ntot / 1024 / 1024, - throughput + throughput, + bin_count, ); drop(hosts); //Err::<(), _>(format!("test error").into()) diff --git a/taskrun/src/lib.rs b/taskrun/src/lib.rs index 8ae63cb..6a1a198 100644 --- a/taskrun/src/lib.rs +++ b/taskrun/src/lib.rs @@ -14,11 +14,25 @@ pub fn run>>(f: F) -> Result .on_thread_start(|| { let _old = panic::take_hook(); panic::set_hook(Box::new(move |info| { + let payload = if let Some(k) = info.payload().downcast_ref::() { + format!("{:?}", k) + } + else if let Some(k) = info.payload().downcast_ref::() { + k.into() + } + else if let Some(&k) = info.payload().downcast_ref::<&str>() { + k.into() + } + else { + format!("unknown payload type") + }; error!( - "✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗ panicking\n{:?}\nLOCATION: {:?}\nPAYLOAD: {:?}", + "✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗ panicking\n{:?}\nLOCATION: {:?}\nPAYLOAD: {:?}\ninfo object: {:?}\nerr: {:?}", Error::with_msg("catched panic in taskrun::run"), info.location(), - info.payload() + info.payload(), + info, + payload, ); //old(info); })); @@ -42,7 +56,7 @@ pub fn tracing_init() { .with_thread_names(true) //.with_max_level(tracing::Level::INFO) .with_env_filter(tracing_subscriber::EnvFilter::new( - "info,retrieval=trace,disk=trace,tokio_postgres=info", + "info,retrieval=trace,retrieval::test=trace,disk=trace,tokio_postgres=info", )) .init(); }