This commit is contained in:
Dominik Werder
2021-04-24 17:14:46 +02:00
parent 18d5dba789
commit a2047d292c
3 changed files with 101 additions and 34 deletions
+1
View File
@@ -1011,6 +1011,7 @@ where
} }
} }
} }
pub fn make_test_node(id: u32) -> Node { pub fn make_test_node(id: u32) -> Node {
Node { Node {
id, id,
+29 -15
View File
@@ -14,7 +14,7 @@ use std::task::{Context, Poll};
use tokio::fs::{File, OpenOptions}; use tokio::fs::{File, OpenOptions};
use tokio::io::AsyncRead; use tokio::io::AsyncRead;
#[allow(unused_imports)] #[allow(unused_imports)]
use tracing::{debug, error, info, trace, warn}; use tracing::{debug, error, info, span, trace, warn, Level};
pub mod agg; pub mod agg;
#[cfg(test)] #[cfg(test)]
@@ -490,6 +490,10 @@ impl EventChunker {
} }
fn parse_buf(&mut self, buf: &mut BytesMut) -> Result<ParseResult, Error> { fn parse_buf(&mut self, buf: &mut BytesMut) -> Result<ParseResult, Error> {
span!(Level::INFO, "EventChunker::parse_buf").in_scope(|| self.parse_buf_inner(buf))
}
fn parse_buf_inner(&mut self, buf: &mut BytesMut) -> Result<ParseResult, Error> {
// must communicate to caller: // must communicate to caller:
// what I've found in the buffer // what I've found in the buffer
// what I've consumed from the buffer // what I've consumed from the buffer
@@ -497,10 +501,9 @@ impl EventChunker {
let mut ret = EventFull::empty(); let mut ret = EventFull::empty();
let mut need_min = 0 as u32; let mut need_min = 0 as u32;
use byteorder::{ReadBytesExt, BE}; use byteorder::{ReadBytesExt, BE};
//info!("parse_buf rb {}", buf.len()); error!(" ???????????????????????? Why should need_min ever be zero?");
//let mut i1 = 0; info!("parse_buf buf len {} need_min {}", buf.len(), need_min);
loop { loop {
//info!("parse_buf LOOP {}", i1);
if (buf.len() as u32) < need_min { if (buf.len() as u32) < need_min {
break; break;
} }
@@ -593,8 +596,13 @@ impl EventChunker {
let ele_count = value_bytes / type_size as u64; let ele_count = value_bytes / type_size as u64;
let ele_size = type_size; let ele_size = type_size;
match self.channel_config.shape { match self.channel_config.shape {
Shape::Wave(ele2) => { Shape::Wave(dim1count) => {
assert!(ele2 == ele_count as u32); if dim1count != ele_count as u32 {
Err(Error::with_msg(format!(
"ChannelConfig expects {:?} but event has {:?}",
self.channel_config.shape, ele_count,
)))?;
}
} }
_ => panic!(), _ => panic!(),
} }
@@ -661,6 +669,7 @@ impl Stream for EventChunker {
type Item = Result<EventFull, Error>; type Item = Result<EventFull, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
use Poll::*;
self.polled += 1; self.polled += 1;
if self.polled >= 2000000 { if self.polled >= 2000000 {
warn!("EventChunker poll limit reached"); warn!("EventChunker poll limit reached");
@@ -669,9 +678,10 @@ impl Stream for EventChunker {
let g = &mut self.inp; let g = &mut self.inp;
pin_mut!(g); pin_mut!(g);
match g.poll_next(cx) { match g.poll_next(cx) {
Poll::Ready(Some(Ok(mut buf))) => { Ready(Some(Ok(mut buf))) => {
//info!("EventChunker got buffer len {}", buf.len()); //info!("EventChunker got buffer len {}", buf.len());
match self.parse_buf(&mut buf) { let r = self.parse_buf(&mut buf);
match r {
Ok(res) => { Ok(res) => {
if buf.len() > 0 { if buf.len() > 0 {
// TODO gather stats about this: // TODO gather stats about this:
@@ -680,17 +690,21 @@ impl Stream for EventChunker {
} }
if res.need_min > 8000 { if res.need_min > 8000 {
warn!("spurious EventChunker asks for need_min {}", res.need_min); warn!("spurious EventChunker asks for need_min {}", res.need_min);
panic!(); Ready(Some(Err(Error::with_msg(format!(
"spurious EventChunker asks for need_min {}",
res.need_min
)))))
} else {
self.inp.set_need_min(res.need_min);
Ready(Some(Ok(res.events)))
} }
self.inp.set_need_min(res.need_min);
Poll::Ready(Some(Ok(res.events)))
} }
Err(e) => Poll::Ready(Some(Err(e.into()))), Err(e) => Ready(Some(Err(e.into()))),
} }
} }
Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))), Ready(Some(Err(e))) => Ready(Some(Err(e))),
Poll::Ready(None) => Poll::Ready(None), Ready(None) => Ready(None),
Poll::Pending => Poll::Pending, Pending => Pending,
} }
} }
} }
+71 -19
View File
@@ -5,13 +5,14 @@ Delivers event data (not yet time-binned) from local storage and provides client
to request such data from nodes. to request such data from nodes.
*/ */
use crate::agg::MinMaxAvgScalarEventBatch; use crate::agg::{IntoBinnedXBins1, IntoDim1F32Stream, MinMaxAvgScalarEventBatch};
use crate::cache::BinnedBytesForHttpStreamFrame; use crate::cache::BinnedBytesForHttpStreamFrame;
use bytes::{BufMut, Bytes, BytesMut}; use bytes::{BufMut, Bytes, BytesMut};
use err::Error; use err::Error;
use futures_core::Stream; use futures_core::Stream;
use futures_util::{pin_mut, StreamExt}; use futures_util::{pin_mut, StreamExt};
use netpod::{AggKind, Channel, NanoRange, Node, NodeConfig}; use netpod::timeunits::DAY;
use netpod::{AggKind, Channel, NanoRange, Node, NodeConfig, ScalarType, Shape};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::net::SocketAddr; use std::net::SocketAddr;
use std::pin::Pin; use std::pin::Pin;
@@ -377,23 +378,29 @@ pub async fn raw_service(node_config: Arc<NodeConfig>) -> Result<(), Error> {
loop { loop {
match lis.accept().await { match lis.accept().await {
Ok((stream, addr)) => { Ok((stream, addr)) => {
taskrun::spawn(raw_conn_handler(stream, addr)); taskrun::spawn(raw_conn_handler(stream, addr, node_config.clone()));
} }
Err(e) => Err(e)?, Err(e) => Err(e)?,
} }
} }
} }
async fn raw_conn_handler(stream: TcpStream, addr: SocketAddr) -> Result<(), Error> { async fn raw_conn_handler(stream: TcpStream, addr: SocketAddr, node_config: Arc<NodeConfig>) -> Result<(), Error> {
//use tracing_futures::Instrument; //use tracing_futures::Instrument;
let span1 = span!(Level::INFO, "raw::raw_conn_handler"); let span1 = span!(Level::INFO, "raw::raw_conn_handler");
raw_conn_handler_inner(stream, addr).instrument(span1).await raw_conn_handler_inner(stream, addr, node_config)
.instrument(span1)
.await
} }
type RawConnOut = Result<MinMaxAvgScalarEventBatch, Error>; type RawConnOut = Result<MinMaxAvgScalarEventBatch, Error>;
async fn raw_conn_handler_inner(stream: TcpStream, addr: SocketAddr) -> Result<(), Error> { async fn raw_conn_handler_inner(
match raw_conn_handler_inner_try(stream, addr).await { stream: TcpStream,
addr: SocketAddr,
node_config: Arc<NodeConfig>,
) -> Result<(), Error> {
match raw_conn_handler_inner_try(stream, addr, node_config).await {
Ok(_) => (), Ok(_) => (),
Err(mut ce) => { Err(mut ce) => {
error!("raw_conn_handler_inner CAUGHT ERROR AND TRY TO SEND OVER TCP"); error!("raw_conn_handler_inner CAUGHT ERROR AND TRY TO SEND OVER TCP");
@@ -421,7 +428,11 @@ impl<E: Into<Error>> From<(E, OwnedWriteHalf)> for ConnErr {
} }
} }
async fn raw_conn_handler_inner_try(stream: TcpStream, addr: SocketAddr) -> Result<(), ConnErr> { async fn raw_conn_handler_inner_try(
stream: TcpStream,
addr: SocketAddr,
node_config: Arc<NodeConfig>,
) -> Result<(), ConnErr> {
info!("raw_conn_handler SPAWNED for {:?}", addr); info!("raw_conn_handler SPAWNED for {:?}", addr);
let (netin, mut netout) = stream.into_split(); let (netin, mut netout) = stream.into_split();
let mut h = InMemoryFrameAsyncReadStream::new(netin); let mut h = InMemoryFrameAsyncReadStream::new(netin);
@@ -450,7 +461,7 @@ async fn raw_conn_handler_inner_try(stream: TcpStream, addr: SocketAddr) -> Resu
Err(e) => return Err((e, netout))?, Err(e) => return Err((e, netout))?,
}; };
trace!("json: {}", qitem.0); trace!("json: {}", qitem.0);
let res: Result<serde_json::Value, _> = serde_json::from_str(&qitem.0); let res: Result<EventsQuery, _> = serde_json::from_str(&qitem.0);
let evq = match res { let evq = match res {
Ok(k) => k, Ok(k) => k,
Err(e) => { Err(e) => {
@@ -462,17 +473,34 @@ async fn raw_conn_handler_inner_try(stream: TcpStream, addr: SocketAddr) -> Resu
"TODO decide on response content based on the parsed json query\n{:?}", "TODO decide on response content based on the parsed json query\n{:?}",
evq evq
); );
let mut batch = MinMaxAvgScalarEventBatch::empty(); let query = netpod::AggQuerySingleChannel {
batch.tss.push(42); channel_config: netpod::ChannelConfig {
batch.tss.push(43); channel: netpod::Channel {
batch.mins.push(7.1); backend: "test1".into(),
batch.mins.push(7.2); name: "wave1".into(),
batch.maxs.push(8.3); },
batch.maxs.push(8.4); keyspace: 3,
batch.avgs.push(9.5); time_bin_size: DAY,
batch.avgs.push(9.6); shape: Shape::Wave(1024),
let mut s1 = futures_util::stream::iter(vec![batch]).map(Result::Ok); scalar_type: ScalarType::F64,
big_endian: true,
array: true,
compression: true,
},
// TODO use a NanoRange and search for matching files
timebin: 0,
tb_file_count: 1,
// TODO use the requested buffer size
buffer_size: 1024 * 4,
};
let mut s1 = crate::EventBlobsComplete::new(&query, query.channel_config.clone(), node_config.node.clone())
.into_dim_1_f32_stream()
.take(10)
.into_binned_x_bins_1();
while let Some(item) = s1.next().await { while let Some(item) = s1.next().await {
if let Ok(k) = &item {
info!("???????????????? emit item ts0: {:?}", k.tss.first());
}
match make_frame::<RawConnOut>(&item) { match make_frame::<RawConnOut>(&item) {
Ok(buf) => match netout.write(&buf).await { Ok(buf) => match netout.write(&buf).await {
Ok(_) => {} Ok(_) => {}
@@ -483,6 +511,30 @@ async fn raw_conn_handler_inner_try(stream: TcpStream, addr: SocketAddr) -> Resu
} }
} }
} }
if false {
// Manual test batch.
let mut batch = MinMaxAvgScalarEventBatch::empty();
batch.tss.push(42);
batch.tss.push(43);
batch.mins.push(7.1);
batch.mins.push(7.2);
batch.maxs.push(8.3);
batch.maxs.push(8.4);
batch.avgs.push(9.5);
batch.avgs.push(9.6);
let mut s1 = futures_util::stream::iter(vec![batch]).map(Result::Ok);
while let Some(item) = s1.next().await {
match make_frame::<RawConnOut>(&item) {
Ok(buf) => match netout.write(&buf).await {
Ok(_) => {}
Err(e) => return Err((e, netout))?,
},
Err(e) => {
return Err((e, netout))?;
}
}
}
}
let buf = make_term_frame(); let buf = make_term_frame();
match netout.write(&buf).await { match netout.write(&buf).await {
Ok(_) => (), Ok(_) => (),