diff --git a/disk/src/agg.rs b/disk/src/agg.rs index a39e957..c56a022 100644 --- a/disk/src/agg.rs +++ b/disk/src/agg.rs @@ -2,12 +2,15 @@ Aggregation and binning support. */ +use crate::raw::Frameable; use crate::EventFull; +use bytes::{BufMut, Bytes, BytesMut}; use err::Error; use futures_core::Stream; use futures_util::StreamExt; use netpod::BinSpecDimT; use netpod::{Node, ScalarType}; +use std::mem::size_of; use std::pin::Pin; use std::task::{Context, Poll}; #[allow(unused_imports)] @@ -237,6 +240,24 @@ impl AggregatableTdim for MinMaxAvgScalarEventBatch { } } +impl Frameable for MinMaxAvgScalarEventBatch { + fn serialized(&self) -> Bytes { + assert!(self.tss.len() != 0); + let n1 = self.tss.len(); + let mut g = BytesMut::with_capacity(4 + n1 * (8 + 3 * 4)); + g.put_u32_le(n1 as u32); + let a = unsafe { std::slice::from_raw_parts(&self.tss[0] as *const u64 as *const u8, size_of::() * n1) }; + g.put(a); + let a = unsafe { std::slice::from_raw_parts(&self.mins[0] as *const f32 as *const u8, size_of::() * n1) }; + g.put(a); + let a = unsafe { std::slice::from_raw_parts(&self.maxs[0] as *const f32 as *const u8, size_of::() * n1) }; + g.put(a); + let a = unsafe { std::slice::from_raw_parts(&self.avgs[0] as *const f32 as *const u8, size_of::() * n1) }; + g.put(a); + g.freeze() + } +} + pub struct MinMaxAvgScalarEventBatchAggregator { ts1: u64, ts2: u64, diff --git a/disk/src/cache.rs b/disk/src/cache.rs index 2f69c94..731f5b9 100644 --- a/disk/src/cache.rs +++ b/disk/src/cache.rs @@ -1,4 +1,4 @@ -use crate::agg::{MinMaxAvgScalarBinBatch, MinMaxAvgScalarBinSingle, MinMaxAvgScalarEventBatch}; +use crate::agg::{MinMaxAvgScalarBinBatch, MinMaxAvgScalarEventBatch}; use crate::merge::MergedMinMaxAvgScalarStream; use crate::raw::EventsQuery; use bytes::{BufMut, Bytes, BytesMut}; @@ -287,7 +287,8 @@ impl PreBinnedValueStream { }; let evq = Arc::new(evq); let s1 = MergedFromRemotes::new(evq, self.node_config.cluster.clone()); - let s2 = s1.map_ok(|k| MinMaxAvgScalarBinBatch::empty()); + error!("try_setup_fetch_prebinned_higher_res TODO emit actual value"); + let s2 = s1.map_ok(|_k| MinMaxAvgScalarBinBatch::empty()); self.fut2 = Some(Box::pin(s2)); } } diff --git a/disk/src/raw.rs b/disk/src/raw.rs index 5c0bf62..3de4307 100644 --- a/disk/src/raw.rs +++ b/disk/src/raw.rs @@ -17,6 +17,7 @@ use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; use tokio::io::{AsyncRead, AsyncWriteExt, ReadBuf}; +use tokio::net::tcp::OwnedReadHalf; use tokio::net::TcpStream; #[allow(unused_imports)] use tracing::{debug, error, info, trace, warn}; @@ -42,17 +43,12 @@ pub async fn x_processed_stream_from_node( debug!("x_processed_stream_from_node qjs len {}", qjs.len()); net.write_u32_le(qjs.len() as u32).await?; net.write_all(&qjs).await?; + net.write_u32_le(0).await?; net.flush().await?; - let (mut netin, mut netout) = net.into_split(); + let (netin, netout) = net.into_split(); netout.forget(); - - // TODO Can not signal some EOS over TCP. - debug!("x_processed_stream_from_node WRITTEN"); - - // TODO use the splitted streams: let s2 = MinMaxAvgScalarEventBatchStreamFromTcp { inp: netin }; - debug!("x_processed_stream_from_node HAVE STREAM INSTANCE"); let s3: Pin> + Send>> = Box::pin(s2); debug!("x_processed_stream_from_node RETURN"); @@ -60,7 +56,7 @@ pub async fn x_processed_stream_from_node( } pub struct MinMaxAvgScalarEventBatchStreamFromTcp { - inp: TcpStream, + inp: OwnedReadHalf, } impl Stream for MinMaxAvgScalarEventBatchStreamFromTcp { @@ -68,7 +64,7 @@ impl Stream for MinMaxAvgScalarEventBatchStreamFromTcp { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; - 'outer: loop { + loop { // TODO make capacity configurable. // TODO reuse buffer if not full. let mut buf = BytesMut::with_capacity(1024 * 2); @@ -120,7 +116,7 @@ where fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; 'outer: loop { - info!("PREPARE BUFFER FOR READING"); + info!("InMemoryFrameAsyncReadStream PREPARE BUFFER FOR READING"); let mut buf0 = std::mem::replace(&mut self.buf, BytesMut::new()); if buf0.as_mut().len() != buf0.capacity() { error!("------- {} {}", buf0.as_mut().len(), buf0.capacity()); @@ -135,6 +131,8 @@ where pin_mut!(j); break match AsyncRead::poll_read(j, cx, &mut buf2) { Ready(Ok(())) => { + let n1 = buf2.filled().len(); + info!("InMemoryFrameAsyncReadStream read Ok n1 {}", n1); let r2 = buf2.remaining(); if r2 == r1 { info!("InMemoryFrameAsyncReadStream END OF INPUT"); @@ -182,6 +180,7 @@ where } Ready(Err(e)) => Ready(Some(Err(e.into()))), Pending => { + info!("InMemoryFrameAsyncReadStream Pending"); self.buf = buf0; Pending } @@ -204,7 +203,9 @@ async fn local_unpacked_test() { /** Can be serialized as a length-delimited frame. */ -pub trait Frameable {} +pub trait Frameable { + fn serialized(&self) -> Bytes; +} pub async fn raw_service(node_config: Arc) -> Result<(), Error> { let addr = format!("{}:{}", node_config.node.listen, node_config.node.port_raw); @@ -225,10 +226,20 @@ async fn raw_conn_handler(stream: TcpStream, addr: SocketAddr) -> Result<(), Err let mut h = InMemoryFrameAsyncReadStream::new(netin); while let Some(k) = h.next().await { warn!("raw_conn_handler FRAME RECV {}", k.is_ok()); + break; } + + warn!("TODO decide on response content based on the parsed json query"); + warn!("raw_conn_handler INPUT STREAM END"); - //netout.write_i32_le(123).await?; - //netout.flush().await?; + let mut s1 = futures_util::stream::iter(vec![MinMaxAvgScalarEventBatch::empty()]); + while let Some(item) = s1.next().await { + let fr = item.serialized(); + netout.write_u32_le(fr.len() as u32).await?; + netout.write(fr.as_ref()).await?; + } + netout.flush().await?; netout.forget(); + warn!("raw_conn_handler DONE"); Ok(()) }