diff --git a/disk/src/cache.rs b/disk/src/cache.rs index 09e3ba9..2f69c94 100644 --- a/disk/src/cache.rs +++ b/disk/src/cache.rs @@ -1,11 +1,11 @@ -use crate::agg::{MinMaxAvgScalarBinBatch, MinMaxAvgScalarEventBatch}; +use crate::agg::{MinMaxAvgScalarBinBatch, MinMaxAvgScalarBinSingle, MinMaxAvgScalarEventBatch}; use crate::merge::MergedMinMaxAvgScalarStream; use crate::raw::EventsQuery; use bytes::{BufMut, Bytes, BytesMut}; use chrono::{DateTime, Utc}; use err::Error; use futures_core::Stream; -use futures_util::{pin_mut, FutureExt, StreamExt}; +use futures_util::{pin_mut, FutureExt, StreamExt, TryStreamExt}; use netpod::{ AggKind, Channel, Cluster, NanoRange, NodeConfig, PreBinnedPatchCoord, PreBinnedPatchIterator, PreBinnedPatchRange, ToNanos, @@ -286,10 +286,9 @@ impl PreBinnedValueStream { agg_kind: self.agg_kind.clone(), }; let evq = Arc::new(evq); - self.fut2 = Some(Box::pin(PreBinnedAssembledFromRemotes::new( - evq, - self.node_config.cluster.clone(), - ))); + let s1 = MergedFromRemotes::new(evq, self.node_config.cluster.clone()); + let s2 = s1.map_ok(|k| MinMaxAvgScalarBinBatch::empty()); + self.fut2 = Some(Box::pin(s2)); } } } @@ -425,13 +424,13 @@ impl Stream for PreBinnedValueFetchedStream { type T001 = Pin> + Send>>; type T002 = Pin> + Send>>; -pub struct PreBinnedAssembledFromRemotes { +pub struct MergedFromRemotes { tcp_establish_futs: Vec, nodein: Vec>, merged: Option, } -impl PreBinnedAssembledFromRemotes { +impl MergedFromRemotes { pub fn new(evq: Arc, cluster: Arc) -> Self { let mut tcp_establish_futs = vec![]; for node in &cluster.nodes { @@ -448,23 +447,21 @@ impl PreBinnedAssembledFromRemotes { } } -impl Stream for PreBinnedAssembledFromRemotes { +impl Stream for MergedFromRemotes { // TODO need this generic for scalar and array (when wave is not binned down to a single scalar point) - type Item = Result; + type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - info!("PreBinnedAssembledFromRemotes MAIN POLL"); + trace!("PreBinnedAssembledFromRemotes MAIN POLL"); use Poll::*; // TODO this has several stages: // First, establish async all connections. // Then assemble the merge-and-processing-pipeline and pull from there. 'outer: loop { break if let Some(fut) = &mut self.merged { + debug!("MergedFromRemotes POLL merged"); match fut.poll_next_unpin(cx) { - Ready(Some(Ok(_k))) => { - let h = MinMaxAvgScalarBinBatch::empty(); - Ready(Some(Ok(h))) - } + Ready(Some(Ok(k))) => Ready(Some(Ok(k))), Ready(Some(Err(e))) => Ready(Some(Err(e))), Ready(None) => Ready(None), Pending => Pending, @@ -498,12 +495,16 @@ impl Stream for PreBinnedAssembledFromRemotes { debug!("SETTING UP MERGED STREAM"); // TODO set up the merged stream let inps = self.nodein.iter_mut().map(|k| k.take().unwrap()).collect(); - let s = MergedMinMaxAvgScalarStream::new(inps); - self.merged = Some(Box::pin(s)); - continue 'outer; + let s1 = MergedMinMaxAvgScalarStream::new(inps); + self.merged = Some(Box::pin(s1)); } else { - Pending + error!( + "NOTHING PENDING TODO WHAT TO DO? {} {}", + c1, + self.tcp_establish_futs.len() + ); } + continue 'outer; } }; } diff --git a/disk/src/raw.rs b/disk/src/raw.rs index d82d866..5c0bf62 100644 --- a/disk/src/raw.rs +++ b/disk/src/raw.rs @@ -42,9 +42,17 @@ pub async fn x_processed_stream_from_node( debug!("x_processed_stream_from_node qjs len {}", qjs.len()); net.write_u32_le(qjs.len() as u32).await?; net.write_all(&qjs).await?; - debug!("x_processed_stream_from_node WRITTEN"); net.flush().await?; - let s2 = MinMaxAvgScalarEventBatchStreamFromTcp { inp: net }; + let (mut netin, mut netout) = net.into_split(); + netout.forget(); + + // TODO Can not signal some EOS over TCP. + + debug!("x_processed_stream_from_node WRITTEN"); + + // TODO use the splitted streams: + let s2 = MinMaxAvgScalarEventBatchStreamFromTcp { inp: netin }; + debug!("x_processed_stream_from_node HAVE STREAM INSTANCE"); let s3: Pin> + Send>> = Box::pin(s2); debug!("x_processed_stream_from_node RETURN"); @@ -218,8 +226,9 @@ async fn raw_conn_handler(stream: TcpStream, addr: SocketAddr) -> Result<(), Err while let Some(k) = h.next().await { warn!("raw_conn_handler FRAME RECV {}", k.is_ok()); } - netout.write_i32_le(123).await?; - netout.flush().await?; + warn!("raw_conn_handler INPUT STREAM END"); + //netout.write_i32_le(123).await?; + //netout.flush().await?; netout.forget(); Ok(()) } diff --git a/err/src/lib.rs b/err/src/lib.rs index a59a078..fa1b1fb 100644 --- a/err/src/lib.rs +++ b/err/src/lib.rs @@ -51,7 +51,7 @@ impl std::fmt::Debug for Error { Some(k) => k, _ => 0, }; - if true || is_ours { + if is_ours { write!(&mut buf, "\n {}\n {} {}", name, filename, lineno).unwrap(); } }