This commit is contained in:
Dominik Werder
2021-04-21 16:40:17 +02:00
parent 3abf4260d1
commit 179feeb2ae
3 changed files with 48 additions and 15 deletions

View File

@@ -2,12 +2,15 @@
Aggregation and binning support.
*/
use crate::raw::Frameable;
use crate::EventFull;
use bytes::{BufMut, Bytes, BytesMut};
use err::Error;
use futures_core::Stream;
use futures_util::StreamExt;
use netpod::BinSpecDimT;
use netpod::{Node, ScalarType};
use std::mem::size_of;
use std::pin::Pin;
use std::task::{Context, Poll};
#[allow(unused_imports)]
@@ -237,6 +240,24 @@ impl AggregatableTdim for MinMaxAvgScalarEventBatch {
}
}
impl Frameable for MinMaxAvgScalarEventBatch {
    /// Serialize the batch as a little-endian binary frame payload:
    /// a `u32` element count `n`, followed by the `tss` column (`n` × u64)
    /// and the `mins`, `maxs`, `avgs` columns (`n` × f32 each).
    ///
    /// An empty batch serializes to a valid frame with count 0 (the old
    /// implementation asserted non-empty only because it took `&self.tss[0]`).
    fn serialized(&self) -> Bytes {
        let n1 = self.tss.len();
        // The four columns are parallel arrays; a shorter mins/maxs/avgs
        // would previously have caused the raw-pointer slices to read
        // out of bounds, so enforce the invariant explicitly.
        assert!(
            self.mins.len() == n1 && self.maxs.len() == n1 && self.avgs.len() == n1,
            "column length mismatch in MinMaxAvgScalarEventBatch"
        );
        let mut g = BytesMut::with_capacity(4 + n1 * (8 + 3 * 4));
        g.put_u32_le(n1 as u32);
        // Write each value explicitly in little-endian instead of a raw
        // native-endian memcpy of the Vec storage: the unsafe path emitted
        // native-endian payload behind a little-endian length prefix, which
        // is wrong on big-endian targets. On little-endian platforms the
        // produced bytes are identical to before.
        for &ts in &self.tss {
            g.put_u64_le(ts);
        }
        for &v in &self.mins {
            g.put_f32_le(v);
        }
        for &v in &self.maxs {
            g.put_f32_le(v);
        }
        for &v in &self.avgs {
            g.put_f32_le(v);
        }
        g.freeze()
    }
}
pub struct MinMaxAvgScalarEventBatchAggregator {
ts1: u64,
ts2: u64,

View File

@@ -1,4 +1,4 @@
use crate::agg::{MinMaxAvgScalarBinBatch, MinMaxAvgScalarBinSingle, MinMaxAvgScalarEventBatch};
use crate::agg::{MinMaxAvgScalarBinBatch, MinMaxAvgScalarEventBatch};
use crate::merge::MergedMinMaxAvgScalarStream;
use crate::raw::EventsQuery;
use bytes::{BufMut, Bytes, BytesMut};
@@ -287,7 +287,8 @@ impl PreBinnedValueStream {
};
let evq = Arc::new(evq);
let s1 = MergedFromRemotes::new(evq, self.node_config.cluster.clone());
let s2 = s1.map_ok(|k| MinMaxAvgScalarBinBatch::empty());
error!("try_setup_fetch_prebinned_higher_res TODO emit actual value");
let s2 = s1.map_ok(|_k| MinMaxAvgScalarBinBatch::empty());
self.fut2 = Some(Box::pin(s2));
}
}

View File

@@ -17,6 +17,7 @@ use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use tokio::io::{AsyncRead, AsyncWriteExt, ReadBuf};
use tokio::net::tcp::OwnedReadHalf;
use tokio::net::TcpStream;
#[allow(unused_imports)]
use tracing::{debug, error, info, trace, warn};
@@ -42,17 +43,12 @@ pub async fn x_processed_stream_from_node(
debug!("x_processed_stream_from_node qjs len {}", qjs.len());
net.write_u32_le(qjs.len() as u32).await?;
net.write_all(&qjs).await?;
net.write_u32_le(0).await?;
net.flush().await?;
let (mut netin, mut netout) = net.into_split();
let (netin, netout) = net.into_split();
netout.forget();
// TODO Can not signal some EOS over TCP.
debug!("x_processed_stream_from_node WRITTEN");
// TODO use the splitted streams:
let s2 = MinMaxAvgScalarEventBatchStreamFromTcp { inp: netin };
debug!("x_processed_stream_from_node HAVE STREAM INSTANCE");
let s3: Pin<Box<dyn Stream<Item = Result<_, Error>> + Send>> = Box::pin(s2);
debug!("x_processed_stream_from_node RETURN");
@@ -60,7 +56,7 @@ pub async fn x_processed_stream_from_node(
}
pub struct MinMaxAvgScalarEventBatchStreamFromTcp {
inp: TcpStream,
inp: OwnedReadHalf,
}
impl Stream for MinMaxAvgScalarEventBatchStreamFromTcp {
@@ -68,7 +64,7 @@ impl Stream for MinMaxAvgScalarEventBatchStreamFromTcp {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
'outer: loop {
loop {
// TODO make capacity configurable.
// TODO reuse buffer if not full.
let mut buf = BytesMut::with_capacity(1024 * 2);
@@ -120,7 +116,7 @@ where
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
'outer: loop {
info!("PREPARE BUFFER FOR READING");
info!("InMemoryFrameAsyncReadStream PREPARE BUFFER FOR READING");
let mut buf0 = std::mem::replace(&mut self.buf, BytesMut::new());
if buf0.as_mut().len() != buf0.capacity() {
error!("------- {} {}", buf0.as_mut().len(), buf0.capacity());
@@ -135,6 +131,8 @@ where
pin_mut!(j);
break match AsyncRead::poll_read(j, cx, &mut buf2) {
Ready(Ok(())) => {
let n1 = buf2.filled().len();
info!("InMemoryFrameAsyncReadStream read Ok n1 {}", n1);
let r2 = buf2.remaining();
if r2 == r1 {
info!("InMemoryFrameAsyncReadStream END OF INPUT");
@@ -182,6 +180,7 @@ where
}
Ready(Err(e)) => Ready(Some(Err(e.into()))),
Pending => {
info!("InMemoryFrameAsyncReadStream Pending");
self.buf = buf0;
Pending
}
@@ -204,7 +203,9 @@ async fn local_unpacked_test() {
/**
Can be serialized as a length-delimited frame.
*/
pub trait Frameable {}
pub trait Frameable {
    /// Produce the serialized bytes to be sent as the payload of one
    /// length-delimited frame; the caller writes the length prefix
    /// (see `raw_conn_handler`, which prefixes `fr.len()` as u32 LE).
    fn serialized(&self) -> Bytes;
}
pub async fn raw_service(node_config: Arc<NodeConfig>) -> Result<(), Error> {
let addr = format!("{}:{}", node_config.node.listen, node_config.node.port_raw);
@@ -225,10 +226,20 @@ async fn raw_conn_handler(stream: TcpStream, addr: SocketAddr) -> Result<(), Err
let mut h = InMemoryFrameAsyncReadStream::new(netin);
while let Some(k) = h.next().await {
warn!("raw_conn_handler FRAME RECV {}", k.is_ok());
break;
}
warn!("TODO decide on response content based on the parsed json query");
warn!("raw_conn_handler INPUT STREAM END");
//netout.write_i32_le(123).await?;
//netout.flush().await?;
let mut s1 = futures_util::stream::iter(vec![MinMaxAvgScalarEventBatch::empty()]);
while let Some(item) = s1.next().await {
let fr = item.serialized();
netout.write_u32_le(fr.len() as u32).await?;
netout.write(fr.as_ref()).await?;
}
netout.flush().await?;
netout.forget();
warn!("raw_conn_handler DONE");
Ok(())
}