From 8dc80f5dbaec645ab7132a19a9e15042d05a5e19 Mon Sep 17 00:00:00 2001
From: Dominik Werder
Date: Fri, 14 May 2021 10:30:54 +0200
Subject: [PATCH] up

---
 disk/src/agg.rs                         | 21 ++++++--
 disk/src/agg/binnedt.rs                 |  6 +--
 disk/src/agg/eventbatch.rs              | 23 ++++++---
 disk/src/agg/scalarbinbatch.rs          | 16 +++++--
 disk/src/dataopen.rs                    | 13 +++--
 disk/src/eventblobs.rs                  |  9 ++--
 disk/src/eventchunker.rs                | 64 +++++++++++++++++++------
 disk/src/frame/inmem.rs                 |  9 ++--
 disk/src/lib.rs                         |  6 +--
 httpret/src/lib.rs                      | 23 +++++++++
 httpret/static/documentation/index.html | 11 +++++
 httpret/static/documentation/page.css   |  5 ++
 12 files changed, 159 insertions(+), 47 deletions(-)
 create mode 100644 httpret/static/documentation/index.html
 create mode 100644 httpret/static/documentation/page.css

diff --git a/disk/src/agg.rs b/disk/src/agg.rs
index db318d4..ebacf45 100644
--- a/disk/src/agg.rs
+++ b/disk/src/agg.rs
@@ -62,15 +62,22 @@ impl AggregatableXdim1Bin for ValuesDim1 {
             let ts = self.tss[i1];
             let mut min = f32::MAX;
             let mut max = f32::MIN;
-            let mut sum = 0f32;
+            let mut sum = f32::NAN;
+            let mut count = 0;
             let vals = &self.values[i1];
-            assert!(vals.len() > 0);
             for i2 in 0..vals.len() {
                 let v = vals[i2];
-                //info!("value {} {} {}", i1, i2, v);
                 min = min.min(v);
                 max = max.max(v);
-                sum += v;
+                if v.is_nan() {
+                } else {
+                    if sum.is_nan() {
+                        sum = v;
+                    } else {
+                        sum += v;
+                    }
+                    count += 1;
+                }
             }
             if min == f32::MAX {
                 min = f32::NAN;
@@ -81,7 +88,11 @@ impl AggregatableXdim1Bin for ValuesDim1 {
             ret.tss.push(ts);
             ret.mins.push(min);
             ret.maxs.push(max);
-            ret.avgs.push(sum / vals.len() as f32);
+            if sum.is_nan() {
+                ret.avgs.push(sum);
+            } else {
+                ret.avgs.push(sum / count as f32);
+            }
         }
         ret
     }
diff --git a/disk/src/agg/binnedt.rs b/disk/src/agg/binnedt.rs
index 3fd44b1..8501528 100644
--- a/disk/src/agg/binnedt.rs
+++ b/disk/src/agg/binnedt.rs
@@ -96,6 +96,8 @@ where
     fn cur(&mut self, cx: &mut Context) -> Poll<Option<Result<I, Error>>> {
         if let Some(cur) = self.left.take() {
             cur
+        } else if self.inp_completed {
+            Poll::Ready(None)
         } else {
             let inp_poll_span = span!(Level::TRACE, "into_t_inp_poll");
             inp_poll_span.in_scope(|| self.inp.poll_next_unpin(cx))
@@ -182,7 +184,6 @@ where
                     Some(Ready(Some(Err(e))))
                 }
                 Ready(None) => {
-                    // No more input, no more in leftover.
                     self.inp_completed = true;
                     if self.all_bins_emitted {
                         None
@@ -239,9 +240,6 @@ where
                         continue 'outer;
                     }
                 } else {
-                    // TODO make sure that we don't poll our input here after it has completed.
-                    err::todo();
-
                     let cur = self.cur(cx);
                     match self.handle(cur) {
                         Some(item) => item,
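The agg.rs hunks above switch bin averaging from a plain running sum to NaN-aware accumulation: NaN samples no longer poison the sum, and the average divides by the number of finite samples rather than the raw bin size. (The binnedt.rs hunk is related plumbing: it resolves the old TODO by making `cur` return `Ready(None)` once the input has completed, instead of polling a finished stream.) A minimal self-contained sketch of the same averaging idea; `bin_stats` is an illustrative name, not a function from the patch:

```rust
/// NaN-aware min/max/avg over one bin, mirroring the agg.rs hunks:
/// NaN samples are skipped for the sum, and an all-NaN or empty bin
/// yields NaN instead of a skewed average.
fn bin_stats(vals: &[f32]) -> (f32, f32, f32) {
    let mut min = f32::MAX;
    let mut max = f32::MIN;
    let mut sum = f32::NAN;
    let mut count = 0u32;
    for &v in vals {
        // f32::min/max ignore a NaN operand, so these stay correct.
        min = min.min(v);
        max = max.max(v);
        if !v.is_nan() {
            // The first finite sample seeds the sum, later ones accumulate.
            sum = if sum.is_nan() { v } else { sum + v };
            count += 1;
        }
    }
    let min = if min == f32::MAX { f32::NAN } else { min };
    let max = if max == f32::MIN { f32::NAN } else { max };
    let avg = if count == 0 { f32::NAN } else { sum / count as f32 };
    (min, max, avg)
}

fn main() {
    // A NaN in the middle no longer drags the average to NaN.
    let (min, max, avg) = bin_stats(&[1.0, f32::NAN, 3.0]);
    assert_eq!((min, max, avg), (1.0, 3.0, 2.0));
    // An all-NaN bin reports NaN for all three statistics.
    assert!(bin_stats(&[f32::NAN]).2.is_nan());
}
```

An all-NaN or empty bin comes out as (NaN, NaN, NaN), which is exactly what the patched code pushes into mins/maxs/avgs.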
diff --git a/disk/src/agg/eventbatch.rs b/disk/src/agg/eventbatch.rs
index 3c9c34e..9e4bcaa 100644
--- a/disk/src/agg/eventbatch.rs
+++ b/disk/src/agg/eventbatch.rs
@@ -178,6 +178,7 @@ pub struct MinMaxAvgScalarEventBatchAggregator {
     min: f32,
     max: f32,
     sum: f32,
+    sumc: u64,
 }
 
 impl MinMaxAvgScalarEventBatchAggregator {
@@ -185,10 +186,11 @@
         Self {
             ts1,
             ts2,
+            count: 0,
             min: f32::MAX,
             max: f32::MIN,
-            sum: 0f32,
-            count: 0,
+            sum: f32::NAN,
+            sumc: 0,
         }
     }
 }
@@ -226,10 +228,19 @@ impl AggregatorTdim for MinMaxAvgScalarEventBatchAggregator {
             } else if ts >= self.ts2 {
                 continue;
             } else {
+                self.count += 1;
                 self.min = self.min.min(v.mins[i1]);
                 self.max = self.max.max(v.maxs[i1]);
-                self.sum += v.avgs[i1];
-                self.count += 1;
+                let x = v.avgs[i1];
+                if x.is_nan() {
+                } else {
+                    if self.sum.is_nan() {
+                        self.sum = x;
+                    } else {
+                        self.sum += x;
+                    }
+                    self.sumc += 1;
+                }
             }
         }
     }
@@ -237,10 +248,10 @@ fn result(self) -> Vec<Self::OutputValue> {
         let min = if self.min == f32::MAX { f32::NAN } else { self.min };
         let max = if self.max == f32::MIN { f32::NAN } else { self.max };
-        let avg = if self.count == 0 {
+        let avg = if self.sumc == 0 {
             f32::NAN
         } else {
-            self.sum / self.count as f32
+            self.sum / self.sumc as f32
         };
         let v = MinMaxAvgScalarBinBatch {
             ts1s: vec![self.ts1],
diff --git a/disk/src/agg/scalarbinbatch.rs b/disk/src/agg/scalarbinbatch.rs
index d33bc17..ef3ac42 100644
--- a/disk/src/agg/scalarbinbatch.rs
+++ b/disk/src/agg/scalarbinbatch.rs
@@ -116,11 +116,13 @@ impl std::fmt::Debug for MinMaxAvgScalarBinBatch {
     fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
         write!(
             fmt,
-            "MinMaxAvgScalarBinBatch count {} ts1s {:?} ts2s {:?} counts {:?} avgs {:?}",
+            "MinMaxAvgScalarBinBatch count {} ts1s {:?} ts2s {:?} counts {:?} mins {:?} maxs {:?} avgs {:?}",
             self.ts1s.len(),
             self.ts1s.iter().map(|k| k / SEC).collect::<Vec<_>>(),
             self.ts2s.iter().map(|k| k / SEC).collect::<Vec<_>>(),
             self.counts,
+            self.mins,
+            self.maxs,
             self.avgs,
         )
     }
@@ -289,8 +291,16 @@ impl AggregatorTdim for MinMaxAvgScalarBinBatchAggregator {
                 self.count += v.counts[i1];
                 self.min = self.min.min(v.mins[i1]);
                 self.max = self.max.max(v.maxs[i1]);
-                self.sum += v.avgs[i1];
-                self.sumc += 1;
+                let x = v.avgs[i1];
+                if x.is_nan() {
+                } else {
+                    if self.sum.is_nan() {
+                        self.sum = x;
+                    } else {
+                        self.sum += x;
+                    }
+                    self.sumc += 1;
+                }
             }
         }
     }
diff --git a/disk/src/dataopen.rs b/disk/src/dataopen.rs
index cca2639..02644f4 100644
--- a/disk/src/dataopen.rs
+++ b/disk/src/dataopen.rs
@@ -5,14 +5,20 @@
 use futures_util::StreamExt;
 use netpod::log::*;
 use netpod::timeunits::MS;
 use netpod::{ChannelConfig, NanoRange, Nanos, Node};
+use std::path::PathBuf;
 use tokio::fs::{File, OpenOptions};
 use tokio::io::{AsyncReadExt, AsyncSeekExt, ErrorKind, SeekFrom};
 
+pub struct OpenedFile {
+    pub path: PathBuf,
+    pub file: File,
+}
+
 pub fn open_files(
     range: &NanoRange,
     channel_config: &ChannelConfig,
     node: Node,
-) -> async_channel::Receiver<Result<File, Error>> {
+) -> async_channel::Receiver<Result<OpenedFile, Error>> {
     let (chtx, chrx) = async_channel::bounded(2);
     let range = range.clone();
     let channel_config = channel_config.clone();
@@ -31,7 +37,7 @@ pub fn open_files(
 }
 
 async fn open_files_inner(
-    chtx: &async_channel::Sender<Result<File, Error>>,
+    chtx: &async_channel::Sender<Result<OpenedFile, Error>>,
     range: &NanoRange,
     channel_config: &ChannelConfig,
     node: Node,
@@ -121,7 +127,8 @@ async fn open_files_inner(
                 },
             }
         }
-        chtx.send(Ok(file)).await?;
+        let ret = OpenedFile { file, path };
+        chtx.send(Ok(ret)).await?;
     }
     // TODO keep track of number of running
     debug!("open_files_inner done");
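dataopen.rs now sends an `OpenedFile` (file handle plus originating path) through the channel instead of a bare `tokio::fs::File`, so consumers can attach the path to error reports. A minimal sketch of that producer shape, using only `tokio` and the `async-channel` crate the patch already relies on; `open_files_sketch` and its `paths` argument stand in for the real directory scan in `open_files_inner`:

```rust
use std::path::PathBuf;
use tokio::fs::File;

// Same shape as the struct added in dataopen.rs.
pub struct OpenedFile {
    pub path: PathBuf,
    pub file: File,
}

// Producer side: open files on a background task and hand them to the
// consumer through a small bounded channel, so only a few files are
// held open ahead of the reader.
fn open_files_sketch(paths: Vec<PathBuf>) -> async_channel::Receiver<Result<OpenedFile, std::io::Error>> {
    let (tx, rx) = async_channel::bounded(2);
    tokio::spawn(async move {
        for path in paths {
            let res = match File::open(&path).await {
                Ok(file) => Ok(OpenedFile { path, file }),
                Err(e) => Err(e),
            };
            // A dropped receiver ends the scan early.
            if tx.send(res).await.is_err() {
                break;
            }
        }
    });
    rx
}
```

The bounded capacity of 2 matches the patch and acts as backpressure: the background task blocks on `send` until the consumer catches up.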
diff --git a/disk/src/eventblobs.rs b/disk/src/eventblobs.rs
index 4975e0f..82b5799 100644
--- a/disk/src/eventblobs.rs
+++ b/disk/src/eventblobs.rs
@@ -1,4 +1,4 @@
-use crate::dataopen::open_files;
+use crate::dataopen::{open_files, OpenedFile};
 use crate::eventchunker::{EventChunker, EventChunkerConf, EventChunkerItem};
 use crate::file_content_stream;
 use err::Error;
@@ -7,11 +7,10 @@ use futures_util::StreamExt;
 use netpod::{ChannelConfig, NanoRange, Node};
 use std::pin::Pin;
 use std::task::{Context, Poll};
-use tokio::fs::File;
 
 pub struct EventBlobsComplete {
     channel_config: ChannelConfig,
-    file_chan: async_channel::Receiver<Result<File, Error>>,
+    file_chan: async_channel::Receiver<Result<OpenedFile, Error>>,
     evs: Option<EventChunker>,
     buffer_size: usize,
     event_chunker_conf: EventChunkerConf,
@@ -65,12 +64,14 @@ impl Stream for EventBlobsComplete {
                 None => match self.file_chan.poll_next_unpin(cx) {
                     Ready(Some(k)) => match k {
                         Ok(file) => {
-                            let inp = Box::pin(file_content_stream(file, self.buffer_size as usize));
+                            let path = file.path;
+                            let inp = Box::pin(file_content_stream(file.file, self.buffer_size as usize));
                             let chunker = EventChunker::from_event_boundary(
                                 inp,
                                 self.channel_config.clone(),
                                 self.range.clone(),
                                 self.event_chunker_conf.clone(),
+                                path,
                             );
                             self.evs = Some(chunker);
                             continue 'outer;
diff --git a/disk/src/eventchunker.rs b/disk/src/eventchunker.rs
index 2bbca4e..e73533f 100644
--- a/disk/src/eventchunker.rs
+++ b/disk/src/eventchunker.rs
@@ -7,6 +7,7 @@
 use futures_util::StreamExt;
 use netpod::log::*;
 use netpod::timeunits::SEC;
 use netpod::{ByteSize, ChannelConfig, EventDataReadStats, NanoRange, ScalarType, Shape};
+use std::path::PathBuf;
 use std::pin::Pin;
 use std::task::{Context, Poll};
@@ -24,6 +25,7 @@ pub struct EventChunker {
     data_emit_complete: bool,
     final_stats_sent: bool,
     parsed_bytes: u64,
+    path: PathBuf,
 }
 
 enum DataFileState {
@@ -53,6 +55,7 @@ impl EventChunker {
         channel_config: ChannelConfig,
         range: NanoRange,
         stats_conf: EventChunkerConf,
+        path: PathBuf,
     ) -> Self {
         let mut inp = NeedMinBuffer::new(inp);
         inp.set_need_min(6);
@@ -70,6 +73,7 @@
             data_emit_complete: false,
             final_stats_sent: false,
             parsed_bytes: 0,
+            path,
         }
     }
 
@@ -78,8 +82,9 @@
         channel_config: ChannelConfig,
         range: NanoRange,
         stats_conf: EventChunkerConf,
+        path: PathBuf,
     ) -> Self {
-        let mut ret = Self::from_start(inp, channel_config, range, stats_conf);
+        let mut ret = Self::from_start(inp, channel_config, range, stats_conf, path);
         ret.state = DataFileState::Event;
         ret.need_min = 4;
         ret.inp.set_need_min(4);
@@ -100,23 +105,29 @@
         }
         match self.state {
             DataFileState::FileHeader => {
-                assert!(buf.len() >= 6, "logic");
+                if buf.len() < 6 {
+                    Err(Error::with_msg("need min 6 for FileHeader"))?;
+                }
                 let mut sl = std::io::Cursor::new(buf.as_ref());
                 let fver = sl.read_i16::<BE>().unwrap();
-                assert!(fver == 0, "unexpected file version");
+                if fver != 0 {
+                    Err(Error::with_msg("unexpected data file version"))?;
+                }
                 let len = sl.read_i32::<BE>().unwrap();
-                assert!(len > 0 && len < 128, "unexpected data file header");
+                if len <= 0 || len >= 128 {
+                    Err(Error::with_msg("large channel header len"))?;
+                }
                 let totlen = len as usize + 2;
                 if buf.len() < totlen {
-                    debug!("parse_buf not enough A totlen {}", totlen);
                     self.need_min = totlen as u32;
                     break;
                 } else {
                     sl.advance(len as usize - 8);
                     let len2 = sl.read_i32::<BE>().unwrap();
-                    assert!(len == len2, "len mismatch");
-                    let s1 = String::from_utf8(buf.as_ref()[6..(len as usize + 6 - 8)].to_vec()).unwrap();
-                    info!("channel name {}", s1);
+                    if len != len2 {
+                        Err(Error::with_msg("channel header len mismatch"))?;
+                    }
+                    String::from_utf8(buf.as_ref()[6..(len as usize + 6 - 8)].to_vec())?;
                     self.state = DataFileState::Event;
                     self.need_min = 4;
                     buf.advance(totlen);
@@ -126,7 +137,9 @@
             DataFileState::Event => {
                 let mut sl = std::io::Cursor::new(buf.as_ref());
                 let len = sl.read_i32::<BE>().unwrap();
-                assert!(len >= 20 && len < 1024 * 1024 * 10);
+                if len < 20 || len > 1024 * 1024 * 10 {
+                    Err(Error::with_msg("unexpected large event chunk"))?;
+                }
                 let len = len as u32;
                 if (buf.len() as u32) < len {
                     self.need_min = len as u32;
@@ -144,18 +157,37 @@
                     break;
                 }
                 if ts < self.range.beg {
-                    error!("seen before range: {}", ts / SEC);
+                    Err(Error::with_msg(format!(
+                        "seen before range: event ts: {}.{} range beg: {}.{} range end: {}.{} pulse {} config {:?} path {:?}",
+                        ts / SEC,
+                        ts % SEC,
+                        self.range.beg / SEC,
+                        self.range.beg % SEC,
+                        self.range.end / SEC,
+                        self.range.end % SEC,
+                        pulse,
+                        self.channel_config.shape,
+                        self.path
+                    )))?;
                 }
                 let _ioc_ts = sl.read_i64::<BE>().unwrap();
                 let status = sl.read_i8().unwrap();
                 let severity = sl.read_i8().unwrap();
                 let optional = sl.read_i32::<BE>().unwrap();
-                assert!(status == 0);
-                assert!(severity == 0);
-                assert!(optional == -1);
+                if status != 0 {
+                    Err(Error::with_msg(format!("status != 0: {}", status)))?;
+                }
+                if severity != 0 {
+                    Err(Error::with_msg(format!("severity != 0: {}", severity)))?;
+                }
+                if optional != -1 {
+                    Err(Error::with_msg(format!("optional != -1: {}", optional)))?;
+                }
                 let type_flags = sl.read_u8().unwrap();
                 let type_index = sl.read_u8().unwrap();
-                assert!(type_index <= 13);
+                if type_index > 13 {
+                    Err(Error::with_msg(format!("type_index: {}", type_index)))?;
+                }
                 let scalar_type = ScalarType::from_dtype_index(type_index)?;
                 use super::dtflags::*;
                 let is_compressed = type_flags & COMPRESSION != 0;
@@ -163,7 +195,9 @@
                 let is_big_endian = type_flags & BIG_ENDIAN != 0;
                 let is_shaped = type_flags & SHAPE != 0;
                 if let Shape::Wave(_) = self.channel_config.shape {
-                    assert!(is_array);
+                    if !is_array {
+                        Err(Error::with_msg(format!("dim1 but not array {:?}", self.channel_config)))?;
+                    }
                 }
                 let compression_method = if is_compressed { sl.read_u8().unwrap() } else { 0 };
                 let shape_dim = if is_shaped { sl.read_u8().unwrap() } else { 0 };
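eventblobs.rs threads the new path through to `EventChunker`, and eventchunker.rs then replaces its `assert!` calls with `Err(...)?` so one malformed data file fails that request with a descriptive message (now including pulse, shape, and file path) instead of aborting the process. A compact sketch of the pattern, assuming an error type with a `with_msg` constructor like the crate's `err::Error` (the `Error` below is a stand-in) and big-endian field order as in the reconstruction above:

```rust
use byteorder::{ReadBytesExt, BE};
use std::io::Cursor;

// Hypothetical error type standing in for the crate's err::Error.
#[derive(Debug)]
struct Error(String);

impl Error {
    fn with_msg(msg: impl Into<String>) -> Self {
        Error(msg.into())
    }
}

impl From<std::io::Error> for Error {
    fn from(e: std::io::Error) -> Self {
        Error(e.to_string())
    }
}

/// Parse a file header the way the patched EventChunker does: every
/// validation returns Err instead of panicking via assert!, so the
/// caller can report the failure and keep serving other requests.
fn parse_file_header(buf: &[u8]) -> Result<(), Error> {
    if buf.len() < 6 {
        return Err(Error::with_msg("need min 6 for FileHeader"));
    }
    let mut sl = Cursor::new(buf);
    let fver = sl.read_i16::<BE>()?;
    if fver != 0 {
        return Err(Error::with_msg(format!("unexpected file version {}", fver)));
    }
    let len = sl.read_i32::<BE>()?;
    if len <= 0 || len >= 128 {
        return Err(Error::with_msg(format!("unexpected header len {}", len)));
    }
    Ok(())
}
```

The patch writes these as `Err(...)?;` statements, which is equivalent to an early `return Err(...)` once the enclosing function returns a compatible `Result`.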
diff --git a/disk/src/frame/inmem.rs b/disk/src/frame/inmem.rs
index d300344..7a66c1f 100644
--- a/disk/src/frame/inmem.rs
+++ b/disk/src/frame/inmem.rs
@@ -67,7 +67,7 @@ where
         trace!("prepare read from wp {} self.buf.len() {}", self.wp, self.buf.len(),);
         let gg = self.buf.len() - self.wp;
         let mut buf2 = ReadBuf::new(&mut self.buf[self.wp..]);
-        if gg < 1 || gg > 1024 * 1024 * 20 {
+        if gg < 1 || gg > 1024 * 1024 * 40 {
             use bytes::Buf;
             panic!(
                 "have gg {} len {} cap {} rem {} rem mut {} self.wp {}",
@@ -126,9 +126,7 @@ where
                     }
                     (Some(None), buf, wp)
                 } else {
-                    if len > 1024 * 32 {
-                        warn!("InMemoryFrameAsyncReadStream big len received {}", len);
-                    } else if len > 1024 * 1024 * 2 {
+                    if len > 1024 * 1024 * 50 {
                         error!("InMemoryFrameAsyncReadStream too long len {}", len);
                         return (
                             Some(Some(Err(Error::with_msg(format!(
@@ -138,6 +136,9 @@ where
                             buf,
                             wp,
                         );
+                    } else if len > 1024 * 1024 * 1 {
+                        // TODO
+                        //warn!("InMemoryFrameAsyncReadStream big len received {}", len);
                     }
                     let nl = len as usize + INMEM_FRAME_HEAD + INMEM_FRAME_FOOT;
                     if self.bufcap < nl {
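In frame/inmem.rs the hard ceiling on an incoming frame length moves up to 50 MiB and the old eager warning at 32 KiB becomes a quiet TODO at 1 MiB. A tiny sketch of that tiered check; the constant names and the `String` error are illustrative, not from the patch:

```rust
// Tiered sanity check mirroring the inmem.rs hunks: reject absurd frame
// lengths outright, and keep a hook for frames that are large but still
// acceptable.
const NOTE_LEN: usize = 1024 * 1024; // 1 MiB: large, worth counting
const MAX_LEN: usize = 1024 * 1024 * 50; // 50 MiB: hard limit

fn check_frame_len(len: usize) -> Result<(), String> {
    if len > MAX_LEN {
        return Err(format!("frame too long: {}", len));
    }
    if len > NOTE_LEN {
        // The patch deliberately stays quiet here; a metric or a
        // rate-limited warning would fit, a per-frame log would not.
    }
    Ok(())
}
```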
diff --git a/disk/src/lib.rs b/disk/src/lib.rs
index 131c5aa..d465e20 100644
--- a/disk/src/lib.rs
+++ b/disk/src/lib.rs
@@ -284,7 +284,7 @@ fn unused_raw_concat_channel_read_stream_file_pipe(
     let chrx = open_files(&range, &channel_config, node);
     while let Ok(file) = chrx.recv().await {
         let mut file = match file {
-            Ok(k) => k,
+            Ok(k) => k.file,
             Err(_) => break
         };
         loop {
@@ -346,9 +346,9 @@ pub fn parsed1(
     while let Ok(fileres) = filerx.recv().await {
         match fileres {
             Ok(file) => {
-                let inp = Box::pin(file_content_stream(file, query.buffer_size as usize));
+                let inp = Box::pin(file_content_stream(file.file, query.buffer_size as usize));
                 let range = err::todoval();
-                let mut chunker = eventchunker::EventChunker::from_event_boundary(inp, err::todoval(), range, stats_conf.clone());
+                let mut chunker = eventchunker::EventChunker::from_event_boundary(inp, err::todoval(), range, stats_conf.clone(), file.path);
                 while let Some(evres) = chunker.next().await {
                     use eventchunker::EventChunkerItem;
                     match evres {
diff --git a/httpret/src/lib.rs b/httpret/src/lib.rs
index 9cd663c..c2ea70a 100644
--- a/httpret/src/lib.rs
+++ b/httpret/src/lib.rs
@@ -83,6 +83,21 @@ where
 
 impl UnwindSafe for Cont {}
 
+macro_rules! static_http {
+    ($path:expr, $tgt:expr, $tgtex:expr) => {
+        if $path == concat!("/api/4/documentation/", $tgt) {
+            let c = include_bytes!(concat!("../static/documentation/", $tgt, $tgtex));
+            return Ok(response(StatusCode::OK).body(Body::from(&c[..]))?);
+        }
+    };
+    ($path:expr, $tgt:expr) => {
+        if $path == concat!("/api/4/documentation/", $tgt) {
+            let c = include_bytes!(concat!("../static/documentation/", $tgt));
+            return Ok(response(StatusCode::OK).body(Body::from(&c[..]))?);
+        }
+    };
+}
+
 async fn data_api_proxy_try(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
     let uri = req.uri().clone();
     let path = uri.path();
@@ -122,6 +137,14 @@ async fn data_api_proxy_try(req: Request<Body>, node_config: &NodeConfigCached)
         } else {
             Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
         }
+    } else if path.starts_with("/api/4/documentation/") {
+        if req.method() == Method::GET {
+            static_http!(path, "", "index.html");
+            static_http!(path, "page.css");
+            Ok(response(StatusCode::NOT_FOUND).body(Body::empty())?)
+        } else {
+            Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
+        }
     } else {
         Ok(response(StatusCode::NOT_FOUND).body(Body::from(format!(
             "Sorry, not found: {:?} {:?} {:?}",
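The `static_http!` macro embeds each documentation asset into the binary with `include_bytes!` and returns it when the request path matches; the three-argument form appends a file name such as index.html, so the bare `/api/4/documentation/` URL serves the index. Roughly, the two-argument call expands to the check below. A hedged standalone sketch using hyper directly; `serve_page_css` is illustrative, where the real code runs inside `data_api_proxy_try` with the crate's `response` helper:

```rust
use hyper::{Body, Response, StatusCode};

// Hand expansion of static_http!(path, "page.css"): concat! folds the
// two literals into one path at compile time, and include_bytes! embeds
// the file in the binary, so serving it touches no filesystem at runtime.
fn serve_page_css(path: &str) -> Option<Response<Body>> {
    if path == concat!("/api/4/documentation/", "page.css") {
        // Path is relative to httpret/src/lib.rs, as in the macro.
        let c: &'static [u8] = include_bytes!("../static/documentation/page.css");
        return Response::builder()
            .status(StatusCode::OK)
            .body(Body::from(c))
            .ok();
    }
    None
}
```

Because the macro expands to an `if` with an early `return`, listing several `static_http!` lines in a row acts as a first-match router, with the trailing `NOT_FOUND` as the fallback.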

diff --git a/httpret/static/documentation/index.html b/httpret/static/documentation/index.html
new file mode 100644
index 0000000..4dd5d9c
--- /dev/null
+++ b/httpret/static/documentation/index.html
@@ -0,0 +1,11 @@
+<!DOCTYPE html>
+<html>
+<head>
+<title>Retrieval Documentation</title>
+<link rel="stylesheet" href="page.css">
+</head>
+<body>
+<h1>Retrieval 4.0 Documentation</h1>
+<p>Some docs to be shown here...</p>
+</body>
+</html>
diff --git a/httpret/static/documentation/page.css b/httpret/static/documentation/page.css
new file mode 100644
index 0000000..8885d68
--- /dev/null
+++ b/httpret/static/documentation/page.css
@@ -0,0 +1,5 @@
+body {}
+
+h1 {
+    color: cadetblue;
+}