From 2dc5bfd9e80abab44aa2685162e7d85fc67d8e99 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Mon, 19 Apr 2021 19:33:23 +0200 Subject: [PATCH] WIP --- disk/src/cache.rs | 11 ++++++++++- disk/src/raw.rs | 36 ++++++++++++++++++++++++++++++++++++ 2 files changed, 46 insertions(+), 1 deletion(-) diff --git a/disk/src/cache.rs b/disk/src/cache.rs index a276b26..2f5e25b 100644 --- a/disk/src/cache.rs +++ b/disk/src/cache.rs @@ -274,7 +274,16 @@ impl PreBinnedValueStream { self.fut2 = Some(Box::pin(s)); } None => { - error!("TODO NO BETTER GRAN FOUND FOR g {}", g); + error!("NO BETTER GRAN FOUND FOR g {}", g); + error!("TODO see in source cache.rs"); + + // create a client helper in raw.rs which can connect to a given node with parameters + // create tcp service in raw.rs + // set up tcp inputs + // set up merger + // set up T-binning + // save to cache file if input is complete + todo!(); } } diff --git a/disk/src/raw.rs b/disk/src/raw.rs index c710e63..fa7ae4d 100644 --- a/disk/src/raw.rs +++ b/disk/src/raw.rs @@ -2,6 +2,42 @@ Provide ser/de of value data to a good net exchange format. */ +use crate::agg::MinMaxAvgScalarBinBatch; +use err::Error; +use futures_core::Stream; +use netpod::Node; +use std::pin::Pin; +use std::task::{Context, Poll}; + +pub async fn x_processed_stream_from_node( + node: &Node, +) -> Result>>>, Error> { + // TODO can I factor this better? + // Need a stream of bytes, and a deserializer from stream of bytes to stream of items. + // Need to pass the parameters to upstream. + + let netin = tokio::net::TcpStream::connect(format!("{}:{}", node.host, node.port_raw)).await?; + + // TODO TcpStream is not yet a Stream! + + //let s2: Pin>>> = Box::pin(netin); + + err::todoval() +} + +pub struct MinMaxAvgScalarBinBatchStreamFromByteStream { + //inp: TcpStream, +} + +impl Stream for MinMaxAvgScalarBinBatchStreamFromByteStream { + type Item = Result; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + use Poll::*; + err::todoval() + } +} + #[allow(dead_code)] async fn local_unpacked_test() { // TODO what kind of query format? What information do I need here?