This commit is contained in:
Dominik Werder
2021-04-19 19:33:23 +02:00
parent d8ab42ccb9
commit 2dc5bfd9e8
2 changed files with 46 additions and 1 deletions

View File

@@ -274,7 +274,16 @@ impl PreBinnedValueStream {
self.fut2 = Some(Box::pin(s));
}
None => {
error!("TODO NO BETTER GRAN FOUND FOR g {}", g);
error!("NO BETTER GRAN FOUND FOR g {}", g);
error!("TODO see in source cache.rs");
// create a client helper in raw.rs which can connect to a given node with parameters
// create tcp service in raw.rs
// set up tcp inputs
// set up merger
// set up T-binning
// save to cache file if input is complete
todo!();
}
}

View File

@@ -2,6 +2,42 @@
Provide ser/de of value data to a good net exchange format.
*/
use crate::agg::MinMaxAvgScalarBinBatch;
use err::Error;
use futures_core::Stream;
use netpod::Node;
use std::pin::Pin;
use std::task::{Context, Poll};
pub async fn x_processed_stream_from_node(
node: &Node,
) -> Result<Pin<Box<dyn Stream<Item = Result<MinMaxAvgScalarBinBatch, Error>>>>, Error> {
// TODO can I factor this better?
// Need a stream of bytes, and a deserializer from stream of bytes to stream of items.
// Need to pass the parameters to upstream.
let netin = tokio::net::TcpStream::connect(format!("{}:{}", node.host, node.port_raw)).await?;
// TODO TcpStream is not yet a Stream!
//let s2: Pin<Box<dyn Stream<Item = Result<MinMaxAvgScalarBinBatch, Error>>>> = Box::pin(netin);
err::todoval()
}
pub struct MinMaxAvgScalarBinBatchStreamFromByteStream {
//inp: TcpStream,
}
impl Stream for MinMaxAvgScalarBinBatchStreamFromByteStream {
type Item = Result<MinMaxAvgScalarBinBatch, Error>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
err::todoval()
}
}
#[allow(dead_code)]
async fn local_unpacked_test() {
// TODO what kind of query format? What information do I need here?