This commit is contained in:
Dominik Werder
2021-04-21 11:22:23 +02:00
parent f8921faf63
commit 3abf4260d1
3 changed files with 34 additions and 24 deletions

View File

@@ -1,11 +1,11 @@
use crate::agg::{MinMaxAvgScalarBinBatch, MinMaxAvgScalarEventBatch};
use crate::agg::{MinMaxAvgScalarBinBatch, MinMaxAvgScalarBinSingle, MinMaxAvgScalarEventBatch};
use crate::merge::MergedMinMaxAvgScalarStream;
use crate::raw::EventsQuery;
use bytes::{BufMut, Bytes, BytesMut};
use chrono::{DateTime, Utc};
use err::Error;
use futures_core::Stream;
use futures_util::{pin_mut, FutureExt, StreamExt};
use futures_util::{pin_mut, FutureExt, StreamExt, TryStreamExt};
use netpod::{
AggKind, Channel, Cluster, NanoRange, NodeConfig, PreBinnedPatchCoord, PreBinnedPatchIterator, PreBinnedPatchRange,
ToNanos,
@@ -286,10 +286,9 @@ impl PreBinnedValueStream {
agg_kind: self.agg_kind.clone(),
};
let evq = Arc::new(evq);
self.fut2 = Some(Box::pin(PreBinnedAssembledFromRemotes::new(
evq,
self.node_config.cluster.clone(),
)));
let s1 = MergedFromRemotes::new(evq, self.node_config.cluster.clone());
let s2 = s1.map_ok(|k| MinMaxAvgScalarBinBatch::empty());
self.fut2 = Some(Box::pin(s2));
}
}
}
@@ -425,13 +424,13 @@ impl Stream for PreBinnedValueFetchedStream {
type T001 = Pin<Box<dyn Stream<Item = Result<MinMaxAvgScalarEventBatch, Error>> + Send>>;
type T002 = Pin<Box<dyn Future<Output = Result<T001, Error>> + Send>>;
pub struct PreBinnedAssembledFromRemotes {
pub struct MergedFromRemotes {
tcp_establish_futs: Vec<T002>,
nodein: Vec<Option<T001>>,
merged: Option<T001>,
}
impl PreBinnedAssembledFromRemotes {
impl MergedFromRemotes {
pub fn new(evq: Arc<EventsQuery>, cluster: Arc<Cluster>) -> Self {
let mut tcp_establish_futs = vec![];
for node in &cluster.nodes {
@@ -448,23 +447,21 @@ impl PreBinnedAssembledFromRemotes {
}
}
impl Stream for PreBinnedAssembledFromRemotes {
impl Stream for MergedFromRemotes {
// TODO need this generic for scalar and array (when wave is not binned down to a single scalar point)
type Item = Result<MinMaxAvgScalarBinBatch, Error>;
type Item = Result<MinMaxAvgScalarEventBatch, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
info!("PreBinnedAssembledFromRemotes MAIN POLL");
trace!("PreBinnedAssembledFromRemotes MAIN POLL");
use Poll::*;
// TODO this has several stages:
// First, establish async all connections.
// Then assemble the merge-and-processing-pipeline and pull from there.
'outer: loop {
break if let Some(fut) = &mut self.merged {
debug!("MergedFromRemotes POLL merged");
match fut.poll_next_unpin(cx) {
Ready(Some(Ok(_k))) => {
let h = MinMaxAvgScalarBinBatch::empty();
Ready(Some(Ok(h)))
}
Ready(Some(Ok(k))) => Ready(Some(Ok(k))),
Ready(Some(Err(e))) => Ready(Some(Err(e))),
Ready(None) => Ready(None),
Pending => Pending,
@@ -498,12 +495,16 @@ impl Stream for PreBinnedAssembledFromRemotes {
debug!("SETTING UP MERGED STREAM");
// TODO set up the merged stream
let inps = self.nodein.iter_mut().map(|k| k.take().unwrap()).collect();
let s = MergedMinMaxAvgScalarStream::new(inps);
self.merged = Some(Box::pin(s));
continue 'outer;
let s1 = MergedMinMaxAvgScalarStream::new(inps);
self.merged = Some(Box::pin(s1));
} else {
Pending
error!(
"NOTHING PENDING TODO WHAT TO DO? {} {}",
c1,
self.tcp_establish_futs.len()
);
}
continue 'outer;
}
};
}

View File

@@ -42,9 +42,17 @@ pub async fn x_processed_stream_from_node(
debug!("x_processed_stream_from_node qjs len {}", qjs.len());
net.write_u32_le(qjs.len() as u32).await?;
net.write_all(&qjs).await?;
debug!("x_processed_stream_from_node WRITTEN");
net.flush().await?;
let s2 = MinMaxAvgScalarEventBatchStreamFromTcp { inp: net };
let (mut netin, mut netout) = net.into_split();
netout.forget();
// TODO Can not signal some EOS over TCP.
debug!("x_processed_stream_from_node WRITTEN");
// TODO use the splitted streams:
let s2 = MinMaxAvgScalarEventBatchStreamFromTcp { inp: netin };
debug!("x_processed_stream_from_node HAVE STREAM INSTANCE");
let s3: Pin<Box<dyn Stream<Item = Result<_, Error>> + Send>> = Box::pin(s2);
debug!("x_processed_stream_from_node RETURN");
@@ -218,8 +226,9 @@ async fn raw_conn_handler(stream: TcpStream, addr: SocketAddr) -> Result<(), Err
while let Some(k) = h.next().await {
warn!("raw_conn_handler FRAME RECV {}", k.is_ok());
}
netout.write_i32_le(123).await?;
netout.flush().await?;
warn!("raw_conn_handler INPUT STREAM END");
//netout.write_i32_le(123).await?;
//netout.flush().await?;
netout.forget();
Ok(())
}

View File

@@ -51,7 +51,7 @@ impl std::fmt::Debug for Error {
Some(k) => k,
_ => 0,
};
if true || is_ours {
if is_ours {
write!(&mut buf, "\n {}\n {} {}", name, filename, lineno).unwrap();
}
}