Add String place-holder event type, public facing error message

This commit is contained in:
Dominik Werder
2022-02-15 22:03:56 +01:00
parent a9f9d1ada6
commit 502b8fb6ae
23 changed files with 278 additions and 115 deletions

View File

@@ -9,7 +9,7 @@ use bytes::Bytes;
use err::Error;
use futures_core::Stream;
use futures_util::StreamExt;
use items::numops::{BoolNum, NumOps};
use items::numops::{BoolNum, NumOps, StringNum};
use items::{
Appendable, Clearable, EventsNodeProcessor, Framable, FrameType, PushableIndex, Sitemty, TimeBinnableType,
};
@@ -161,6 +161,7 @@ fn make_num_pipeline(
ScalarType::F32 => match_end!(f32, byte_order, shape, agg_kind, query, node_config),
ScalarType::F64 => match_end!(f64, byte_order, shape, agg_kind, query, node_config),
ScalarType::BOOL => match_end!(BoolNum, byte_order, shape, agg_kind, query, node_config),
ScalarType::STRING => match_end!(StringNum, byte_order, shape, agg_kind, query, node_config),
}
}

View File

@@ -9,7 +9,7 @@ use err::Error;
use futures_core::Stream;
use futures_util::future::FutureExt;
use futures_util::StreamExt;
use items::numops::{BoolNum, NumOps};
use items::numops::{BoolNum, NumOps, StringNum};
use items::scalarevents::ScalarEvents;
use items::streams::{Collectable, Collector};
use items::{
@@ -189,6 +189,7 @@ where
ScalarType::F32 => match_end!(f, f32, byte_order, scalar_type, shape, agg_kind, node_config),
ScalarType::F64 => match_end!(f, f64, byte_order, scalar_type, shape, agg_kind, node_config),
ScalarType::BOOL => match_end!(f, BoolNum, byte_order, scalar_type, shape, agg_kind, node_config),
ScalarType::STRING => match_end!(f, StringNum, byte_order, scalar_type, shape, agg_kind, node_config),
}
}

View File

@@ -5,7 +5,7 @@ use err::Error;
use futures_core::Stream;
use futures_util::StreamExt;
use items::eventsitem::EventsItem;
use items::numops::{BoolNum, NumOps};
use items::numops::{BoolNum, NumOps, StringNum};
use items::plainevents::{PlainEvents, ScalarPlainEvents};
use items::scalarevents::ScalarEvents;
use items::waveevents::{WaveEvents, WaveNBinner, WavePlainProc, WaveXBinner};
@@ -48,6 +48,20 @@ impl NumFromBytes<BoolNum, BigEndian> for BoolNum {
}
}
impl NumFromBytes<StringNum, LittleEndian> for StringNum {
fn convert(_buf: &[u8], _big_endian: bool) -> StringNum {
netpod::log::error!("TODO NumFromBytes for StringNum");
todo!()
}
}
impl NumFromBytes<StringNum, BigEndian> for StringNum {
fn convert(_buf: &[u8], _big_endian: bool) -> StringNum {
netpod::log::error!("TODO NumFromBytes for StringNum");
todo!()
}
}
macro_rules! impl_num_from_bytes_end {
($nty:ident, $nl:expr, $end:ident, $ec:ident) => {
impl NumFromBytes<$nty, $end> for $nty {
@@ -402,6 +416,11 @@ impl EventsItemStream {
let cont = ScalarEvents::<i8>::empty();
ret = Some(EventsItem::Plain(PlainEvents::Scalar(ScalarPlainEvents::I8(cont))));
}
ScalarType::STRING => {
// TODO
let cont = ScalarEvents::<String>::empty();
ret = Some(EventsItem::Plain(PlainEvents::Scalar(ScalarPlainEvents::String(cont))));
}
},
Shape::Wave(_) => todo!(),
Shape::Image(..) => todo!(),
@@ -444,6 +463,7 @@ impl EventsItemStream {
}
}
ScalarType::BOOL => todo!(),
ScalarType::STRING => todo!(),
},
Shape::Wave(_) => todo!(),
Shape::Image(_, _) => todo!(),

View File

@@ -126,8 +126,8 @@ impl PlainEventsJsonQuery {
pub fn from_url(url: &Url) -> Result<Self, Error> {
let pairs = get_url_query_pairs(url);
let beg_date = pairs.get("begDate").ok_or(Error::with_msg("missing begDate"))?;
let end_date = pairs.get("endDate").ok_or(Error::with_msg("missing endDate"))?;
let beg_date = pairs.get("begDate").ok_or(Error::with_public_msg("missing begDate"))?;
let end_date = pairs.get("endDate").ok_or(Error::with_public_msg("missing endDate"))?;
let ret = Self {
range: NanoRange {
beg: beg_date.parse::<DateTime<Utc>>()?.to_nanos(),
@@ -138,23 +138,23 @@ impl PlainEventsJsonQuery {
.get("diskIoBufferSize")
.map_or("4096", |k| k)
.parse()
.map_err(|e| Error::with_msg(format!("can not parse diskIoBufferSize {:?}", e)))?,
.map_err(|e| Error::with_public_msg(format!("can not parse diskIoBufferSize {:?}", e)))?,
report_error: pairs
.get("reportError")
.map_or("false", |k| k)
.parse()
.map_err(|e| Error::with_msg(format!("can not parse reportError {:?}", e)))?,
.map_err(|e| Error::with_public_msg(format!("can not parse reportError {:?}", e)))?,
timeout: pairs
.get("timeout")
.map_or("10000", |k| k)
.parse::<u64>()
.map(|k| Duration::from_millis(k))
.map_err(|e| Error::with_msg(format!("can not parse timeout {:?}", e)))?,
.map_err(|e| Error::with_public_msg(format!("can not parse timeout {:?}", e)))?,
do_log: pairs
.get("doLog")
.map_or("false", |k| k)
.parse()
.map_err(|e| Error::with_msg(format!("can not parse doLog {:?}", e)))?,
.map_err(|e| Error::with_public_msg(format!("can not parse doLog {:?}", e)))?,
};
Ok(ret)
}

View File

@@ -7,7 +7,7 @@ use crate::eventchunker::{EventChunkerConf, EventFull};
use err::Error;
use futures_core::Stream;
use futures_util::StreamExt;
use items::numops::{BoolNum, NumOps};
use items::numops::{BoolNum, NumOps, StringNum};
use items::{EventsNodeProcessor, Framable, RangeCompletableItem, Sitemty, StreamItem};
use netpod::query::RawEventsQuery;
use netpod::{AggKind, ByteOrder, ByteSize, Channel, FileIoBufferSize, NanoRange, NodeConfigCached, ScalarType, Shape};
@@ -133,6 +133,7 @@ macro_rules! pipe1 {
ScalarType::F32 => pipe2!(f32, $end, $shape, $agg_kind, $event_blobs),
ScalarType::F64 => pipe2!(f64, $end, $shape, $agg_kind, $event_blobs),
ScalarType::BOOL => pipe2!(BoolNum, $end, $shape, $agg_kind, $event_blobs),
ScalarType::STRING => pipe2!(StringNum, $end, $shape, $agg_kind, $event_blobs),
}
};
}
@@ -164,8 +165,8 @@ pub async fn make_event_pipe(
Err(e) => return Err(e)?,
};
let entry = match entry_res {
MatchingConfigEntry::None => return Err(Error::with_msg("no config entry found"))?,
MatchingConfigEntry::Multiple => return Err(Error::with_msg("multiple config entries found"))?,
MatchingConfigEntry::None => return Err(Error::with_public_msg("no config entry found"))?,
MatchingConfigEntry::Multiple => return Err(Error::with_public_msg("multiple config entries found"))?,
MatchingConfigEntry::Entry(entry) => entry,
};
let shape = match entry.to_shape() {
@@ -215,8 +216,8 @@ pub async fn get_applicable_entry(
Err(e) => return Err(e)?,
};
let entry = match entry_res {
MatchingConfigEntry::None => return Err(Error::with_msg("no config entry found"))?,
MatchingConfigEntry::Multiple => return Err(Error::with_msg("multiple config entries found"))?,
MatchingConfigEntry::None => return Err(Error::with_public_msg("no config entry found"))?,
MatchingConfigEntry::Multiple => return Err(Error::with_public_msg("multiple config entries found"))?,
MatchingConfigEntry::Entry(entry) => entry,
};
Ok(entry.clone())

View File

@@ -51,6 +51,20 @@ impl Error {
}
}
pub fn with_public_msg<S: Into<String>>(s: S) -> Self {
let s = s.into();
let ret = Self::with_msg(&s);
let ret = ret.add_public_msg(s);
ret
}
pub fn with_public_msg_no_trace<S: Into<String>>(s: S) -> Self {
let s = s.into();
let ret = Self::with_msg_no_trace(&s);
let ret = ret.add_public_msg(s);
ret
}
pub fn from_string<E>(e: E) -> Self
where
E: ToString,
@@ -94,7 +108,7 @@ fn fmt_backtrace(trace: &backtrace::Backtrace) -> String {
None => false,
Some(s) => {
let s = s.to_str().unwrap();
s.contains("/dev/daqbuffer/") || s.contains("/data_meta/build/")
true || s.contains("/dev/daqbuffer/") || s.contains("/data_meta/build/")
}
};
let name = match sy.name() {
@@ -113,7 +127,7 @@ fn fmt_backtrace(trace: &backtrace::Backtrace) -> String {
_ => 0,
};
if is_ours {
write!(&mut buf, "\n {}\n {} {}", name, filename, lineno).unwrap();
write!(&mut buf, "\n {name}\n {filename} {lineno}").unwrap();
c1 += 1;
if c1 >= 10 {
break 'outer;

View File

@@ -678,9 +678,11 @@ impl Stream for DataApiPython3DataStream {
Err(e) => return Err(e)?,
};
let entry = match entry_res {
MatchingConfigEntry::None => return Err(Error::with_msg("no config entry found"))?,
MatchingConfigEntry::None => {
return Err(Error::with_public_msg("no config entry found"))?
}
MatchingConfigEntry::Multiple => {
return Err(Error::with_msg("multiple config entries found"))?
return Err(Error::with_public_msg("multiple config entries found"))?
}
MatchingConfigEntry::Entry(entry) => entry.clone(),
};

View File

@@ -12,6 +12,14 @@ impl Error {
Self(::err::Error::with_msg_no_trace(s))
}
pub fn with_public_msg<S: Into<String>>(s: S) -> Self {
Self(::err::Error::with_public_msg(s))
}
pub fn with_public_msg_no_trace<S: Into<String>>(s: S) -> Self {
Self(::err::Error::with_public_msg_no_trace(s))
}
pub fn msg(&self) -> &str {
self.0.msg()
}
@@ -27,7 +35,7 @@ impl Error {
impl fmt::Display for Error {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt::Debug::fmt(self, fmt)
fmt::Display::fmt(&self.0, fmt)
}
}

View File

@@ -21,10 +21,10 @@ use hyper::server::conn::AddrStream;
use hyper::service::{make_service_fn, service_fn};
use hyper::{server::Server, Body, Request, Response};
use net::SocketAddr;
use netpod::log::*;
use netpod::query::BinnedQuery;
use netpod::timeunits::SEC;
use netpod::{channel_from_pairs, get_url_query_pairs, AggKind, ChannelConfigQuery, FromUrl, NodeConfigCached};
use netpod::{log::*, ACCEPT_ALL};
use netpod::{APP_JSON, APP_JSON_LINES, APP_OCTET};
use nodenet::conn::events_service;
use panic::{AssertUnwindSafe, UnwindSafe};
@@ -81,7 +81,7 @@ async fn http_service(req: Request<Body>, node_config: NodeConfigCached) -> Resu
match http_service_try(req, &node_config).await {
Ok(k) => Ok(k),
Err(e) => {
error!("daqbuffer node http_service sees error: {:?}", e);
error!("daqbuffer node http_service sees error: {}", e);
Err(e)
}
}
@@ -448,30 +448,34 @@ trait ToPublicResponse {
impl ToPublicResponse for Error {
fn to_public_response(&self) -> Response<Body> {
use std::fmt::Write;
let status = match self.reason() {
Some(::err::Reason::BadRequest) => StatusCode::BAD_REQUEST,
Some(::err::Reason::InternalError) => StatusCode::INTERNAL_SERVER_ERROR,
_ => StatusCode::INTERNAL_SERVER_ERROR,
};
let msg = match self.public_msg() {
let mut msg = match self.public_msg() {
Some(v) => v.join("\n"),
_ => String::new(),
};
write!(msg, "\n\nhttps://data-api.psi.ch/api/4/documentation\n").unwrap();
response(status).body(Body::from(msg)).unwrap()
}
}
impl ToPublicResponse for ::err::Error {
fn to_public_response(&self) -> Response<Body> {
use std::fmt::Write;
let status = match self.reason() {
Some(::err::Reason::BadRequest) => StatusCode::BAD_REQUEST,
Some(::err::Reason::InternalError) => StatusCode::INTERNAL_SERVER_ERROR,
_ => StatusCode::INTERNAL_SERVER_ERROR,
};
let msg = match self.public_msg() {
let mut msg = match self.public_msg() {
Some(v) => v.join("\n"),
_ => String::new(),
};
write!(msg, "\n\nhttps://data-api.psi.ch/api/4/documentation\n").unwrap();
response(status).body(Body::from(msg)).unwrap()
}
}
@@ -494,7 +498,7 @@ async fn binned_inner(req: Request<Body>, node_config: &NodeConfigCached) -> Res
});
match head.headers.get(http::header::ACCEPT) {
Some(v) if v == APP_OCTET => binned_binary(query, node_config).await,
Some(v) if v == APP_JSON => binned_json(query, node_config).await,
Some(v) if v == APP_JSON || v == ACCEPT_ALL => binned_json(query, node_config).await,
_ => Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?),
}
}
@@ -573,13 +577,13 @@ async fn plain_events(req: Request<Body>, node_config: &NodeConfigCached) -> Res
}
async fn plain_events_inner(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
debug!("httpret plain_events_inner headers: {:?}", req.headers());
info!("httpret plain_events_inner req: {:?}", req);
let accept_def = APP_JSON;
let accept = req
.headers()
.get(http::header::ACCEPT)
.map_or(accept_def, |k| k.to_str().unwrap_or(accept_def));
if accept == APP_JSON {
if accept == APP_JSON || accept == ACCEPT_ALL {
Ok(plain_events_json(req, node_config).await?)
} else if accept == APP_OCTET {
Ok(plain_events_binary(req, node_config).await?)

View File

@@ -18,6 +18,7 @@ pub enum SingleBinWaveEvents {
I64(XBinnedScalarEvents<i64>),
F32(XBinnedScalarEvents<f32>),
F64(XBinnedScalarEvents<f64>),
String(XBinnedScalarEvents<String>),
}
impl SingleBinWaveEvents {
@@ -108,6 +109,7 @@ pub enum MultiBinWaveEvents {
I64(XBinnedWaveEvents<i64>),
F32(XBinnedWaveEvents<f32>),
F64(XBinnedWaveEvents<f64>),
String(XBinnedWaveEvents<String>),
}
impl MultiBinWaveEvents {

View File

@@ -22,6 +22,7 @@ use err::Error;
use netpod::timeunits::{MS, SEC};
use netpod::{log::Level, AggKind, EventDataReadStats, EventQueryJsonStringFrame, NanoRange, Shape};
use netpod::{DiskStats, RangeFilterStats};
use numops::StringNum;
use serde::de::{self, DeserializeOwned, Visitor};
use serde::{Deserialize, Serialize, Serializer};
use std::fmt;
@@ -199,10 +200,14 @@ impl SubFrId for f64 {
const SUB: u32 = 12;
}
impl SubFrId for BoolNum {
impl SubFrId for StringNum {
const SUB: u32 = 13;
}
impl SubFrId for BoolNum {
const SUB: u32 = 14;
}
pub trait SitemtyFrameType {
const FRAME_TYPE_ID: u32;
}

View File

@@ -364,28 +364,28 @@ where
if item.ts2s[i1] <= self.range.beg {
} else if item.ts1s[i1] >= self.range.end {
} else {
self.min = match self.min {
None => item.mins[i1],
Some(min) => match item.mins[i1] {
None => Some(min),
self.min = match &self.min {
None => item.mins[i1].clone(),
Some(min) => match &item.mins[i1] {
None => Some(min.clone()),
Some(v) => {
if v < min {
Some(v)
if v < &min {
Some(v.clone())
} else {
Some(min)
Some(min.clone())
}
}
},
};
self.max = match self.max {
None => item.maxs[i1],
Some(max) => match item.maxs[i1] {
None => Some(max),
self.max = match &self.max {
None => item.maxs[i1].clone(),
Some(max) => match &item.maxs[i1] {
None => Some(max.clone()),
Some(v) => {
if v > max {
Some(v)
if v > &max {
Some(v.clone())
} else {
Some(max)
Some(max.clone())
}
}
},
@@ -415,8 +415,8 @@ where
ts1s: vec![self.range.beg],
ts2s: vec![self.range.end],
counts: vec![self.count],
mins: vec![self.min],
maxs: vec![self.max],
mins: vec![self.min.clone()],
maxs: vec![self.max.clone()],
avgs: vec![avg],
};
self.count = 0;

View File

@@ -369,8 +369,8 @@ where
None => {}
Some(v) => {
for (a, b) in min.iter_mut().zip(v.iter()) {
if *b < *a {
*a = *b;
if b < a {
*a = b.clone();
}
}
}
@@ -382,8 +382,8 @@ where
None => {}
Some(v) => {
for (a, b) in max.iter_mut().zip(v.iter()) {
if *b > *a {
*a = *b;
if b > a {
*a = b.clone();
}
}
}

View File

@@ -376,7 +376,7 @@ where
Some(inp) => {
for (a, b) in self.min.iter_mut().zip(inp.iter()) {
if *b < *a || a.is_nan() {
*a = *b;
*a = b.clone();
}
}
}
@@ -386,7 +386,7 @@ where
Some(inp) => {
for (a, b) in self.max.iter_mut().zip(inp.iter()) {
if *b > *a || a.is_nan() {
*a = *b;
*a = b.clone();
}
}
}

View File

@@ -1,5 +1,5 @@
use crate::SubFrId;
use num_traits::{AsPrimitive, Bounded, Float, Zero};
use num_traits::{Bounded, Float, Zero};
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use std::cmp::Ordering;
@@ -60,14 +60,64 @@ impl PartialOrd for BoolNum {
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct StringNum(pub String);
impl StringNum {
pub const MIN: Self = Self(String::new());
pub const MAX: Self = Self(String::new());
}
impl Add<StringNum> for StringNum {
type Output = StringNum;
fn add(self, _rhs: StringNum) -> Self::Output {
todo!()
}
}
impl num_traits::Zero for StringNum {
fn zero() -> Self {
Self(String::new())
}
fn is_zero(&self) -> bool {
self.0.is_empty()
}
}
impl num_traits::Bounded for StringNum {
fn min_value() -> Self {
Self(String::new())
}
fn max_value() -> Self {
Self(String::new())
}
}
impl PartialEq for StringNum {
fn eq(&self, other: &Self) -> bool {
PartialEq::eq(&self.0, &other.0)
}
}
impl PartialOrd for StringNum {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
PartialOrd::partial_cmp(&self.0, &other.0)
}
}
pub trait NumOps:
Sized
+ Copy
//+ Copy
+ Clone
+ AsPrimF32
+ Send
+ Unpin
+ Debug
+ Zero
+ AsPrimitive<f32>
//+ AsPrimitive<f32>
+ Bounded
+ PartialOrd
+ SubFrId
@@ -103,6 +153,44 @@ fn is_nan_float<T: Float>(x: &T) -> bool {
x.is_nan()
}
pub trait AsPrimF32 {
fn as_prim_f32(&self) -> f32;
}
macro_rules! impl_as_prim_f32 {
($ty:ident) => {
impl AsPrimF32 for $ty {
fn as_prim_f32(&self) -> f32 {
*self as f32
}
}
};
}
impl_as_prim_f32!(u8);
impl_as_prim_f32!(u16);
impl_as_prim_f32!(u32);
impl_as_prim_f32!(u64);
impl_as_prim_f32!(i8);
impl_as_prim_f32!(i16);
impl_as_prim_f32!(i32);
impl_as_prim_f32!(i64);
impl_as_prim_f32!(f32);
impl_as_prim_f32!(f64);
impl AsPrimF32 for BoolNum {
fn as_prim_f32(&self) -> f32 {
self.0 as f32
}
}
impl AsPrimF32 for StringNum {
fn as_prim_f32(&self) -> f32 {
netpod::log::error!("TODO impl AsPrimF32 for StringNum");
todo!()
}
}
impl_num_ops!(u8, MIN, MAX, is_nan_int);
impl_num_ops!(u16, MIN, MAX, is_nan_int);
impl_num_ops!(u32, MIN, MAX, is_nan_int);
@@ -114,3 +202,4 @@ impl_num_ops!(i64, MIN, MAX, is_nan_int);
impl_num_ops!(f32, NAN, NAN, is_nan_float);
impl_num_ops!(f64, NAN, NAN, is_nan_float);
impl_num_ops!(BoolNum, MIN, MAX, is_nan_int);
impl_num_ops!(StringNum, MIN, MAX, is_nan_int);

View File

@@ -22,6 +22,7 @@ pub enum ScalarPlainEvents {
I64(ScalarEvents<i64>),
F32(ScalarEvents<f32>),
F64(ScalarEvents<f64>),
String(ScalarEvents<String>),
}
impl ScalarPlainEvents {
@@ -100,6 +101,7 @@ pub enum WavePlainEvents {
I64(WaveEvents<i64>),
F32(WaveEvents<f32>),
F64(WaveEvents<f64>),
String(WaveEvents<String>),
}
impl WavePlainEvents {

View File

@@ -13,6 +13,8 @@ use serde::{Deserialize, Serialize};
use std::fmt;
use tokio::fs::File;
// TODO in this module reduce clones.
// TODO add pulse.
#[derive(Serialize, Deserialize)]
pub struct ScalarEvents<NTY> {
@@ -147,7 +149,7 @@ where
{
fn push_index(&mut self, src: &Self, ix: usize) {
self.tss.push(src.tss[ix]);
self.values.push(src.values[ix]);
self.values.push(src.values[ix].clone());
}
}
@@ -316,32 +318,33 @@ where
}
}
// TODO reduce clone.. optimize via more traits to factor the trade-offs?
fn apply_min_max(&mut self, val: NTY) {
self.min = match self.min {
None => Some(val),
self.min = match &self.min {
None => Some(val.clone()),
Some(min) => {
if val < min {
Some(val)
if &val < min {
Some(val.clone())
} else {
Some(min)
Some(min.clone())
}
}
};
self.max = match self.max {
self.max = match &self.max {
None => Some(val),
Some(max) => {
if val > max {
if &val > max {
Some(val)
} else {
Some(max)
Some(max.clone())
}
}
};
}
fn apply_event_unweight(&mut self, val: NTY) {
let vf = val.as_prim_f32();
self.apply_min_max(val);
let vf = val.as_();
if vf.is_nan() {
} else {
self.sum += vf;
@@ -350,15 +353,15 @@ where
}
fn apply_event_time_weight(&mut self, ts: u64) {
if let Some(v) = self.last_val {
debug!("apply_event_time_weight");
self.apply_min_max(v);
if let Some(v) = &self.last_val {
let vf = v.as_prim_f32();
let v2 = v.clone();
self.apply_min_max(v2);
let w = if self.do_time_weight {
(ts - self.int_ts) as f32 * 1e-9
} else {
1.
};
let vf = v.as_();
if vf.is_nan() {
} else {
self.sum += vf * w;
@@ -376,7 +379,7 @@ where
fn ingest_unweight(&mut self, item: &<Self as TimeBinnableTypeAggregator>::Input) {
for i1 in 0..item.tss.len() {
let ts = item.tss[i1];
let val = item.values[i1];
let val = item.values[i1].clone();
if ts < self.range.beg {
} else if ts >= self.range.end {
} else {
@@ -389,7 +392,7 @@ where
fn ingest_time_weight(&mut self, item: &<Self as TimeBinnableTypeAggregator>::Input) {
for i1 in 0..item.tss.len() {
let ts = item.tss[i1];
let val = item.values[i1];
let val = item.values[i1].clone();
if ts < self.int_ts {
debug!("just set int_ts");
self.last_ts = ts;
@@ -417,8 +420,8 @@ where
ts1s: vec![self.range.beg],
ts2s: vec![self.range.end],
counts: vec![self.count],
mins: vec![self.min],
maxs: vec![self.max],
mins: vec![self.min.clone()],
maxs: vec![self.max.clone()],
avgs: vec![avg],
};
self.int_ts = range.beg;
@@ -447,8 +450,8 @@ where
ts1s: vec![self.range.beg],
ts2s: vec![self.range.end],
counts: vec![self.count],
mins: vec![self.min],
maxs: vec![self.max],
mins: vec![self.min.clone()],
maxs: vec![self.max.clone()],
avgs: vec![avg],
};
self.int_ts = range.beg;

View File

@@ -246,7 +246,7 @@ where
Some(min) => {
for (a, b) in min.iter_mut().zip(item.vals[i1].iter()) {
if b < a {
*a = *b;
*a = b.clone();
}
}
}
@@ -256,18 +256,18 @@ where
Some(max) => {
for (a, b) in max.iter_mut().zip(item.vals[i1].iter()) {
if b < a {
*a = *b;
*a = b.clone();
}
}
}
};
match self.sum.as_mut() {
None => {
self.sum = Some(item.vals[i1].iter().map(|k| k.as_()).collect());
self.sum = Some(item.vals[i1].iter().map(|k| k.as_prim_f32()).collect());
}
Some(sum) => {
for (a, b) in sum.iter_mut().zip(item.vals[i1].iter()) {
let vf = b.as_();
let vf = b.as_prim_f32();
if vf.is_nan() {
} else {
*a += vf;
@@ -356,14 +356,14 @@ where
let mut sum = 0f32;
let mut sumc = 0;
let vals = &inp.vals[i1];
for &v in vals {
if v < min || min.is_nan() {
min = v;
for v in vals.iter() {
if v < &min || min.is_nan() {
min = v.clone();
}
if v > max || max.is_nan() {
max = v;
if v > &max || max.is_nan() {
max = v.clone();
}
let vf = v.as_();
let vf = v.as_prim_f32();
if vf.is_nan() {
} else {
sum += vf;
@@ -420,17 +420,17 @@ where
let mut max = vec![NTY::min_or_nan(); self.x_bin_count];
let mut sum = vec![0f32; self.x_bin_count];
let mut sumc = vec![0u64; self.x_bin_count];
for (i2, &v) in inp.vals[i1].iter().enumerate() {
for (i2, v) in inp.vals[i1].iter().enumerate() {
let i3 = i2 * self.x_bin_count / self.shape_bin_count;
if v < min[i3] || min[i3].is_nan() {
min[i3] = v;
if v < &min[i3] || min[i3].is_nan() {
min[i3] = v.clone();
}
if v > max[i3] || max[i3].is_nan() {
max[i3] = v;
if v > &max[i3] || max[i3].is_nan() {
max[i3] = v.clone();
}
if v.is_nan() {
} else {
sum[i3] += v.as_();
sum[i3] += v.as_prim_f32();
sumc[i3] += 1;
}
}

View File

@@ -12,6 +12,8 @@ use netpod::NanoRange;
use serde::{Deserialize, Serialize};
use tokio::fs::File;
// TODO in this module reduce clones
// TODO rename Scalar -> Dim0
#[derive(Debug, Serialize, Deserialize)]
pub struct XBinnedScalarEvents<NTY> {
@@ -124,8 +126,8 @@ where
{
fn push_index(&mut self, src: &Self, ix: usize) {
self.tss.push(src.tss[ix]);
self.mins.push(src.mins[ix]);
self.maxs.push(src.maxs[ix]);
self.mins.push(src.mins[ix].clone());
self.maxs.push(src.maxs[ix].clone());
self.avgs.push(src.avgs[ix]);
}
}
@@ -225,23 +227,23 @@ where
}
fn apply_min_max(&mut self, min: NTY, max: NTY) {
self.min = match self.min {
self.min = match &self.min {
None => Some(min),
Some(cmin) => {
if min < cmin {
if &min < cmin {
Some(min)
} else {
Some(cmin)
Some(cmin.clone())
}
}
};
self.max = match self.max {
self.max = match &self.max {
None => Some(max),
Some(cmax) => {
if max > cmax {
if &max > cmax {
Some(max)
} else {
Some(cmax)
Some(cmax.clone())
}
}
};
@@ -260,8 +262,10 @@ where
fn apply_event_time_weight(&mut self, ts: u64) {
//debug!("apply_event_time_weight");
if let (Some(avg), Some(min), Some(max)) = (self.last_avg, self.last_min, self.last_max) {
self.apply_min_max(min, max);
if let (Some(avg), Some(min), Some(max)) = (self.last_avg, &self.last_min, &self.last_max) {
let min2 = min.clone();
let max2 = max.clone();
self.apply_min_max(min2, max2);
let w = (ts - self.int_ts) as f32 / self.range.delta() as f32;
if avg.is_nan() {
} else {
@@ -276,8 +280,8 @@ where
for i1 in 0..item.tss.len() {
let ts = item.tss[i1];
let avg = item.avgs[i1];
let min = item.mins[i1];
let max = item.maxs[i1];
let min = item.mins[i1].clone();
let max = item.maxs[i1].clone();
if ts < self.range.beg {
} else if ts >= self.range.end {
} else {
@@ -291,8 +295,8 @@ where
for i1 in 0..item.tss.len() {
let ts = item.tss[i1];
let avg = item.avgs[i1];
let min = item.mins[i1];
let max = item.maxs[i1];
let min = item.mins[i1].clone();
let max = item.maxs[i1].clone();
if ts < self.int_ts {
self.last_ts = ts;
self.last_avg = Some(avg);
@@ -321,8 +325,8 @@ where
ts1s: vec![self.range.beg],
ts2s: vec![self.range.end],
counts: vec![self.count],
mins: vec![self.min],
maxs: vec![self.max],
mins: vec![self.min.clone()],
maxs: vec![self.max.clone()],
avgs: vec![avg],
};
self.int_ts = range.beg;
@@ -348,8 +352,8 @@ where
ts1s: vec![self.range.beg],
ts2s: vec![self.range.end],
counts: vec![self.count],
mins: vec![self.min],
maxs: vec![self.max],
mins: vec![self.min.clone()],
maxs: vec![self.max.clone()],
avgs: vec![avg],
};
self.int_ts = range.beg;

View File

@@ -228,6 +228,7 @@ where
}
}
// TODO get rid of clones.
fn apply_min_max(&mut self, min: &Vec<NTY>, max: &Vec<NTY>) {
self.min = match self.min.take() {
None => Some(min.clone()),
@@ -235,7 +236,7 @@ where
let a = cmin
.into_iter()
.zip(min)
.map(|(a, b)| if a < *b { a } else { *b })
.map(|(a, b)| if a < *b { a } else { b.clone() })
.collect();
Some(a)
}
@@ -246,7 +247,7 @@ where
let a = cmax
.into_iter()
.zip(min)
.map(|(a, b)| if a > *b { a } else { *b })
.map(|(a, b)| if a > *b { a } else { b.clone() })
.collect();
Some(a)
}

View File

@@ -48,6 +48,7 @@ pub fn tycases1(ts: TokenStream) -> TokenStream {
let s = format!("{}::{}{} => {},", enum_1_pre, id, enum_1_suff, rhs);
arms.push(s);
}
arms.push(format!("{}::{}{} => {}", enum_1_pre, "String", enum_1_suff, "todo!()"));
let gen = format!("match {} {{\n{}\n}}", match_val, arms.join("\n"));
//panic!("GENERATED: {}", gen);
gen.parse().unwrap()

View File

@@ -54,6 +54,7 @@ pub enum ScalarType {
F32,
F64,
BOOL,
STRING,
}
pub trait HasScalarType {
@@ -76,8 +77,9 @@ impl ScalarType {
9 => I64,
11 => F32,
12 => F64,
13 => STRING,
//13 => return Err(Error::with_msg(format!("STRING not supported"))),
6 => return Err(Error::with_msg(format!("CHARACTER not supported"))),
13 => return Err(Error::with_msg(format!("STRING not supported"))),
_ => return Err(Error::with_msg(format!("unknown dtype code: {}", ix))),
};
Ok(g)
@@ -139,6 +141,7 @@ impl ScalarType {
F32 => 4,
F64 => 8,
BOOL => 1,
STRING => 0,
}
}
@@ -156,6 +159,7 @@ impl ScalarType {
F32 => 11,
F64 => 12,
BOOL => 0,
STRING => 13,
}
}
@@ -172,6 +176,7 @@ impl ScalarType {
ScalarType::F32 => "float32",
ScalarType::F64 => "float64",
ScalarType::BOOL => "bool",
ScalarType::STRING => "string",
}
}
}
@@ -1236,11 +1241,11 @@ pub fn channel_from_pairs(pairs: &BTreeMap<String, String>) -> Result<Channel, E
let ret = Channel {
backend: pairs
.get("channelBackend")
.ok_or(Error::with_msg("missing channelBackend"))?
.ok_or(Error::with_public_msg("missing channelBackend"))?
.into(),
name: pairs
.get("channelName")
.ok_or(Error::with_msg("missing channelName"))?
.ok_or(Error::with_public_msg("missing channelName"))?
.into(),
};
Ok(ret)
@@ -1378,8 +1383,8 @@ impl HasTimeout for ChannelConfigQuery {
impl FromUrl for ChannelConfigQuery {
fn from_url(url: &Url) -> Result<Self, Error> {
let pairs = get_url_query_pairs(url);
let beg_date = pairs.get("begDate").ok_or(Error::with_msg("missing begDate"))?;
let end_date = pairs.get("endDate").ok_or(Error::with_msg("missing endDate"))?;
let beg_date = pairs.get("begDate").ok_or(Error::with_public_msg("missing begDate"))?;
let end_date = pairs.get("endDate").ok_or(Error::with_public_msg("missing endDate"))?;
let expand = pairs.get("expand").map(|s| s == "true").unwrap_or(false);
let ret = Self {
channel: channel_from_pairs(&pairs)?,

View File

@@ -297,8 +297,8 @@ pub async fn channel_config(q: &ChannelConfigQuery, node: &Node) -> Result<Chann
let conf = read_local_config(q.channel.clone(), node.clone()).await?;
let entry_res = extract_matching_config_entry(&q.range, &conf)?;
let entry = match entry_res {
MatchingConfigEntry::None => return Err(Error::with_msg("no config entry found")),
MatchingConfigEntry::Multiple => return Err(Error::with_msg("multiple config entries found")),
MatchingConfigEntry::None => return Err(Error::with_public_msg("no config entry found")),
MatchingConfigEntry::Multiple => return Err(Error::with_public_msg("multiple config entries found")),
MatchingConfigEntry::Entry(entry) => entry,
};
let ret = ChannelConfigResponse {