From 3e53f17acb3f44c96772eefd1b3ea989c2e636a4 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Tue, 20 Apr 2021 18:17:13 +0200 Subject: [PATCH] yo --- disk/src/raw.rs | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/disk/src/raw.rs b/disk/src/raw.rs index 292f376..85e05d7 100644 --- a/disk/src/raw.rs +++ b/disk/src/raw.rs @@ -9,7 +9,7 @@ use crate::agg::{MinMaxAvgScalarBinBatch, MinMaxAvgScalarEventBatch}; use bytes::{Bytes, BytesMut}; use err::Error; use futures_core::Stream; -use futures_util::pin_mut; +use futures_util::{pin_mut, StreamExt}; use netpod::{AggKind, Channel, NanoRange, Node, NodeConfig}; use serde::{Deserialize, Serialize}; use std::net::SocketAddr; @@ -89,6 +89,13 @@ pub struct InMemoryFrameAsyncReadStream { impl InMemoryFrameAsyncReadStream { fn new(inp: T) -> Self { + let mut t = BytesMut::with_capacity(1024); + + TODO + // how to prepare the buffer so that ReadBuf has space to write? + + + Self { inp, // TODO make start cap adjustable @@ -162,9 +169,12 @@ pub async fn raw_service(node_config: Arc) -> Result<(), Error> { } async fn raw_conn_handler(stream: TcpStream, addr: SocketAddr) -> Result<(), Error> { - info!("RAW HANDLER for {:?}", addr); + info!("raw_conn_handler SPAWNED for {:?}", addr); let (netin, mut netout) = stream.into_split(); - InMemoryFrameAsyncReadStream::new(netin); + let mut h = InMemoryFrameAsyncReadStream::new(netin); + while let Some(k) = h.next().await { + warn!("raw_conn_handler FRAME RECV {}", k.is_ok()); + } netout.write_i32_le(123).await?; netout.flush().await?; netout.forget();