From bd9c231310f9bdb57b7874eadbd8a22ee5229364 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Mon, 26 Apr 2021 19:44:41 +0200 Subject: [PATCH] Add range limit to rawp tcp event service --- disk/src/aggtest.rs | 11 +++++++---- disk/src/eventblobs.rs | 17 ++++++++++++----- disk/src/eventchunker.rs | 31 +++++++++++++++++++++++++------ disk/src/lib.rs | 3 ++- disk/src/raw/conn.rs | 30 ++++++++++++++++++++++-------- 5 files changed, 68 insertions(+), 24 deletions(-) diff --git a/disk/src/aggtest.rs b/disk/src/aggtest.rs index 982085a..5d5cc65 100644 --- a/disk/src/aggtest.rs +++ b/disk/src/aggtest.rs @@ -3,7 +3,7 @@ use super::merge::MergeDim1F32Stream; use crate::agg::make_test_node; use futures_util::StreamExt; use netpod::timeunits::*; -use netpod::{BinSpecDimT, Channel, ChannelConfig, ScalarType, Shape}; +use netpod::{BinSpecDimT, Channel, ChannelConfig, NanoRange, ScalarType, Shape}; use std::future::ready; use std::sync::Arc; #[allow(unused_imports)] @@ -42,7 +42,8 @@ async fn agg_x_dim_0_inner() { let bin_count = 20; let ts1 = query.timebin as u64 * query.channel_config.time_bin_size; let ts2 = ts1 + HOUR * 24; - let fut1 = super::eventblobs::EventBlobsComplete::new(&query, query.channel_config.clone(), node) + let range = NanoRange { beg: ts1, end: ts2 }; + let fut1 = super::eventblobs::EventBlobsComplete::new(&query, query.channel_config.clone(), range, node) .into_dim_1_f32_stream() //.take(1000) .map(|q| { @@ -107,7 +108,8 @@ async fn agg_x_dim_1_inner() { let bin_count = 10; let ts1 = query.timebin as u64 * query.channel_config.time_bin_size; let ts2 = ts1 + HOUR * 24; - let fut1 = super::eventblobs::EventBlobsComplete::new(&query, query.channel_config.clone(), node) + let range = NanoRange { beg: ts1, end: ts2 }; + let fut1 = super::eventblobs::EventBlobsComplete::new(&query, query.channel_config.clone(), range, node) .into_dim_1_f32_stream() //.take(1000) .map(|q| { @@ -160,12 +162,13 @@ async fn merge_0_inner() { tb_file_count: 1, buffer_size: 1024 * 8, }; + let range: NanoRange = err::todoval(); let streams = (0..13) .into_iter() .map(|k| make_test_node(k)) .map(|node| { let node = Arc::new(node); - super::eventblobs::EventBlobsComplete::new(&query, query.channel_config.clone(), node) + super::eventblobs::EventBlobsComplete::new(&query, query.channel_config.clone(), range.clone(), node) .into_dim_1_f32_stream() }) .collect(); diff --git a/disk/src/eventblobs.rs b/disk/src/eventblobs.rs index 403dc3d..8300269 100644 --- a/disk/src/eventblobs.rs +++ b/disk/src/eventblobs.rs @@ -3,7 +3,7 @@ use crate::{file_content_stream, open_files}; use err::Error; use futures_core::Stream; use futures_util::StreamExt; -use netpod::{ChannelConfig, Node}; +use netpod::{ChannelConfig, NanoRange, Node}; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; @@ -14,15 +14,22 @@ pub struct EventBlobsComplete { file_chan: async_channel::Receiver>, evs: Option, buffer_size: u32, + range: NanoRange, } impl EventBlobsComplete { - pub fn new(query: &netpod::AggQuerySingleChannel, channel_config: ChannelConfig, node: Arc) -> Self { + pub fn new( + query: &netpod::AggQuerySingleChannel, + channel_config: ChannelConfig, + range: NanoRange, + node: Arc, + ) -> Self { Self { file_chan: open_files(query, node), evs: None, buffer_size: query.buffer_size, channel_config, + range, } } } @@ -46,8 +53,8 @@ impl Stream for EventBlobsComplete { Ready(Some(k)) => match k { Ok(file) => { let inp = Box::pin(file_content_stream(file, self.buffer_size as usize)); - let chunker = EventChunker::new(inp, self.channel_config.clone()); - self.evs.replace(chunker); + let chunker = EventChunker::new(inp, self.channel_config.clone(), self.range.clone()); + self.evs = Some(chunker); continue 'outer; } Err(e) => Ready(Some(Err(e))), @@ -73,7 +80,7 @@ pub fn event_blobs_complete( match fileres { Ok(file) => { let inp = Box::pin(file_content_stream(file, query.buffer_size as usize)); - let mut chunker = EventChunker::new(inp, err::todoval()); + let mut chunker = EventChunker::new(inp, err::todoval(), err::todoval()); while let Some(evres) = chunker.next().await { match evres { Ok(evres) => { diff --git a/disk/src/eventchunker.rs b/disk/src/eventchunker.rs index c7faa30..1700297 100644 --- a/disk/src/eventchunker.rs +++ b/disk/src/eventchunker.rs @@ -6,7 +6,8 @@ use futures_core::Stream; use futures_util::pin_mut; #[allow(unused_imports)] use netpod::log::*; -use netpod::{ChannelConfig, ScalarType, Shape}; +use netpod::timeunits::SEC; +use netpod::{ChannelConfig, NanoRange, ScalarType, Shape}; use std::pin::Pin; use std::task::{Context, Poll}; @@ -18,6 +19,8 @@ pub struct EventChunker { channel_config: ChannelConfig, errored: bool, completed: bool, + range: NanoRange, + seen_beyond_range: bool, } enum DataFileState { @@ -29,6 +32,7 @@ impl EventChunker { pub fn new( inp: Pin> + Send>>, channel_config: ChannelConfig, + range: NanoRange, ) -> Self { let mut inp = NeedMinBuffer::new(inp); inp.set_need_min(6); @@ -40,6 +44,8 @@ impl EventChunker { channel_config, errored: false, completed: false, + range, + seen_beyond_range: false, } } @@ -48,10 +54,6 @@ impl EventChunker { } fn parse_buf_inner(&mut self, buf: &mut BytesMut) -> Result { - // must communicate to caller: - // what I've found in the buffer - // what I've consumed from the buffer - // how many bytes I need min to make progress let mut ret = EventFull::empty(); use byteorder::{ReadBytesExt, BE}; loop { @@ -115,6 +117,10 @@ impl EventChunker { sl.read_i64::().unwrap(); let ts = sl.read_i64::().unwrap() as u64; let pulse = sl.read_i64::().unwrap() as u64; + if ts >= self.range.end { + self.seen_beyond_range = true; + break; + } sl.read_i64::().unwrap(); let status = sl.read_i8().unwrap(); let severity = sl.read_i8().unwrap(); @@ -187,7 +193,16 @@ impl EventChunker { Ok(c1) => { assert!(c1 as u32 == k1); trace!("decompress result c1 {} k1 {}", c1, k1); - ret.add_event(ts, pulse, Some(decomp), ScalarType::from_dtype_index(type_index)); + if ts < self.range.beg { + warn!("UNNECESSARY EVENT DECOMPRESS {}", ts / SEC); + } else { + ret.add_event( + ts, + pulse, + Some(decomp), + ScalarType::from_dtype_index(type_index), + ); + } } Err(e) => { Err(Error::with_msg(format!("decompression failed {:?}", e)))?; @@ -246,6 +261,10 @@ impl Stream for EventChunker { self.completed = true; return Ready(None); } + if self.seen_beyond_range { + self.completed = true; + return Ready(None); + } self.polled += 1; if self.polled >= 20000 { warn!("EventChunker poll limit reached"); diff --git a/disk/src/lib.rs b/disk/src/lib.rs index 8093881..e1b3395 100644 --- a/disk/src/lib.rs +++ b/disk/src/lib.rs @@ -353,7 +353,8 @@ pub fn parsed1( match fileres { Ok(file) => { let inp = Box::pin(file_content_stream(file, query.buffer_size as usize)); - let mut chunker = eventchunker::EventChunker::new(inp, err::todoval()); + let range = err::todoval(); + let mut chunker = eventchunker::EventChunker::new(inp, err::todoval(), range); while let Some(evres) = chunker.next().await { match evres { Ok(evres) => { diff --git a/disk/src/raw/conn.rs b/disk/src/raw/conn.rs index 196d4a2..aeab352 100644 --- a/disk/src/raw/conn.rs +++ b/disk/src/raw/conn.rs @@ -7,7 +7,7 @@ use err::Error; use futures_util::StreamExt; #[allow(unused_imports)] use netpod::log::*; -use netpod::timeunits::DAY; +use netpod::timeunits::{DAY, SEC}; use netpod::{NodeConfig, ScalarType, Shape}; use std::net::SocketAddr; use std::sync::Arc; @@ -123,6 +123,11 @@ async fn raw_conn_handler_inner_try( return Err((Error::with_msg("can not parse request json"), netout))?; } }; + debug!( + "\n\nREQUEST FOR RANGE {} {}\n\n", + evq.range.beg / SEC, + evq.range.end / SEC + ); error!( "TODO decide on response content based on the parsed json query\n{:?}", evq @@ -147,17 +152,26 @@ async fn raw_conn_handler_inner_try( // TODO use the requested buffer size buffer_size: 1024 * 4, }; - let mut s1 = EventBlobsComplete::new(&query, query.channel_config.clone(), node_config.node.clone()) - .into_dim_1_f32_stream() - .take(10) - .into_binned_x_bins_1(); + let mut s1 = EventBlobsComplete::new( + &query, + query.channel_config.clone(), + evq.range.clone(), + node_config.node.clone(), + ) + .into_dim_1_f32_stream() + .take(10) + .into_binned_x_bins_1(); + let mut e = 0; while let Some(item) = s1.next().await { if let Ok(k) = &item { + e += 1; trace!( - "emit items {} {:?} {:?}", + "emit items sp {:2} e {:3} len {:3} {:10?} {:10?}", + node_config.node.split, + e, k.tss.len(), - k.tss.first().map(|k| k / 1000000000), - k.tss.last().map(|k| k / 1000000000) + k.tss.first().map(|k| k / SEC), + k.tss.last().map(|k| k / SEC), ); } match make_frame::(&item) {