From bd87323bf14a9e167c08e85d43dc7699e91fc7b6 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Thu, 22 Apr 2021 11:56:28 +0200 Subject: [PATCH] WIP --- disk/Cargo.toml | 1 + disk/src/agg.rs | 15 +++- disk/src/cache.rs | 153 +++++++++++++++++++--------------- disk/src/merge.rs | 24 ++++-- disk/src/raw.rs | 200 ++++++++++++++++++++++++++++----------------- httpret/src/lib.rs | 6 +- 6 files changed, 251 insertions(+), 148 deletions(-) diff --git a/disk/Cargo.toml b/disk/Cargo.toml index a3f22f4..4c1eac8 100644 --- a/disk/Cargo.toml +++ b/disk/Cargo.toml @@ -19,6 +19,7 @@ futures-core = "0.3.14" futures-util = "0.3.14" async-stream = "0.3.0" tracing = "0.1.25" +#tracing-futures = "0.2.5" fs2 = "0.4.3" libc = "0.2.93" hex = "0.4.3" diff --git a/disk/src/agg.rs b/disk/src/agg.rs index 3bdcdc5..5328b39 100644 --- a/disk/src/agg.rs +++ b/disk/src/agg.rs @@ -433,6 +433,17 @@ impl MinMaxAvgScalarBinBatch { avgs: vec![], } } + pub fn len(&self) -> usize { + self.ts1s.len() + } + pub fn push_single(&mut self, g: &MinMaxAvgScalarBinSingle) { + self.ts1s.push(g.ts1); + self.ts2s.push(g.ts2); + self.counts.push(g.count); + self.mins.push(g.min); + self.maxs.push(g.max); + self.counts.push(g.count); + } } impl std::fmt::Debug for MinMaxAvgScalarBinBatch { @@ -576,9 +587,7 @@ where // do the conversion // TODO only a scalar! - if true { - todo!(); - } + err::todoval::(); let n1 = decomp.len(); assert!(n1 % ty.bytes() as usize == 0); diff --git a/disk/src/cache.rs b/disk/src/cache.rs index fddf2f1..f169d8e 100644 --- a/disk/src/cache.rs +++ b/disk/src/cache.rs @@ -1,4 +1,4 @@ -use crate::agg::{MinMaxAvgScalarBinBatch, MinMaxAvgScalarEventBatch}; +use crate::agg::{IntoBinnedT, MinMaxAvgScalarBinBatch, MinMaxAvgScalarEventBatch}; use crate::merge::MergedMinMaxAvgScalarStream; use crate::raw::EventsQuery; use bytes::{BufMut, Bytes, BytesMut}; @@ -7,8 +7,8 @@ use err::Error; use futures_core::Stream; use futures_util::{pin_mut, FutureExt, StreamExt, TryStreamExt}; use netpod::{ - AggKind, Channel, Cluster, NanoRange, NodeConfig, PreBinnedPatchCoord, PreBinnedPatchIterator, PreBinnedPatchRange, - ToNanos, + AggKind, BinSpecDimT, Channel, Cluster, NanoRange, NodeConfig, PreBinnedPatchCoord, PreBinnedPatchIterator, + PreBinnedPatchRange, ToNanos, }; use serde::{Deserialize, Serialize}; use std::future::Future; @@ -97,12 +97,7 @@ pub async fn binned_bytes_for_http( agg_kind, node_config.clone(), ); - // Iterate over the patches. - // Request the patch from each node. - // Merge. - // Agg+Bin. - // Deliver. - let ret = BinnedBytesForHttpStream { inp: s1 }; + let ret = BinnedBytesForHttpStream::new(s1); Ok(ret) } None => { @@ -117,7 +112,11 @@ pub struct BinnedBytesForHttpStream { inp: BinnedStream, } -impl BinnedBytesForHttpStream {} +impl BinnedBytesForHttpStream { + pub fn new(inp: BinnedStream) -> Self { + Self { inp } + } +} impl Stream for BinnedBytesForHttpStream { type Item = Result; @@ -269,7 +268,7 @@ impl PreBinnedValueStream { }) .flatten() .map(move |k| { - info!("ITEM from sub res bin_size {} {:?}", bin_size, k); + error!("NOTE NOTE NOTE try_setup_fetch_prebinned_higher_res ITEM from sub res bin_size {} {:?}", bin_size, k); k }); self.fut2 = Some(Box::pin(s)); @@ -284,12 +283,41 @@ impl PreBinnedValueStream { }, agg_kind: self.agg_kind.clone(), }; + assert!(self.patch_coord.patch_t_len() % self.patch_coord.bin_t_len() == 0); + let count = self.patch_coord.patch_t_len() / self.patch_coord.bin_t_len(); + let spec = BinSpecDimT { + bs: self.patch_coord.bin_t_len(), + ts1: self.patch_coord.patch_beg(), + ts2: self.patch_coord.patch_end(), + count, + }; let evq = Arc::new(evq); + error!("try_setup_fetch_prebinned_higher_res apply all requested transformations and T-binning"); let s1 = MergedFromRemotes::new(evq, self.node_config.cluster.clone()); - let s2 = s1.map_ok(|_k| { - error!("try_setup_fetch_prebinned_higher_res TODO emit actual value"); - MinMaxAvgScalarBinBatch::empty() - }); + let s2 = s1 + .map(|k| { + trace!("MergedFromRemotes emitted some item"); + k + }) + .into_binned_t(spec) + .map_ok({ + let mut a = MinMaxAvgScalarBinBatch::empty(); + move |k| { + error!("try_setup_fetch_prebinned_higher_res TODO emit actual value"); + a.push_single(&k); + if a.len() > 0 { + let z = std::mem::replace(&mut a, MinMaxAvgScalarBinBatch::empty()); + Some(z) + } else { + None + } + } + }) + .filter(|k| { + use std::future::ready; + ready(k.is_ok() && k.as_ref().unwrap().is_some()) + }) + .map_ok(Option::unwrap); self.fut2 = Some(Box::pin(s2)); } } @@ -304,6 +332,7 @@ impl Stream for PreBinnedValueStream { use Poll::*; 'outer: loop { break if let Some(fut) = self.fut2.as_mut() { + info!("PreBinnedValueStream --------------------------------------------------------- fut2 poll"); fut.poll_next_unpin(cx) } else if let Some(fut) = self.open_check_local_file.as_mut() { match fut.poll_unpin(cx) { @@ -350,6 +379,7 @@ impl PreBinnedValueFetchedStream { let nodeix = node_ix_for_patch(&patch_coord, &channel, &node_config.cluster); let node = &node_config.cluster.nodes[nodeix as usize]; warn!("TODO defining property of a PreBinnedPatchCoord? patchlen + ix? binsize + patchix? binsize + patchsize + patchix?"); + // TODO encapsulate uri creation, how to express aggregation kind? let uri: hyper::Uri = format!( "http://{}:{}/api/1/prebinned?{}&channel_backend={}&channel_name={}&agg_kind={:?}", node.host, @@ -375,50 +405,43 @@ impl Stream for PreBinnedValueFetchedStream { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; - // TODO when requested next, create the next http request, connect, check headers - // and as soon as ready, wrap the body in the appropriate parser and return the stream. - // The wire protocol is not yet defined. 'outer: loop { - break match self.res.as_mut() { - Some(res) => { - pin_mut!(res); - use hyper::body::HttpBody; - match res.poll_data(cx) { - Ready(Some(Ok(_k))) => { - error!("TODO PreBinnedValueFetchedStream received value, now do something"); - Pending - } - Ready(Some(Err(e))) => Ready(Some(Err(e.into()))), - Ready(None) => Ready(None), - Pending => Pending, + break if let Some(res) = self.res.as_mut() { + pin_mut!(res); + use hyper::body::HttpBody; + match res.poll_data(cx) { + Ready(Some(Ok(_))) => { + error!("TODO PreBinnedValueFetchedStream received value, now do something"); + Pending } + Ready(Some(Err(e))) => Ready(Some(Err(e.into()))), + Ready(None) => Ready(None), + Pending => Pending, } - None => match self.resfut.as_mut() { - Some(resfut) => match resfut.poll_unpin(cx) { - Ready(res) => match res { - Ok(res) => { - info!("GOT result from SUB REQUEST: {:?}", res); - self.res = Some(res); - continue 'outer; - } - Err(e) => { - error!("PreBinnedValueStream error in stream {:?}", e); - Ready(Some(Err(e.into()))) - } - }, - Pending => Pending, + } else if let Some(resfut) = self.resfut.as_mut() { + match resfut.poll_unpin(cx) { + Ready(res) => match res { + Ok(res) => { + info!("GOT result from SUB REQUEST: {:?}", res); + self.res = Some(res); + continue 'outer; + } + Err(e) => { + error!("PreBinnedValueStream error in stream {:?}", e); + Ready(Some(Err(e.into()))) + } }, - None => { - let req = hyper::Request::builder() - .method(http::Method::GET) - .uri(&self.uri) - .body(hyper::Body::empty())?; - let client = hyper::Client::new(); - info!("START REQUEST FOR {:?}", req); - self.resfut = Some(client.request(req)); - continue 'outer; - } - }, + Pending => Pending, + } + } else { + let req = hyper::Request::builder() + .method(http::Method::GET) + .uri(&self.uri) + .body(hyper::Body::empty())?; + let client = hyper::Client::new(); + info!("START REQUEST FOR {:?}", req); + self.resfut = Some(client.request(req)); + continue 'outer; }; } } @@ -456,28 +479,31 @@ impl Stream for MergedFromRemotes { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { trace!("MergedFromRemotes MAIN POLL"); use Poll::*; - // TODO this has several stages: - // First, establish async all connections. - // Then assemble the merge-and-processing-pipeline and pull from there. 'outer: loop { break if let Some(fut) = &mut self.merged { - debug!("MergedFromRemotes POLL merged"); + debug!( + "MergedFromRemotes »»»»»»»»»»»»»»»»»»»»»»»»»»»»»»»»»»»»»»»»»»»»»»»»»»»»»»»»»»»» MergedFromRemotes POLL merged" + ); match fut.poll_next_unpin(cx) { Ready(Some(Ok(k))) => { - info!("MergedFromRemotes Ready Some Ok"); + info!("MergedFromRemotes »»»»»»»»»»»»»» Ready Some Ok"); Ready(Some(Ok(k))) } Ready(Some(Err(e))) => { - info!("MergedFromRemotes Ready Some Err"); + info!("MergedFromRemotes »»»»»»»»»»»»»» Ready Some Err"); Ready(Some(Err(e))) } Ready(None) => { - info!("MergedFromRemotes Ready None"); + info!("MergedFromRemotes »»»»»»»»»»»»»» Ready None"); Ready(None) } - Pending => Pending, + Pending => { + info!("MergedFromRemotes »»»»»»»»»»»»»» Pending"); + Pending + } } } else { + trace!("MergedFromRemotes PHASE SETUP"); let mut pend = false; let mut c1 = 0; for i1 in 0..self.tcp_establish_futs.len() { @@ -504,7 +530,6 @@ impl Stream for MergedFromRemotes { } else { if c1 == self.tcp_establish_futs.len() { debug!("MergedFromRemotes SETTING UP MERGED STREAM"); - // TODO set up the merged stream let inps = self.nodein.iter_mut().map(|k| k.take().unwrap()).collect(); let s1 = MergedMinMaxAvgScalarStream::new(inps); self.merged = Some(Box::pin(s1)); diff --git a/disk/src/merge.rs b/disk/src/merge.rs index ba7378c..d3b0074 100644 --- a/disk/src/merge.rs +++ b/disk/src/merge.rs @@ -61,8 +61,10 @@ where self.current[i1] = CurVal::Val(k); } Ready(Some(Err(e))) => { - // TODO emit this error, consider this stream as done, anything more to do here? - //self.current[i1] = CurVal::Err(e); + self.current[i1] = CurVal::Err(Error::with_msg(format!( + "MergeDim1F32Stream error from upstream {:?}", + e + ))); return Ready(Some(Err(e))); } Ready(None) => { @@ -126,7 +128,6 @@ where enum CurVal { None, Finish, - #[allow(dead_code)] Err(Error), Val(ValuesDim1), } @@ -156,7 +157,7 @@ where let n = inps.len(); let current = (0..n) .into_iter() - .map(|_k| MergedMinMaxAvgScalarStreamCurVal::None) + .map(|_| MergedMinMaxAvgScalarStreamCurVal::None) .collect(); Self { inps, @@ -178,7 +179,9 @@ where use Poll::*; 'outer: loop { if self.emitted_complete { - panic!("poll on complete stream"); + break Ready(Some(Err(Error::with_msg( + "MergedMinMaxAvgScalarStream poll on complete stream", + )))); } // can only run logic if all streams are either finished, errored or have some current value. for i1 in 0..self.inps.len() { @@ -228,8 +231,15 @@ where } } if lowest_ix == usize::MAX { - // TODO all inputs in finished state - break Ready(None); + if self.batch.tss.len() != 0 { + let k = std::mem::replace(&mut self.batch, MinMaxAvgScalarEventBatch::empty()); + info!("```````````````` MergedMinMaxAvgScalarStream emit Ready(Some( current batch ))"); + break Ready(Some(Ok(k))); + } else { + self.emitted_complete = true; + info!("```````````````` MergedMinMaxAvgScalarStream emit Ready(None)"); + break Ready(None); + } } else { info!("decided on next lowest ts {} ix {}", lowest_ts, lowest_ix); self.batch.tss.push(lowest_ts); diff --git a/disk/src/raw.rs b/disk/src/raw.rs index c29c239..181fef8 100644 --- a/disk/src/raw.rs +++ b/disk/src/raw.rs @@ -17,10 +17,10 @@ use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; use tokio::io::{AsyncRead, AsyncWriteExt, ReadBuf}; -use tokio::net::tcp::OwnedReadHalf; use tokio::net::TcpStream; +use tracing::Instrument; #[allow(unused_imports)] -use tracing::{debug, error, info, trace, warn}; +use tracing::{debug, error, info, span, trace, warn, Level}; /** Query parameters to request (optionally) X-processed, but not T-processed events. @@ -37,15 +37,15 @@ pub async fn x_processed_stream_from_node( node: Arc, ) -> Result> + Send>>, Error> { debug!("x_processed_stream_from_node ENTER"); - let mut net = TcpStream::connect(format!("{}:{}", node.host, node.port_raw)).await?; + let net = TcpStream::connect(format!("{}:{}", node.host, node.port_raw)).await?; debug!("x_processed_stream_from_node CONNECTED"); let qjs = serde_json::to_vec(query.as_ref())?; debug!("x_processed_stream_from_node qjs len {}", qjs.len()); - net.write_u32_le(qjs.len() as u32).await?; - net.write_all(&qjs).await?; - net.write_u32_le(0).await?; - net.flush().await?; - let (netin, netout) = net.into_split(); + let (netin, mut netout) = net.into_split(); + netout.write_u32_le(qjs.len() as u32).await?; + netout.write_all(&qjs).await?; + netout.write_u32_le(0).await?; + netout.flush().await?; netout.forget(); debug!("x_processed_stream_from_node WRITTEN"); let frames = InMemoryFrameAsyncReadStream::new(netin); @@ -112,6 +112,8 @@ where inp: T, buf: BytesMut, wp: usize, + tryparse: bool, + stopped: bool, } impl InMemoryFrameAsyncReadStream @@ -122,7 +124,57 @@ where // TODO make start cap adjustable let mut buf = BytesMut::with_capacity(1024); buf.resize(buf.capacity(), 0); - Self { inp, buf, wp: 0 } + Self { + inp, + buf, + wp: 0, + tryparse: false, + stopped: false, + } + } + + fn tryparse(&mut self) -> Option> { + info!("InMemoryFrameAsyncReadStream tryparse"); + let mut buf = std::mem::replace(&mut self.buf, BytesMut::new()); + if self.wp >= 4 { + let len = u32::from_le_bytes(*arrayref::array_ref![buf, 0, 4]); + info!("InMemoryFrameAsyncReadStream tryparse len: {}", len); + if len == 0 { + warn!("InMemoryFrameAsyncReadStream tryparse ✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗ STOP FRAME"); + assert!(self.wp == 4); + self.buf = buf; + Some(None) + } else { + assert!(len > 0 && len < 1024 * 512); + let nl = len as usize + 4; + if buf.capacity() < nl { + buf.resize(nl, 0); + } else { + // nothing to do + } + if self.wp >= nl { + info!("InMemoryFrameAsyncReadStream tryparse Have whole frame"); + let mut buf3 = BytesMut::with_capacity(buf.capacity()); + // TODO make stats of copied bytes and warn if ratio is too bad. + buf3.put(buf[nl..self.wp].as_ref()); + buf3.resize(buf3.capacity(), 0); + use bytes::Buf; + buf.truncate(nl); + buf.advance(4); + self.wp = self.wp - nl; + self.buf = buf3; + Some(Some(buf.freeze())) + } else { + trace!("InMemoryFrameAsyncReadStream tryparse less than length + 4 bytes"); + self.buf = buf; + None + } + } + } else { + trace!("InMemoryFrameAsyncReadStream tryparse less than 4 bytes"); + self.buf = buf; + None + } } } @@ -133,77 +185,71 @@ where type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + info!("InMemoryFrameAsyncReadStream MAIN POLL"); use Poll::*; + assert!(!self.stopped); 'outer: loop { - info!("InMemoryFrameAsyncReadStream PREPARE BUFFER FOR READING"); - let mut buf0 = std::mem::replace(&mut self.buf, BytesMut::new()); - if buf0.as_mut().len() != buf0.capacity() { - error!("------- {} {}", buf0.as_mut().len(), buf0.capacity()); - panic!(); - } - let mut buf2 = ReadBuf::new(buf0.as_mut()[self.wp..].as_mut()); - assert!(buf2.filled().len() == 0); - assert!(buf2.capacity() > 0); - assert!(buf2.remaining() > 0); - let r1 = buf2.remaining(); - let j = &mut self.inp; - pin_mut!(j); - break match AsyncRead::poll_read(j, cx, &mut buf2) { - Ready(Ok(())) => { - let n1 = buf2.filled().len(); - info!("InMemoryFrameAsyncReadStream read Ok n1 {}", n1); - let r2 = buf2.remaining(); - if r2 == r1 { - info!("InMemoryFrameAsyncReadStream END OF INPUT"); - if self.wp != 0 { - error!("self.wp != 0 {}", self.wp); - } - assert!(self.wp == 0); + if self.tryparse { + info!("InMemoryFrameAsyncReadStream TRYPARSE"); + break match self.tryparse() { + None => { + self.tryparse = false; + continue 'outer; + } + Some(None) => { + self.tryparse = false; + self.stopped = true; Ready(None) - } else { - let n = buf2.filled().len(); - self.wp += n; - info!("InMemoryFrameAsyncReadStream read n {} wp {}", n, self.wp); - if self.wp >= 4 { - let len = u32::from_le_bytes(*arrayref::array_ref![buf0.as_mut(), 0, 4]); - info!("InMemoryFrameAsyncReadStream len: {}", len); - assert!(len > 0 && len < 1024 * 512); - let nl = len as usize + 4; - if buf0.capacity() < nl { - buf0.resize(nl, 0); - } else { - // nothing to do + } + Some(Some(k)) => Ready(Some(Ok(k))), + }; + } else { + info!("InMemoryFrameAsyncReadStream PREPARE BUFFER FOR READING"); + let mut buf0 = std::mem::replace(&mut self.buf, BytesMut::new()); + if buf0.as_mut().len() != buf0.capacity() { + error!("------- {} {}", buf0.as_mut().len(), buf0.capacity()); + panic!(); + } + let mut buf2 = ReadBuf::new(buf0.as_mut()[self.wp..].as_mut()); + assert!(buf2.filled().len() == 0); + assert!(buf2.capacity() > 0); + assert!(buf2.remaining() > 0); + let r1 = buf2.remaining(); + let j = &mut self.inp; + pin_mut!(j); + info!( + "InMemoryFrameAsyncReadStream POLL READ remaining {}", + buf2.remaining() + ); + break match AsyncRead::poll_read(j, cx, &mut buf2) { + Ready(Ok(())) => { + let n1 = buf2.filled().len(); + info!("InMemoryFrameAsyncReadStream read Ok n1 {}", n1); + let r2 = buf2.remaining(); + if r2 == r1 { + info!("InMemoryFrameAsyncReadStream END OF INPUT"); + if self.wp != 0 { + error!("InMemoryFrameAsyncReadStream self.wp != 0 {}", self.wp); } - if self.wp >= nl { - info!("InMemoryFrameAsyncReadStream Have whole frame"); - let mut buf3 = BytesMut::with_capacity(buf0.capacity()); - // TODO make stats of copied bytes and warn if ratio is too bad. - buf3.put(buf0.as_ref()[nl..self.wp].as_ref()); - buf3.resize(buf3.capacity(), 0); - self.wp = self.wp - nl; - self.buf = buf3; - use bytes::Buf; - buf0.truncate(nl); - buf0.advance(4); - Ready(Some(Ok(buf0.freeze()))) - } else { - self.buf = buf0; - continue 'outer; - } - } else { - info!("InMemoryFrameAsyncReadStream not yet enough for len"); self.buf = buf0; + Ready(None) + } else { + let n = buf2.filled().len(); + self.wp += n; + info!("InMemoryFrameAsyncReadStream read n {} wp {}", n, self.wp); + self.buf = buf0; + self.tryparse = true; continue 'outer; } } - } - Ready(Err(e)) => Ready(Some(Err(e.into()))), - Pending => { - info!("InMemoryFrameAsyncReadStream Pending"); - self.buf = buf0; - Pending - } - }; + Ready(Err(e)) => Ready(Some(Err(e.into()))), + Pending => { + info!("InMemoryFrameAsyncReadStream Pending"); + self.buf = buf0; + Pending + } + }; + } } } } @@ -240,10 +286,17 @@ pub async fn raw_service(node_config: Arc) -> Result<(), Error> { } async fn raw_conn_handler(stream: TcpStream, addr: SocketAddr) -> Result<(), Error> { + //use tracing_futures::Instrument; + let span1 = span!(Level::INFO, "raw::raw_conn_handler"); + raw_conn_handler_inner(stream, addr).instrument(span1).await +} + +async fn raw_conn_handler_inner(stream: TcpStream, addr: SocketAddr) -> Result<(), Error> { info!("raw_conn_handler SPAWNED for {:?}", addr); let (netin, mut netout) = stream.into_split(); let mut h = InMemoryFrameAsyncReadStream::new(netin); - while let Some(k) = h.next().await { + let inp_read_span = span!(Level::INFO, "raw_conn_handler INPUT STREAM READ"); + while let Some(k) = h.next().instrument(inp_read_span).await { warn!("raw_conn_handler FRAME RECV {}", k.is_ok()); break; } @@ -266,6 +319,7 @@ async fn raw_conn_handler(stream: TcpStream, addr: SocketAddr) -> Result<(), Err netout.write_u32_le(fr.len() as u32).await?; netout.write(fr.as_ref()).await?; } + netout.write_u32_le(0).await?; netout.flush().await?; netout.forget(); warn!("raw_conn_handler DONE"); diff --git a/httpret/src/lib.rs b/httpret/src/lib.rs index ba5edfc..b8ca83c 100644 --- a/httpret/src/lib.rs +++ b/httpret/src/lib.rs @@ -242,7 +242,11 @@ async fn prebinned(req: Request, node_config: Arc) -> Result response(StatusCode::OK).body(BodyStream::wrapped( s, - format!("prebinned-bin-{}-path-{}", q.patch.bin_t_len(), q.patch.patch_beg()), + format!( + "pre-b-{}-p-{}", + q.patch.bin_t_len() / 1000000000, + q.patch.patch_beg() / 1000000000, + ), ))?, Err(e) => { error!("{:?}", e);