Dominik Werder
2021-05-14 10:30:54 +02:00
parent 70426e8e48
commit 8dc80f5dba
12 changed files with 159 additions and 47 deletions

View File

@@ -62,15 +62,22 @@ impl AggregatableXdim1Bin for ValuesDim1 {
let ts = self.tss[i1];
let mut min = f32::MAX;
let mut max = f32::MIN;
let mut sum = 0f32;
let mut sum = f32::NAN;
let mut count = 0;
let vals = &self.values[i1];
assert!(vals.len() > 0);
for i2 in 0..vals.len() {
let v = vals[i2];
//info!("value {} {} {}", i1, i2, v);
min = min.min(v);
max = max.max(v);
sum += v;
if v.is_nan() {
} else {
if sum.is_nan() {
sum = v;
} else {
sum += v;
}
count += 1;
}
}
if min == f32::MAX {
min = f32::NAN;
@@ -81,7 +88,11 @@ impl AggregatableXdim1Bin for ValuesDim1 {
ret.tss.push(ts);
ret.mins.push(min);
ret.maxs.push(max);
ret.avgs.push(sum / vals.len() as f32);
if sum.is_nan() {
ret.avgs.push(sum);
} else {
ret.avgs.push(sum / count as f32);
}
}
ret
}
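
The hunks above change the average to ignore NaN samples: `sum` is seeded with `f32::NAN` instead of `0f32`, the first finite value replaces the seed, and the divisor becomes the count of finite values rather than `vals.len()`. The same fold as a standalone sketch (function name and the checks in `main` are illustrative, not from the commit):

fn nan_aware_stats(vals: &[f32]) -> (f32, f32, f32) {
    let mut min = f32::MAX;
    let mut max = f32::MIN;
    let mut sum = f32::NAN;
    let mut count = 0u32;
    for &v in vals {
        // f32::min/max already ignore NaN; the explicit check keeps `sum`
        // and `count` in sync with the finite values only.
        if !v.is_nan() {
            min = min.min(v);
            max = max.max(v);
            sum = if sum.is_nan() { v } else { sum + v };
            count += 1;
        }
    }
    let min = if min == f32::MAX { f32::NAN } else { min };
    let max = if max == f32::MIN { f32::NAN } else { max };
    let avg = if count == 0 { f32::NAN } else { sum / count as f32 };
    (min, max, avg)
}

fn main() {
    assert_eq!(nan_aware_stats(&[1.0, f32::NAN, 3.0]), (1.0, 3.0, 2.0));
    // All-NaN input yields NaN, not the 0.0 the old `sum = 0f32` seed gave.
    assert!(nan_aware_stats(&[f32::NAN]).2.is_nan());
}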

View File

@@ -96,6 +96,8 @@ where
fn cur(&mut self, cx: &mut Context) -> Poll<Option<Result<I, Error>>> {
if let Some(cur) = self.left.take() {
cur
} else if self.inp_completed {
Poll::Ready(None)
} else {
let inp_poll_span = span!(Level::TRACE, "into_t_inp_poll");
inp_poll_span.in_scope(|| self.inp.poll_next_unpin(cx))
@@ -182,7 +184,6 @@ where
Some(Ready(Some(Err(e))))
}
Ready(None) => {
// No more input, no more in leftover.
self.inp_completed = true;
if self.all_bins_emitted {
None
@@ -239,9 +240,6 @@ where
continue 'outer;
}
} else {
// TODO make sure that we don't poll our input here after it has completed.
err::todo();
let cur = self.cur(cx);
match self.handle(cur) {
Some(item) => item,
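
These hunks close a polling hazard: an `inp_completed` flag lets `cur` return `Ready(None)` once the input stream has finished, instead of polling a stream that already completed, which non-fused streams are allowed to punish with a panic; the `err::todo()` placeholder concerns the same hazard at the remaining call site. A minimal sketch of the guard with an illustrative wrapper type (not the commit's):

use std::task::{Context, Poll};
use futures_util::stream::{Stream, StreamExt};

struct Guarded<S> {
    inp: S,
    inp_completed: bool,
}

impl<S: Stream + Unpin> Guarded<S> {
    fn cur(&mut self, cx: &mut Context) -> Poll<Option<S::Item>> {
        if self.inp_completed {
            // Many streams are not fused: polling again after Ready(None)
            // may panic, so short-circuit here instead.
            Poll::Ready(None)
        } else {
            let r = self.inp.poll_next_unpin(cx);
            if matches!(r, Poll::Ready(None)) {
                self.inp_completed = true;
            }
            r
        }
    }
}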

View File

@@ -178,6 +178,7 @@ pub struct MinMaxAvgScalarEventBatchAggregator {
min: f32,
max: f32,
sum: f32,
sumc: u64,
}
impl MinMaxAvgScalarEventBatchAggregator {
@@ -185,10 +186,11 @@ impl MinMaxAvgScalarEventBatchAggregator {
Self {
ts1,
ts2,
count: 0,
min: f32::MAX,
max: f32::MIN,
sum: 0f32,
count: 0,
sum: f32::NAN,
sumc: 0,
}
}
}
@@ -226,10 +228,19 @@ impl AggregatorTdim for MinMaxAvgScalarEventBatchAggregator {
} else if ts >= self.ts2 {
continue;
} else {
self.count += 1;
self.min = self.min.min(v.mins[i1]);
self.max = self.max.max(v.maxs[i1]);
self.sum += v.avgs[i1];
self.count += 1;
let x = v.avgs[i1];
if x.is_nan() {
} else {
if self.sum.is_nan() {
self.sum = x;
} else {
self.sum += x;
}
self.sumc += 1;
}
}
}
}
@@ -237,10 +248,10 @@ impl AggregatorTdim for MinMaxAvgScalarEventBatchAggregator {
fn result(self) -> Vec<Self::OutputValue> {
let min = if self.min == f32::MAX { f32::NAN } else { self.min };
let max = if self.max == f32::MIN { f32::NAN } else { self.max };
let avg = if self.count == 0 {
let avg = if self.sumc == 0 {
f32::NAN
} else {
self.sum / self.count as f32
self.sum / self.sumc as f32
};
let v = MinMaxAvgScalarBinBatch {
ts1s: vec![self.ts1],
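
The aggregator now carries two counters with distinct roles: `count` still counts the events that fell into the bin, while the new `sumc` counts only the finite averages that entered `sum`, and `result` divides by `sumc`. Reduced to the relevant fields (illustrative struct, not the commit's):

struct Agg {
    count: u64, // events inside [ts1, ts2), whether or not their value was NaN
    sum: f32,   // NaN-seeded running sum of the finite averages
    sumc: u64,  // how many finite values actually entered `sum`
}

impl Agg {
    fn avg(&self) -> f32 {
        // Dividing by `count` would be wrong once NaN events are skipped:
        // the divisor must match the number of summed terms.
        if self.sumc == 0 {
            f32::NAN
        } else {
            self.sum / self.sumc as f32
        }
    }
}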

View File

@@ -116,11 +116,13 @@ impl std::fmt::Debug for MinMaxAvgScalarBinBatch {
fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(
fmt,
"MinMaxAvgScalarBinBatch count {} ts1s {:?} ts2s {:?} counts {:?} avgs {:?}",
"MinMaxAvgScalarBinBatch count {} ts1s {:?} ts2s {:?} counts {:?} mins {:?} maxs {:?} avgs {:?}",
self.ts1s.len(),
self.ts1s.iter().map(|k| k / SEC).collect::<Vec<_>>(),
self.ts2s.iter().map(|k| k / SEC).collect::<Vec<_>>(),
self.counts,
self.mins,
self.maxs,
self.avgs,
)
}
@@ -289,8 +291,16 @@ impl AggregatorTdim for MinMaxAvgScalarBinBatchAggregator {
self.count += v.counts[i1];
self.min = self.min.min(v.mins[i1]);
self.max = self.max.max(v.maxs[i1]);
self.sum += v.avgs[i1];
self.sumc += 1;
let x = v.avgs[i1];
if x.is_nan() {
} else {
if self.sum.is_nan() {
self.sum = x;
} else {
self.sum += x;
}
self.sumc += 1;
}
}
}
}
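
The bin-batch aggregator gets the same treatment, with one semantic shift worth noting: `self.sumc += 1` moves inside the non-NaN branch, so a NaN bin average neither poisons `sum` nor inflates the divisor. A compact check of that behavior (illustrative function, not from the commit):

fn fold_avgs(avgs: &[f32]) -> f32 {
    let (mut sum, mut sumc) = (f32::NAN, 0u64);
    for &x in avgs {
        if !x.is_nan() {
            sum = if sum.is_nan() { x } else { sum + x };
            sumc += 1;
        }
    }
    if sumc == 0 { f32::NAN } else { sum / sumc as f32 }
}

fn main() {
    // The old unconditional `self.sum += v.avgs[i1]` would make this NaN.
    assert_eq!(fold_avgs(&[2.0, f32::NAN, 4.0]), 3.0);
}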

View File

@@ -5,14 +5,20 @@ use futures_util::StreamExt;
use netpod::log::*;
use netpod::timeunits::MS;
use netpod::{ChannelConfig, NanoRange, Nanos, Node};
use std::path::PathBuf;
use tokio::fs::{File, OpenOptions};
use tokio::io::{AsyncReadExt, AsyncSeekExt, ErrorKind, SeekFrom};
pub struct OpenedFile {
pub path: PathBuf,
pub file: File,
}
pub fn open_files(
range: &NanoRange,
channel_config: &ChannelConfig,
node: Node,
) -> async_channel::Receiver<Result<File, Error>> {
) -> async_channel::Receiver<Result<OpenedFile, Error>> {
let (chtx, chrx) = async_channel::bounded(2);
let range = range.clone();
let channel_config = channel_config.clone();
@@ -31,7 +37,7 @@ pub fn open_files(
}
async fn open_files_inner(
chtx: &async_channel::Sender<Result<File, Error>>,
chtx: &async_channel::Sender<Result<OpenedFile, Error>>,
range: &NanoRange,
channel_config: &ChannelConfig,
node: Node,
@@ -121,7 +127,8 @@ async fn open_files_inner(
},
}
}
chtx.send(Ok(file)).await?;
let ret = OpenedFile { file, path };
chtx.send(Ok(ret)).await?;
}
// TODO keep track of number of running
debug!("open_files_inner done");
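
`open_files` now yields an `OpenedFile` pairing each handle with its `PathBuf`, so consumers can attach the path to diagnostics, as the `EventChunker` errors further down do. A hedged sketch of a consumer built from the types in this diff:

use crate::dataopen::{open_files, OpenedFile};
use err::Error;
use netpod::log::*;
use netpod::{ChannelConfig, NanoRange, Node};

async fn consume_opened(range: &NanoRange, channel_config: &ChannelConfig, node: Node) -> Result<(), Error> {
    let rx = open_files(range, channel_config, node);
    while let Ok(item) = rx.recv().await {
        let OpenedFile { path, file } = item?;
        // The path travels with the handle, so whatever parses `file`
        // can name the offending file in its errors.
        debug!("opened {:?}", path);
        drop(file);
    }
    Ok(())
}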

View File

@@ -1,4 +1,4 @@
use crate::dataopen::open_files;
use crate::dataopen::{open_files, OpenedFile};
use crate::eventchunker::{EventChunker, EventChunkerConf, EventChunkerItem};
use crate::file_content_stream;
use err::Error;
@@ -7,11 +7,10 @@ use futures_util::StreamExt;
use netpod::{ChannelConfig, NanoRange, Node};
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::fs::File;
pub struct EventBlobsComplete {
channel_config: ChannelConfig,
file_chan: async_channel::Receiver<Result<File, Error>>,
file_chan: async_channel::Receiver<Result<OpenedFile, Error>>,
evs: Option<EventChunker>,
buffer_size: usize,
event_chunker_conf: EventChunkerConf,
@@ -65,12 +64,14 @@ impl Stream for EventBlobsComplete {
None => match self.file_chan.poll_next_unpin(cx) {
Ready(Some(k)) => match k {
Ok(file) => {
let inp = Box::pin(file_content_stream(file, self.buffer_size as usize));
let path = file.path;
let inp = Box::pin(file_content_stream(file.file, self.buffer_size as usize));
let chunker = EventChunker::from_event_boundary(
inp,
self.channel_config.clone(),
self.range.clone(),
self.event_chunker_conf.clone(),
path,
);
self.evs = Some(chunker);
continue 'outer;

View File

@@ -7,6 +7,7 @@ use futures_util::StreamExt;
use netpod::log::*;
use netpod::timeunits::SEC;
use netpod::{ByteSize, ChannelConfig, EventDataReadStats, NanoRange, ScalarType, Shape};
use std::path::PathBuf;
use std::pin::Pin;
use std::task::{Context, Poll};
@@ -24,6 +25,7 @@ pub struct EventChunker {
data_emit_complete: bool,
final_stats_sent: bool,
parsed_bytes: u64,
path: PathBuf,
}
enum DataFileState {
@@ -53,6 +55,7 @@ impl EventChunker {
channel_config: ChannelConfig,
range: NanoRange,
stats_conf: EventChunkerConf,
path: PathBuf,
) -> Self {
let mut inp = NeedMinBuffer::new(inp);
inp.set_need_min(6);
@@ -70,6 +73,7 @@ impl EventChunker {
data_emit_complete: false,
final_stats_sent: false,
parsed_bytes: 0,
path,
}
}
@@ -78,8 +82,9 @@ impl EventChunker {
channel_config: ChannelConfig,
range: NanoRange,
stats_conf: EventChunkerConf,
path: PathBuf,
) -> Self {
let mut ret = Self::from_start(inp, channel_config, range, stats_conf);
let mut ret = Self::from_start(inp, channel_config, range, stats_conf, path);
ret.state = DataFileState::Event;
ret.need_min = 4;
ret.inp.set_need_min(4);
@@ -100,23 +105,29 @@ impl EventChunker {
}
match self.state {
DataFileState::FileHeader => {
assert!(buf.len() >= 6, "logic");
if buf.len() < 6 {
Err(Error::with_msg("need min 6 for FileHeader"))?;
}
let mut sl = std::io::Cursor::new(buf.as_ref());
let fver = sl.read_i16::<BE>().unwrap();
assert!(fver == 0, "unexpected file version");
if fver != 0 {
Err(Error::with_msg("unexpected data file version"))?;
}
let len = sl.read_i32::<BE>().unwrap();
assert!(len > 0 && len < 128, "unexpected data file header");
if len <= 0 || len >= 128 {
Err(Error::with_msg("large channel header len"))?;
}
let totlen = len as usize + 2;
if buf.len() < totlen {
debug!("parse_buf not enough A totlen {}", totlen);
self.need_min = totlen as u32;
break;
} else {
sl.advance(len as usize - 8);
let len2 = sl.read_i32::<BE>().unwrap();
assert!(len == len2, "len mismatch");
let s1 = String::from_utf8(buf.as_ref()[6..(len as usize + 6 - 8)].to_vec()).unwrap();
info!("channel name {}", s1);
if len != len2 {
Err(Error::with_msg("channel header len mismatch"))?;
}
String::from_utf8(buf.as_ref()[6..(len as usize + 6 - 8)].to_vec())?;
self.state = DataFileState::Event;
self.need_min = 4;
buf.advance(totlen);
@@ -126,7 +137,9 @@ impl EventChunker {
DataFileState::Event => {
let mut sl = std::io::Cursor::new(buf.as_ref());
let len = sl.read_i32::<BE>().unwrap();
assert!(len >= 20 && len < 1024 * 1024 * 10);
if len < 20 || len > 1024 * 1024 * 10 {
Err(Error::with_msg("unexpected large event chunk"))?;
}
let len = len as u32;
if (buf.len() as u32) < len {
self.need_min = len as u32;
@@ -144,18 +157,37 @@ impl EventChunker {
break;
}
if ts < self.range.beg {
error!("seen before range: {}", ts / SEC);
Err(Error::with_msg(format!(
"seen before range: event ts: {}.{} range beg: {}.{} range end: {}.{} pulse {} config {:?} path {:?}",
ts / SEC,
ts % SEC,
self.range.beg / SEC,
self.range.beg % SEC,
self.range.end / SEC,
self.range.end % SEC,
pulse,
self.channel_config.shape,
self.path
)))?;
}
let _ioc_ts = sl.read_i64::<BE>().unwrap();
let status = sl.read_i8().unwrap();
let severity = sl.read_i8().unwrap();
let optional = sl.read_i32::<BE>().unwrap();
assert!(status == 0);
assert!(severity == 0);
assert!(optional == -1);
if status != 0 {
Err(Error::with_msg(format!("status != 0: {}", status)))?;
}
if severity != 0 {
Err(Error::with_msg(format!("severity != 0: {}", severity)))?;
}
if optional != -1 {
Err(Error::with_msg(format!("optional != -1: {}", optional)))?;
}
let type_flags = sl.read_u8().unwrap();
let type_index = sl.read_u8().unwrap();
assert!(type_index <= 13);
if type_index > 13 {
Err(Error::with_msg(format!("type_index: {}", type_index)))?;
}
let scalar_type = ScalarType::from_dtype_index(type_index)?;
use super::dtflags::*;
let is_compressed = type_flags & COMPRESSION != 0;
@@ -163,7 +195,9 @@ impl EventChunker {
let is_big_endian = type_flags & BIG_ENDIAN != 0;
let is_shaped = type_flags & SHAPE != 0;
if let Shape::Wave(_) = self.channel_config.shape {
assert!(is_array);
if !is_array {
Err(Error::with_msg(format!("dim1 but not array {:?}", self.channel_config)))?;
}
}
let compression_method = if is_compressed { sl.read_u8().unwrap() } else { 0 };
let shape_dim = if is_shaped { sl.read_u8().unwrap() } else { 0 };
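
Throughout this file the commit turns `assert!` calls into recoverable errors via the `Err(...)?;` idiom: `?` applied to an `Err` returns it from the enclosing function immediately, so each statement acts as a typed bail-out that callers can report, now with the file path included. Minimal standalone sketch, with a local `Error` standing in for `err::Error`:

#[derive(Debug)]
struct Error(String);

impl Error {
    fn with_msg<S: Into<String>>(msg: S) -> Self {
        Error(msg.into())
    }
}

fn parse_file_header(buf: &[u8]) -> Result<(), Error> {
    if buf.len() < 6 {
        // Replaces `assert!(buf.len() >= 6, "logic")`: the caller gets a
        // message instead of a process abort on malformed data files.
        Err(Error::with_msg("need min 6 for FileHeader"))?;
    }
    Ok(())
}

fn main() {
    assert!(parse_file_header(&[0u8; 2]).is_err());
}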

View File

@@ -67,7 +67,7 @@ where
trace!("prepare read from wp {} self.buf.len() {}", self.wp, self.buf.len(),);
let gg = self.buf.len() - self.wp;
let mut buf2 = ReadBuf::new(&mut self.buf[self.wp..]);
if gg < 1 || gg > 1024 * 1024 * 20 {
if gg < 1 || gg > 1024 * 1024 * 40 {
use bytes::Buf;
panic!(
"have gg {} len {} cap {} rem {} rem mut {} self.wp {}",
@@ -126,9 +126,7 @@ where
}
(Some(None), buf, wp)
} else {
if len > 1024 * 32 {
warn!("InMemoryFrameAsyncReadStream big len received {}", len);
} else if len > 1024 * 1024 * 2 {
if len > 1024 * 1024 * 50 {
error!("InMemoryFrameAsyncReadStream too long len {}", len);
return (
Some(Some(Err(Error::with_msg(format!(
@@ -138,6 +136,9 @@ where
buf,
wp,
);
} else if len > 1024 * 1024 * 1 {
// TODO
//warn!("InMemoryFrameAsyncReadStream big len received {}", len);
}
let nl = len as usize + INMEM_FRAME_HEAD + INMEM_FRAME_FOOT;
if self.bufcap < nl {
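
The length checks here are retuned rather than restructured: the scratch-buffer sanity bound rises from 20 MiB to 40 MiB, and the old `warn` above 32 KiB, which shadowed the 2 MiB error branch because any frame over 2 MiB also matched the 32 KiB test first, gives way to a hard error above 50 MiB plus a commented-out TODO warning above 1 MiB. The new tiers as a sketch (limits copied from the diff, function illustrative):

fn check_frame_len(len: u32) -> Result<(), String> {
    if len > 1024 * 1024 * 50 {
        // Hard limit: refuse the frame and surface an error downstream.
        Err(format!("InMemoryFrameAsyncReadStream too long len {}", len))
    } else {
        if len > 1024 * 1024 {
            // TODO in the commit: perhaps re-enable a warning for big frames.
        }
        Ok(())
    }
}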

View File

@@ -284,7 +284,7 @@ fn unused_raw_concat_channel_read_stream_file_pipe(
let chrx = open_files(&range, &channel_config, node);
while let Ok(file) = chrx.recv().await {
let mut file = match file {
Ok(k) => k,
Ok(k) => k.file,
Err(_) => break
};
loop {
@@ -346,9 +346,9 @@ pub fn parsed1(
while let Ok(fileres) = filerx.recv().await {
match fileres {
Ok(file) => {
let inp = Box::pin(file_content_stream(file, query.buffer_size as usize));
let inp = Box::pin(file_content_stream(file.file, query.buffer_size as usize));
let range = err::todoval();
let mut chunker = eventchunker::EventChunker::from_event_boundary(inp, err::todoval(), range, stats_conf.clone());
let mut chunker = eventchunker::EventChunker::from_event_boundary(inp, err::todoval(), range, stats_conf.clone(), file.path);
while let Some(evres) = chunker.next().await {
use eventchunker::EventChunkerItem;
match evres {

View File

@@ -83,6 +83,21 @@ where
impl<F> UnwindSafe for Cont<F> {}
macro_rules! static_http {
($path:expr, $tgt:expr, $tgtex:expr) => {
if $path == concat!("/api/4/documentation/", $tgt) {
let c = include_bytes!(concat!("../static/documentation/", $tgt, $tgtex));
return Ok(response(StatusCode::OK).body(Body::from(&c[..]))?);
}
};
($path:expr, $tgt:expr) => {
if $path == concat!("/api/4/documentation/", $tgt) {
let c = include_bytes!(concat!("../static/documentation/", $tgt));
return Ok(response(StatusCode::OK).body(Body::from(&c[..]))?);
}
};
}
async fn data_api_proxy_try(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
let uri = req.uri().clone();
let path = uri.path();
@@ -122,6 +137,14 @@ async fn data_api_proxy_try(req: Request<Body>, node_config: &NodeConfigCached)
} else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
}
} else if path.starts_with("/api/4/documentation/") {
if req.method() == Method::GET {
static_http!(path, "", "index.html");
static_http!(path, "page.css");
Ok(response(StatusCode::NOT_FOUND).body(Body::empty())?)
} else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
}
} else {
Ok(response(StatusCode::NOT_FOUND).body(Body::from(format!(
"Sorry, not found: {:?} {:?} {:?}",

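The `static_http!` arms route GET requests under /api/4/documentation/ to files compiled in with `include_bytes!`. Expanded by hand, the three-argument call above becomes roughly the following (a sketch of the expansion, reusing the handler's identifiers, not standalone code):

// static_http!(path, "", "index.html") expands to roughly:
if path == concat!("/api/4/documentation/", "") {
    let c = include_bytes!(concat!("../static/documentation/", "", "index.html"));
    return Ok(response(StatusCode::OK).body(Body::from(&c[..]))?);
}
// i.e. GET /api/4/documentation/ serves ../static/documentation/index.html,
// while the two-argument form (page.css) serves a file under its own name.
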
View File

@@ -0,0 +1,11 @@
<!doctype html>
<html>
<head>
<title>Retrieval Documentation</title>
<link rel="stylesheet" href="page.css"/>
</head>
<body>
<h1>Retrieval 4.0 Documentation</h1>
<p>Some docs to be shown here...</p>
</body>
</html>

View File

@@ -0,0 +1,5 @@
body {}
h1 {
color: cadetblue;
}