Add range limit to rawp tcp event service
This commit is contained in:
@@ -3,7 +3,7 @@ use super::merge::MergeDim1F32Stream;
|
||||
use crate::agg::make_test_node;
|
||||
use futures_util::StreamExt;
|
||||
use netpod::timeunits::*;
|
||||
use netpod::{BinSpecDimT, Channel, ChannelConfig, ScalarType, Shape};
|
||||
use netpod::{BinSpecDimT, Channel, ChannelConfig, NanoRange, ScalarType, Shape};
|
||||
use std::future::ready;
|
||||
use std::sync::Arc;
|
||||
#[allow(unused_imports)]
|
||||
@@ -42,7 +42,8 @@ async fn agg_x_dim_0_inner() {
|
||||
let bin_count = 20;
|
||||
let ts1 = query.timebin as u64 * query.channel_config.time_bin_size;
|
||||
let ts2 = ts1 + HOUR * 24;
|
||||
let fut1 = super::eventblobs::EventBlobsComplete::new(&query, query.channel_config.clone(), node)
|
||||
let range = NanoRange { beg: ts1, end: ts2 };
|
||||
let fut1 = super::eventblobs::EventBlobsComplete::new(&query, query.channel_config.clone(), range, node)
|
||||
.into_dim_1_f32_stream()
|
||||
//.take(1000)
|
||||
.map(|q| {
|
||||
@@ -107,7 +108,8 @@ async fn agg_x_dim_1_inner() {
|
||||
let bin_count = 10;
|
||||
let ts1 = query.timebin as u64 * query.channel_config.time_bin_size;
|
||||
let ts2 = ts1 + HOUR * 24;
|
||||
let fut1 = super::eventblobs::EventBlobsComplete::new(&query, query.channel_config.clone(), node)
|
||||
let range = NanoRange { beg: ts1, end: ts2 };
|
||||
let fut1 = super::eventblobs::EventBlobsComplete::new(&query, query.channel_config.clone(), range, node)
|
||||
.into_dim_1_f32_stream()
|
||||
//.take(1000)
|
||||
.map(|q| {
|
||||
@@ -160,12 +162,13 @@ async fn merge_0_inner() {
|
||||
tb_file_count: 1,
|
||||
buffer_size: 1024 * 8,
|
||||
};
|
||||
let range: NanoRange = err::todoval();
|
||||
let streams = (0..13)
|
||||
.into_iter()
|
||||
.map(|k| make_test_node(k))
|
||||
.map(|node| {
|
||||
let node = Arc::new(node);
|
||||
super::eventblobs::EventBlobsComplete::new(&query, query.channel_config.clone(), node)
|
||||
super::eventblobs::EventBlobsComplete::new(&query, query.channel_config.clone(), range.clone(), node)
|
||||
.into_dim_1_f32_stream()
|
||||
})
|
||||
.collect();
|
||||
|
||||
@@ -3,7 +3,7 @@ use crate::{file_content_stream, open_files};
|
||||
use err::Error;
|
||||
use futures_core::Stream;
|
||||
use futures_util::StreamExt;
|
||||
use netpod::{ChannelConfig, Node};
|
||||
use netpod::{ChannelConfig, NanoRange, Node};
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
use std::task::{Context, Poll};
|
||||
@@ -14,15 +14,22 @@ pub struct EventBlobsComplete {
|
||||
file_chan: async_channel::Receiver<Result<File, Error>>,
|
||||
evs: Option<EventChunker>,
|
||||
buffer_size: u32,
|
||||
range: NanoRange,
|
||||
}
|
||||
|
||||
impl EventBlobsComplete {
|
||||
pub fn new(query: &netpod::AggQuerySingleChannel, channel_config: ChannelConfig, node: Arc<Node>) -> Self {
|
||||
pub fn new(
|
||||
query: &netpod::AggQuerySingleChannel,
|
||||
channel_config: ChannelConfig,
|
||||
range: NanoRange,
|
||||
node: Arc<Node>,
|
||||
) -> Self {
|
||||
Self {
|
||||
file_chan: open_files(query, node),
|
||||
evs: None,
|
||||
buffer_size: query.buffer_size,
|
||||
channel_config,
|
||||
range,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -46,8 +53,8 @@ impl Stream for EventBlobsComplete {
|
||||
Ready(Some(k)) => match k {
|
||||
Ok(file) => {
|
||||
let inp = Box::pin(file_content_stream(file, self.buffer_size as usize));
|
||||
let chunker = EventChunker::new(inp, self.channel_config.clone());
|
||||
self.evs.replace(chunker);
|
||||
let chunker = EventChunker::new(inp, self.channel_config.clone(), self.range.clone());
|
||||
self.evs = Some(chunker);
|
||||
continue 'outer;
|
||||
}
|
||||
Err(e) => Ready(Some(Err(e))),
|
||||
@@ -73,7 +80,7 @@ pub fn event_blobs_complete(
|
||||
match fileres {
|
||||
Ok(file) => {
|
||||
let inp = Box::pin(file_content_stream(file, query.buffer_size as usize));
|
||||
let mut chunker = EventChunker::new(inp, err::todoval());
|
||||
let mut chunker = EventChunker::new(inp, err::todoval(), err::todoval());
|
||||
while let Some(evres) = chunker.next().await {
|
||||
match evres {
|
||||
Ok(evres) => {
|
||||
|
||||
@@ -6,7 +6,8 @@ use futures_core::Stream;
|
||||
use futures_util::pin_mut;
|
||||
#[allow(unused_imports)]
|
||||
use netpod::log::*;
|
||||
use netpod::{ChannelConfig, ScalarType, Shape};
|
||||
use netpod::timeunits::SEC;
|
||||
use netpod::{ChannelConfig, NanoRange, ScalarType, Shape};
|
||||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
|
||||
@@ -18,6 +19,8 @@ pub struct EventChunker {
|
||||
channel_config: ChannelConfig,
|
||||
errored: bool,
|
||||
completed: bool,
|
||||
range: NanoRange,
|
||||
seen_beyond_range: bool,
|
||||
}
|
||||
|
||||
enum DataFileState {
|
||||
@@ -29,6 +32,7 @@ impl EventChunker {
|
||||
pub fn new(
|
||||
inp: Pin<Box<dyn Stream<Item = Result<BytesMut, Error>> + Send>>,
|
||||
channel_config: ChannelConfig,
|
||||
range: NanoRange,
|
||||
) -> Self {
|
||||
let mut inp = NeedMinBuffer::new(inp);
|
||||
inp.set_need_min(6);
|
||||
@@ -40,6 +44,8 @@ impl EventChunker {
|
||||
channel_config,
|
||||
errored: false,
|
||||
completed: false,
|
||||
range,
|
||||
seen_beyond_range: false,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -48,10 +54,6 @@ impl EventChunker {
|
||||
}
|
||||
|
||||
fn parse_buf_inner(&mut self, buf: &mut BytesMut) -> Result<ParseResult, Error> {
|
||||
// must communicate to caller:
|
||||
// what I've found in the buffer
|
||||
// what I've consumed from the buffer
|
||||
// how many bytes I need min to make progress
|
||||
let mut ret = EventFull::empty();
|
||||
use byteorder::{ReadBytesExt, BE};
|
||||
loop {
|
||||
@@ -115,6 +117,10 @@ impl EventChunker {
|
||||
sl.read_i64::<BE>().unwrap();
|
||||
let ts = sl.read_i64::<BE>().unwrap() as u64;
|
||||
let pulse = sl.read_i64::<BE>().unwrap() as u64;
|
||||
if ts >= self.range.end {
|
||||
self.seen_beyond_range = true;
|
||||
break;
|
||||
}
|
||||
sl.read_i64::<BE>().unwrap();
|
||||
let status = sl.read_i8().unwrap();
|
||||
let severity = sl.read_i8().unwrap();
|
||||
@@ -187,7 +193,16 @@ impl EventChunker {
|
||||
Ok(c1) => {
|
||||
assert!(c1 as u32 == k1);
|
||||
trace!("decompress result c1 {} k1 {}", c1, k1);
|
||||
ret.add_event(ts, pulse, Some(decomp), ScalarType::from_dtype_index(type_index));
|
||||
if ts < self.range.beg {
|
||||
warn!("UNNECESSARY EVENT DECOMPRESS {}", ts / SEC);
|
||||
} else {
|
||||
ret.add_event(
|
||||
ts,
|
||||
pulse,
|
||||
Some(decomp),
|
||||
ScalarType::from_dtype_index(type_index),
|
||||
);
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
Err(Error::with_msg(format!("decompression failed {:?}", e)))?;
|
||||
@@ -246,6 +261,10 @@ impl Stream for EventChunker {
|
||||
self.completed = true;
|
||||
return Ready(None);
|
||||
}
|
||||
if self.seen_beyond_range {
|
||||
self.completed = true;
|
||||
return Ready(None);
|
||||
}
|
||||
self.polled += 1;
|
||||
if self.polled >= 20000 {
|
||||
warn!("EventChunker poll limit reached");
|
||||
|
||||
@@ -353,7 +353,8 @@ pub fn parsed1(
|
||||
match fileres {
|
||||
Ok(file) => {
|
||||
let inp = Box::pin(file_content_stream(file, query.buffer_size as usize));
|
||||
let mut chunker = eventchunker::EventChunker::new(inp, err::todoval());
|
||||
let range = err::todoval();
|
||||
let mut chunker = eventchunker::EventChunker::new(inp, err::todoval(), range);
|
||||
while let Some(evres) = chunker.next().await {
|
||||
match evres {
|
||||
Ok(evres) => {
|
||||
|
||||
@@ -7,7 +7,7 @@ use err::Error;
|
||||
use futures_util::StreamExt;
|
||||
#[allow(unused_imports)]
|
||||
use netpod::log::*;
|
||||
use netpod::timeunits::DAY;
|
||||
use netpod::timeunits::{DAY, SEC};
|
||||
use netpod::{NodeConfig, ScalarType, Shape};
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
@@ -123,6 +123,11 @@ async fn raw_conn_handler_inner_try(
|
||||
return Err((Error::with_msg("can not parse request json"), netout))?;
|
||||
}
|
||||
};
|
||||
debug!(
|
||||
"\n\nREQUEST FOR RANGE {} {}\n\n",
|
||||
evq.range.beg / SEC,
|
||||
evq.range.end / SEC
|
||||
);
|
||||
error!(
|
||||
"TODO decide on response content based on the parsed json query\n{:?}",
|
||||
evq
|
||||
@@ -147,17 +152,26 @@ async fn raw_conn_handler_inner_try(
|
||||
// TODO use the requested buffer size
|
||||
buffer_size: 1024 * 4,
|
||||
};
|
||||
let mut s1 = EventBlobsComplete::new(&query, query.channel_config.clone(), node_config.node.clone())
|
||||
.into_dim_1_f32_stream()
|
||||
.take(10)
|
||||
.into_binned_x_bins_1();
|
||||
let mut s1 = EventBlobsComplete::new(
|
||||
&query,
|
||||
query.channel_config.clone(),
|
||||
evq.range.clone(),
|
||||
node_config.node.clone(),
|
||||
)
|
||||
.into_dim_1_f32_stream()
|
||||
.take(10)
|
||||
.into_binned_x_bins_1();
|
||||
let mut e = 0;
|
||||
while let Some(item) = s1.next().await {
|
||||
if let Ok(k) = &item {
|
||||
e += 1;
|
||||
trace!(
|
||||
"emit items {} {:?} {:?}",
|
||||
"emit items sp {:2} e {:3} len {:3} {:10?} {:10?}",
|
||||
node_config.node.split,
|
||||
e,
|
||||
k.tss.len(),
|
||||
k.tss.first().map(|k| k / 1000000000),
|
||||
k.tss.last().map(|k| k / 1000000000)
|
||||
k.tss.first().map(|k| k / SEC),
|
||||
k.tss.last().map(|k| k / SEC),
|
||||
);
|
||||
}
|
||||
match make_frame::<RawConnOut>(&item) {
|
||||
|
||||
Reference in New Issue
Block a user