From a2047d292c10ba902996bfb45b96464cc72301ce Mon Sep 17 00:00:00 2001
From: Dominik Werder
Date: Sat, 24 Apr 2021 17:14:46 +0200
Subject: [PATCH] WIP

---
 disk/src/agg.rs |  1 +
 disk/src/lib.rs | 44 +++++++++++++++---------
 disk/src/raw.rs | 90 ++++++++++++++++++++++++++++++++++++++-----------
 3 files changed, 101 insertions(+), 34 deletions(-)

diff --git a/disk/src/agg.rs b/disk/src/agg.rs
index 0027ac6..d7cf5fa 100644
--- a/disk/src/agg.rs
+++ b/disk/src/agg.rs
@@ -1011,6 +1011,7 @@ where
         }
     }
 }
+
 pub fn make_test_node(id: u32) -> Node {
     Node {
         id,
diff --git a/disk/src/lib.rs b/disk/src/lib.rs
index d223d28..f79d228 100644
--- a/disk/src/lib.rs
+++ b/disk/src/lib.rs
@@ -14,7 +14,7 @@ use std::task::{Context, Poll};
 use tokio::fs::{File, OpenOptions};
 use tokio::io::AsyncRead;
 #[allow(unused_imports)]
-use tracing::{debug, error, info, trace, warn};
+use tracing::{debug, error, info, span, trace, warn, Level};
 
 pub mod agg;
 #[cfg(test)]
@@ -490,6 +490,10 @@ impl EventChunker {
     }
 
     fn parse_buf(&mut self, buf: &mut BytesMut) -> Result {
+        span!(Level::INFO, "EventChunker::parse_buf").in_scope(|| self.parse_buf_inner(buf))
+    }
+
+    fn parse_buf_inner(&mut self, buf: &mut BytesMut) -> Result {
         // must communicate to caller:
         // what I've found in the buffer
         // what I've consumed from the buffer
@@ -497,10 +501,9 @@ impl EventChunker {
         let mut ret = EventFull::empty();
         let mut need_min = 0 as u32;
         use byteorder::{ReadBytesExt, BE};
-        //info!("parse_buf rb {}", buf.len());
-        //let mut i1 = 0;
+        error!(" ???????????????????????? Why should need_min ever be zero?");
+        info!("parse_buf buf len {} need_min {}", buf.len(), need_min);
         loop {
-            //info!("parse_buf LOOP {}", i1);
             if (buf.len() as u32) < need_min {
                 break;
             }
@@ -593,8 +596,13 @@ impl EventChunker {
                     let ele_count = value_bytes / type_size as u64;
                     let ele_size = type_size;
                     match self.channel_config.shape {
-                        Shape::Wave(ele2) => {
-                            assert!(ele2 == ele_count as u32);
+                        Shape::Wave(dim1count) => {
+                            if dim1count != ele_count as u32 {
+                                Err(Error::with_msg(format!(
+                                    "ChannelConfig expects {:?} but event has {:?}",
+                                    self.channel_config.shape, ele_count,
+                                )))?;
+                            }
                         }
                         _ => panic!(),
                     }
@@ -661,6 +669,7 @@ impl Stream for EventChunker {
     type Item = Result;
 
     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        use Poll::*;
         self.polled += 1;
         if self.polled >= 2000000 {
             warn!("EventChunker poll limit reached");
@@ -669,9 +678,10 @@ impl Stream for EventChunker {
         let g = &mut self.inp;
         pin_mut!(g);
         match g.poll_next(cx) {
-            Poll::Ready(Some(Ok(mut buf))) => {
+            Ready(Some(Ok(mut buf))) => {
                 //info!("EventChunker got buffer len {}", buf.len());
-                match self.parse_buf(&mut buf) {
+                let r = self.parse_buf(&mut buf);
+                match r {
                     Ok(res) => {
                         if buf.len() > 0 {
                             // TODO gather stats about this:
                         }
                         if res.need_min > 8000 {
                             warn!("spurious EventChunker asks for need_min {}", res.need_min);
-                            panic!();
+                            Ready(Some(Err(Error::with_msg(format!(
+                                "spurious EventChunker asks for need_min {}",
+                                res.need_min
+                            )))))
+                        } else {
+                            self.inp.set_need_min(res.need_min);
+                            Ready(Some(Ok(res.events)))
                         }
-                        self.inp.set_need_min(res.need_min);
-                        Poll::Ready(Some(Ok(res.events)))
                     }
-                    Err(e) => Poll::Ready(Some(Err(e.into()))),
+                    Err(e) => Ready(Some(Err(e.into()))),
                 }
             }
-            Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
-            Poll::Ready(None) => Poll::Ready(None),
-            Poll::Pending => Poll::Pending,
+            Ready(Some(Err(e))) => Ready(Some(Err(e))),
+            Ready(None) => Ready(None),
+            Pending => Pending,
         }
     }
 }
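Reviewer note on the lib.rs hunks above: EventChunker drives an incremental parser. parse_buf consumes whole events from the front of the buffer and reports, via need_min, how many bytes it wants before the next attempt; poll_next now feeds that value back to the input stream with set_need_min and returns an error item instead of panicking on a spurious value. The sketch below illustrates that need_min handshake with stand-in types (ParseOutcome, ByteSource); it is not the actual parse-result type or input-stream API from this repository.

// Sketch only: the need_min handshake between an incremental event parser and
// its byte source. ParseOutcome and ByteSource are illustrative stand-ins, not
// the types used in disk/src/lib.rs.
struct ParseOutcome {
    events: Vec<u32>, // whatever could be fully parsed out of the buffer
    need_min: u32,    // minimum bytes required before the next parse attempt
}

fn parse_buf(buf: &mut Vec<u8>) -> ParseOutcome {
    const EVENT_LEN: usize = 4; // pretend every event is a fixed 4-byte record
    let mut events = Vec::new();
    while buf.len() >= EVENT_LEN {
        events.push(u32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]));
        buf.drain(..EVENT_LEN);
    }
    // Report a non-zero need_min so the source knows how much to accumulate
    // before waking the parser again.
    ParseOutcome { events, need_min: EVENT_LEN as u32 }
}

struct ByteSource {
    need_min: usize,
    pending: Vec<u8>,
}

impl ByteSource {
    // Counterpart of the set_need_min call made from poll_next: only hand data
    // to the consumer once at least `need_min` bytes are buffered.
    fn set_need_min(&mut self, n: u32) {
        self.need_min = n as usize;
    }
    fn ready(&self) -> bool {
        self.pending.len() >= self.need_min
    }
}

fn main() {
    let mut src = ByteSource { need_min: 1, pending: vec![0, 0, 0, 7, 0, 0] };
    while src.ready() {
        let mut buf = std::mem::take(&mut src.pending);
        let out = parse_buf(&mut buf);
        src.pending = buf; // leftover bytes stay queued for the next round
        src.set_need_min(out.need_min);
        println!("parsed {:?}, {} leftover bytes, need_min {}", out.events, src.pending.len(), out.need_min);
        if out.events.is_empty() {
            break; // nothing more can be parsed until more bytes arrive
        }
    }
}

In this sketch need_min is never zero; a zero would let the source wake the parser without any new data, which is what the error! note in the hunk above is questioning.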
diff --git a/disk/src/raw.rs b/disk/src/raw.rs
index cdab5ba..533800e 100644
--- a/disk/src/raw.rs
+++ b/disk/src/raw.rs
@@ -5,13 +5,14 @@
 Delivers event data (not yet time-binned) from local storage
 and provides client to request such data from nodes.
 */
-use crate::agg::MinMaxAvgScalarEventBatch;
+use crate::agg::{IntoBinnedXBins1, IntoDim1F32Stream, MinMaxAvgScalarEventBatch};
 use crate::cache::BinnedBytesForHttpStreamFrame;
 use bytes::{BufMut, Bytes, BytesMut};
 use err::Error;
 use futures_core::Stream;
 use futures_util::{pin_mut, StreamExt};
-use netpod::{AggKind, Channel, NanoRange, Node, NodeConfig};
+use netpod::timeunits::DAY;
+use netpod::{AggKind, Channel, NanoRange, Node, NodeConfig, ScalarType, Shape};
 use serde::{Deserialize, Serialize};
 use std::net::SocketAddr;
 use std::pin::Pin;
@@ -377,23 +378,29 @@ pub async fn raw_service(node_config: Arc<NodeConfig>) -> Result<(), Error> {
     loop {
         match lis.accept().await {
             Ok((stream, addr)) => {
-                taskrun::spawn(raw_conn_handler(stream, addr));
+                taskrun::spawn(raw_conn_handler(stream, addr, node_config.clone()));
             }
             Err(e) => Err(e)?,
         }
     }
 }
 
-async fn raw_conn_handler(stream: TcpStream, addr: SocketAddr) -> Result<(), Error> {
+async fn raw_conn_handler(stream: TcpStream, addr: SocketAddr, node_config: Arc<NodeConfig>) -> Result<(), Error> {
     //use tracing_futures::Instrument;
     let span1 = span!(Level::INFO, "raw::raw_conn_handler");
-    raw_conn_handler_inner(stream, addr).instrument(span1).await
+    raw_conn_handler_inner(stream, addr, node_config)
+        .instrument(span1)
+        .await
 }
 
 type RawConnOut = Result;
 
-async fn raw_conn_handler_inner(stream: TcpStream, addr: SocketAddr) -> Result<(), Error> {
-    match raw_conn_handler_inner_try(stream, addr).await {
+async fn raw_conn_handler_inner(
+    stream: TcpStream,
+    addr: SocketAddr,
+    node_config: Arc<NodeConfig>,
+) -> Result<(), Error> {
+    match raw_conn_handler_inner_try(stream, addr, node_config).await {
         Ok(_) => (),
         Err(mut ce) => {
             error!("raw_conn_handler_inner CAUGHT ERROR AND TRY TO SEND OVER TCP");
@@ -421,7 +428,11 @@ impl> From<(E, OwnedWriteHalf)> for ConnErr {
     }
 }
 
-async fn raw_conn_handler_inner_try(stream: TcpStream, addr: SocketAddr) -> Result<(), ConnErr> {
+async fn raw_conn_handler_inner_try(
+    stream: TcpStream,
+    addr: SocketAddr,
+    node_config: Arc<NodeConfig>,
+) -> Result<(), ConnErr> {
     info!("raw_conn_handler SPAWNED for {:?}", addr);
     let (netin, mut netout) = stream.into_split();
     let mut h = InMemoryFrameAsyncReadStream::new(netin);
@@ -450,7 +461,7 @@ async fn raw_conn_handler_inner_try(stream: TcpStream, addr: SocketAddr) -> Resu
         Err(e) => return Err((e, netout))?,
     };
     trace!("json: {}", qitem.0);
-    let res: Result = serde_json::from_str(&qitem.0);
+    let res: Result = serde_json::from_str(&qitem.0);
     let evq = match res {
         Ok(k) => k,
         Err(e) => {
@@ -462,17 +473,34 @@ async fn raw_conn_handler_inner_try(stream: TcpStream, addr: SocketAddr) -> Resu
         "TODO decide on response content based on the parsed json query\n{:?}",
         evq
     );
-    let mut batch = MinMaxAvgScalarEventBatch::empty();
-    batch.tss.push(42);
-    batch.tss.push(43);
-    batch.mins.push(7.1);
-    batch.mins.push(7.2);
-    batch.maxs.push(8.3);
-    batch.maxs.push(8.4);
-    batch.avgs.push(9.5);
-    batch.avgs.push(9.6);
-    let mut s1 = futures_util::stream::iter(vec![batch]).map(Result::Ok);
+    let query = netpod::AggQuerySingleChannel {
+        channel_config: netpod::ChannelConfig {
+            channel: netpod::Channel {
+                backend: "test1".into(),
+                name: "wave1".into(),
+            },
+            keyspace: 3,
+            time_bin_size: DAY,
+            shape: Shape::Wave(1024),
+            scalar_type: ScalarType::F64,
+            big_endian: true,
+            array: true,
+            compression: true,
+        },
+        // TODO use a NanoRange and search for matching files
+        timebin: 0,
+        tb_file_count: 1,
+        // TODO use the requested buffer size
+        buffer_size: 1024 * 4,
+    };
+    let mut s1 = crate::EventBlobsComplete::new(&query, query.channel_config.clone(), node_config.node.clone())
+        .into_dim_1_f32_stream()
+        .take(10)
+        .into_binned_x_bins_1();
     while let Some(item) = s1.next().await {
+        if let Ok(k) = &item {
+            info!("???????????????? emit item ts0: {:?}", k.tss.first());
+        }
         match make_frame::(&item) {
             Ok(buf) => match netout.write(&buf).await {
                 Ok(_) => {}
@@ -483,6 +511,30 @@ async fn raw_conn_handler_inner_try(stream: TcpStream, addr: SocketAddr) -> Resu
             }
         }
     }
+    if false {
+        // Manual test batch.
+        let mut batch = MinMaxAvgScalarEventBatch::empty();
+        batch.tss.push(42);
+        batch.tss.push(43);
+        batch.mins.push(7.1);
+        batch.mins.push(7.2);
+        batch.maxs.push(8.3);
+        batch.maxs.push(8.4);
+        batch.avgs.push(9.5);
+        batch.avgs.push(9.6);
+        let mut s1 = futures_util::stream::iter(vec![batch]).map(Result::Ok);
+        while let Some(item) = s1.next().await {
+            match make_frame::(&item) {
+                Ok(buf) => match netout.write(&buf).await {
+                    Ok(_) => {}
+                    Err(e) => return Err((e, netout))?,
+                },
+                Err(e) => {
+                    return Err((e, netout))?;
+                }
+            }
+        }
+    }
     let buf = make_term_frame();
     match netout.write(&buf).await {
         Ok(_) => (),
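For orientation on the raw.rs hunks, the new path in raw_conn_handler_inner_try replaces the hard-coded test batch (now kept under if false) with a streaming pipeline: read event blobs for the configured channel, turn them into a dim-1 f32 event stream, x-bin each event, frame every resulting batch, and write the frames to the socket before a terminating frame. A minimal sketch of that shape, using stand-in types (MiniBatch, mini_frame) rather than the real EventBlobsComplete / IntoDim1F32Stream / IntoBinnedXBins1 / make_frame APIs:

// Sketch only: mirrors the streaming shape set up in raw_conn_handler_inner_try.
// MiniBatch and mini_frame are stand-ins, not the types used in disk/src/raw.rs.
use futures::{executor::block_on, stream, StreamExt};

#[derive(Debug)]
struct MiniBatch {
    tss: Vec<u64>,
    avgs: Vec<f32>,
}

// Stand-in for make_frame: a length-prefixed frame around a debug rendering.
fn mini_frame(item: &Result<MiniBatch, String>) -> Vec<u8> {
    let body = format!("{:?}", item).into_bytes();
    let mut frame = (body.len() as u32).to_le_bytes().to_vec();
    frame.extend_from_slice(&body);
    frame
}

fn main() {
    block_on(async {
        // Stand-in for EventBlobsComplete::new(..).into_dim_1_f32_stream():
        // a stream of (timestamp, waveform) events.
        let events = stream::iter((0u64..5).map(|ts| Ok::<_, String>((ts, vec![ts as f32; 4]))));

        // Stand-in for .take(10).into_binned_x_bins_1(): reduce each waveform
        // to one aggregate per event (here just the average over the x axis).
        let mut batches = events.take(10).map(|ev| {
            ev.map(|(ts, wave)| MiniBatch {
                tss: vec![ts],
                avgs: vec![wave.iter().sum::<f32>() / wave.len() as f32],
            })
        });

        // Stand-in for the netout.write() loop: frame each item into a sink.
        let mut netout: Vec<u8> = Vec::new();
        while let Some(item) = batches.next().await {
            netout.extend_from_slice(&mini_frame(&item));
        }
        // Terminating frame, in the spirit of make_term_frame(): empty body.
        netout.extend_from_slice(&0u32.to_le_bytes());
        println!("wrote {} framed bytes", netout.len());
    });
}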