This commit is contained in:
Dominik Werder
2021-04-21 10:39:16 +02:00
parent 3e53f17acb
commit f8921faf63
10 changed files with 154 additions and 55 deletions

View File

@@ -13,6 +13,7 @@ tokio = { version = "1.5.0", features = ["rt-multi-thread", "io-util", "net", "t
hyper = { version = "0.14", features = ["http1", "http2", "client", "server", "tcp", "stream"] }
async-channel = "1.6"
bytes = "1.0.1"
arrayref = "0.3.6"
byteorder = "1.4.3"
futures-core = "0.3.14"
futures-util = "0.3.14"

View File

@@ -332,6 +332,19 @@ pub struct MinMaxAvgScalarBinBatch {
avgs: Vec<f32>,
}
impl MinMaxAvgScalarBinBatch {
pub fn empty() -> Self {
Self {
ts1s: vec![],
ts2s: vec![],
counts: vec![],
mins: vec![],
maxs: vec![],
avgs: vec![],
}
}
}
impl std::fmt::Debug for MinMaxAvgScalarBinBatch {
fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(fmt, "MinMaxAvgScalarBinBatch count {}", self.ts1s.len())
@@ -763,6 +776,7 @@ pub fn make_test_node(id: u32) -> Node {
Node {
id,
host: "localhost".into(),
listen: "0.0.0.0".into(),
port: 8800 + id as u16,
port_raw: 8800 + id as u16 + 100,
data_base_path: format!("../tmpdata/node{:02}", id).into(),

View File

@@ -1,4 +1,5 @@
use crate::agg::{MinMaxAvgScalarBinBatch, MinMaxAvgScalarEventBatch};
use crate::merge::MergedMinMaxAvgScalarStream;
use crate::raw::EventsQuery;
use bytes::{BufMut, Bytes, BytesMut};
use chrono::{DateTime, Utc};
@@ -382,7 +383,10 @@ impl Stream for PreBinnedValueFetchedStream {
pin_mut!(res);
use hyper::body::HttpBody;
match res.poll_data(cx) {
Ready(Some(Ok(_k))) => todo!(),
Ready(Some(Ok(_k))) => {
error!("TODO PreBinnedValueFetchedStream received value, now do something");
Pending
}
Ready(Some(Err(e))) => Ready(Some(Err(e.into()))),
Ready(None) => Ready(None),
Pending => Pending,
@@ -424,6 +428,7 @@ type T002 = Pin<Box<dyn Future<Output = Result<T001, Error>> + Send>>;
pub struct PreBinnedAssembledFromRemotes {
tcp_establish_futs: Vec<T002>,
nodein: Vec<Option<T001>>,
merged: Option<T001>,
}
impl PreBinnedAssembledFromRemotes {
@@ -438,6 +443,7 @@ impl PreBinnedAssembledFromRemotes {
Self {
tcp_establish_futs,
nodein: (0..n).into_iter().map(|_| None).collect(),
merged: None,
}
}
}
@@ -453,7 +459,17 @@ impl Stream for PreBinnedAssembledFromRemotes {
// First, establish async all connections.
// Then assemble the merge-and-processing-pipeline and pull from there.
'outer: loop {
{
break if let Some(fut) = &mut self.merged {
match fut.poll_next_unpin(cx) {
Ready(Some(Ok(_k))) => {
let h = MinMaxAvgScalarBinBatch::empty();
Ready(Some(Ok(h)))
}
Ready(Some(Err(e))) => Ready(Some(Err(e))),
Ready(None) => Ready(None),
Pending => Pending,
}
} else {
let mut pend = false;
let mut c1 = 0;
for i1 in 0..self.tcp_establish_futs.len() {
@@ -463,7 +479,7 @@ impl Stream for PreBinnedAssembledFromRemotes {
info!("tcp_establish_futs POLLING INPUT ESTAB {}", i1);
match f.poll(cx) {
Ready(Ok(k)) => {
info!("ESTABLISHED INPUT {}", i1);
info!("tcp_establish_futs ESTABLISHED INPUT {}", i1);
self.nodein[i1] = Some(k);
}
Ready(Err(e)) => return Ready(Some(Err(e))),
@@ -476,15 +492,20 @@ impl Stream for PreBinnedAssembledFromRemotes {
}
}
if pend {
break Pending;
Pending
} else {
if c1 == self.tcp_establish_futs.len() {
debug!("SETTING UP MERGED STREAM");
// TODO set up the merged stream
let inps = self.nodein.iter_mut().map(|k| k.take().unwrap()).collect();
super::merge::MergedMinMaxAvgScalarStream::new(inps);
let s = MergedMinMaxAvgScalarStream::new(inps);
self.merged = Some(Box::pin(s));
continue 'outer;
} else {
Pending
}
}
}
};
}
}
}

View File

@@ -50,6 +50,7 @@ pub async fn gen_test_data() -> Result<(), Error> {
let node = Node {
id: i1,
host: "localhost".into(),
listen: "0.0.0.0".into(),
port: 7780 + i1 as u16,
port_raw: 7780 + i1 as u16 + 100,
split: i1,

View File

@@ -5,8 +5,8 @@ Delivers event data (not yet time-binned) from local storage and provides client
to request such data from nodes.
*/
use crate::agg::{MinMaxAvgScalarBinBatch, MinMaxAvgScalarEventBatch};
use bytes::{Bytes, BytesMut};
use crate::agg::MinMaxAvgScalarEventBatch;
use bytes::{BufMut, Bytes, BytesMut};
use err::Error;
use futures_core::Stream;
use futures_util::{pin_mut, StreamExt};
@@ -35,13 +35,19 @@ pub async fn x_processed_stream_from_node(
query: Arc<EventsQuery>,
node: Arc<Node>,
) -> Result<Pin<Box<dyn Stream<Item = Result<MinMaxAvgScalarEventBatch, Error>> + Send>>, Error> {
debug!("x_processed_stream_from_node ENTER");
let mut net = TcpStream::connect(format!("{}:{}", node.host, node.port_raw)).await?;
debug!("x_processed_stream_from_node CONNECTED");
let qjs = serde_json::to_vec(query.as_ref())?;
debug!("x_processed_stream_from_node qjs len {}", qjs.len());
net.write_u32_le(qjs.len() as u32).await?;
net.write_all(&qjs).await?;
debug!("x_processed_stream_from_node WRITTEN");
net.flush().await?;
let s2 = MinMaxAvgScalarEventBatchStreamFromTcp { inp: net };
debug!("x_processed_stream_from_node HAVE STREAM INSTANCE");
let s3: Pin<Box<dyn Stream<Item = Result<_, Error>> + Send>> = Box::pin(s2);
debug!("x_processed_stream_from_node RETURN");
Ok(s3)
}
@@ -84,23 +90,16 @@ Emits each frame as a single item. Therefore, each item must fit easily into mem
*/
pub struct InMemoryFrameAsyncReadStream<T> {
inp: T,
buf: Option<BytesMut>,
buf: BytesMut,
wp: usize,
}
impl<T> InMemoryFrameAsyncReadStream<T> {
fn new(inp: T) -> Self {
let mut t = BytesMut::with_capacity(1024);
TODO
// how to prepare the buffer so that ReadBuf has space to write?
Self {
inp,
// TODO make start cap adjustable
buf: Some(BytesMut::with_capacity(1024)),
}
// TODO make start cap adjustable
let mut buf = BytesMut::with_capacity(1024);
buf.resize(buf.capacity(), 0);
Self { inp, buf, wp: 0 }
}
}
@@ -113,8 +112,14 @@ where
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
'outer: loop {
let mut buf0 = self.buf.take().unwrap();
let mut buf2 = ReadBuf::new(buf0.as_mut());
info!("PREPARE BUFFER FOR READING");
let mut buf0 = std::mem::replace(&mut self.buf, BytesMut::new());
if buf0.as_mut().len() != buf0.capacity() {
error!("------- {} {}", buf0.as_mut().len(), buf0.capacity());
panic!();
}
let mut buf2 = ReadBuf::new(buf0.as_mut()[self.wp..].as_mut());
assert!(buf2.filled().len() == 0);
assert!(buf2.capacity() > 0);
assert!(buf2.remaining() > 0);
let r1 = buf2.remaining();
@@ -122,18 +127,56 @@ where
pin_mut!(j);
break match AsyncRead::poll_read(j, cx, &mut buf2) {
Ready(Ok(())) => {
if buf2.remaining() == r1 {
// TODO re-init self.buf ?
// TODO end of input.
err::todoval()
let r2 = buf2.remaining();
if r2 == r1 {
info!("InMemoryFrameAsyncReadStream END OF INPUT");
if self.wp != 0 {
error!("self.wp != 0 {}", self.wp);
}
assert!(self.wp == 0);
Ready(None)
} else {
// TODO re-init self.buf ?
// TODO how to reflect the write position in the underlying BytesMut???
err::todoval()
let n = buf2.filled().len();
self.wp += n;
info!("InMemoryFrameAsyncReadStream read n {} wp {}", n, self.wp);
if self.wp >= 4 {
let len = u32::from_le_bytes(*arrayref::array_ref![buf0.as_mut(), 0, 4]);
info!("InMemoryFrameAsyncReadStream len: {}", len);
assert!(len > 0 && len < 1024 * 512);
let nl = len as usize + 4;
if buf0.capacity() < nl {
buf0.resize(nl, 0);
} else {
// nothing to do
}
if self.wp >= nl {
info!("InMemoryFrameAsyncReadStream Have whole frame");
let mut buf3 = BytesMut::with_capacity(buf0.capacity());
// TODO make stats of copied bytes and warn if ratio is too bad.
buf3.put(buf0.as_ref()[nl..self.wp].as_ref());
buf3.resize(buf3.capacity(), 0);
self.wp = self.wp - nl;
self.buf = buf3;
use bytes::Buf;
buf0.truncate(nl);
buf0.advance(4);
Ready(Some(Ok(buf0.freeze())))
} else {
self.buf = buf0;
continue 'outer;
}
} else {
info!("InMemoryFrameAsyncReadStream not yet enough for len");
self.buf = buf0;
continue 'outer;
}
}
}
Ready(Err(e)) => Ready(Some(Err(e.into()))),
Pending => Pending,
Pending => {
self.buf = buf0;
Pending
}
};
}
}
@@ -156,7 +199,7 @@ Can be serialized as a length-delimited frame.
pub trait Frameable {}
pub async fn raw_service(node_config: Arc<NodeConfig>) -> Result<(), Error> {
let addr = format!("0.0.0.0:{}", node_config.node.port_raw);
let addr = format!("{}:{}", node_config.node.listen, node_config.node.port_raw);
let lis = tokio::net::TcpListener::bind(addr).await?;
loop {
match lis.accept().await {