This commit is contained in:
Dominik Werder
2021-04-20 18:17:13 +02:00
parent 6d82e6e8d4
commit 3e53f17acb

View File

@@ -9,7 +9,7 @@ use crate::agg::{MinMaxAvgScalarBinBatch, MinMaxAvgScalarEventBatch};
use bytes::{Bytes, BytesMut};
use err::Error;
use futures_core::Stream;
use futures_util::pin_mut;
use futures_util::{pin_mut, StreamExt};
use netpod::{AggKind, Channel, NanoRange, Node, NodeConfig};
use serde::{Deserialize, Serialize};
use std::net::SocketAddr;
@@ -89,6 +89,13 @@ pub struct InMemoryFrameAsyncReadStream<T> {
impl<T> InMemoryFrameAsyncReadStream<T> {
fn new(inp: T) -> Self {
let mut t = BytesMut::with_capacity(1024);
TODO
// how to prepare the buffer so that ReadBuf has space to write?
Self {
inp,
// TODO make start cap adjustable
@@ -162,9 +169,12 @@ pub async fn raw_service(node_config: Arc<NodeConfig>) -> Result<(), Error> {
}
async fn raw_conn_handler(stream: TcpStream, addr: SocketAddr) -> Result<(), Error> {
info!("RAW HANDLER for {:?}", addr);
info!("raw_conn_handler SPAWNED for {:?}", addr);
let (netin, mut netout) = stream.into_split();
InMemoryFrameAsyncReadStream::new(netin);
let mut h = InMemoryFrameAsyncReadStream::new(netin);
while let Some(k) = h.next().await {
warn!("raw_conn_handler FRAME RECV {}", k.is_ok());
}
netout.write_i32_le(123).await?;
netout.flush().await?;
netout.forget();