diff --git a/disk/src/binned/prebinned.rs b/disk/src/binned/prebinned.rs index 8ae3bad..32adcc2 100644 --- a/disk/src/binned/prebinned.rs +++ b/disk/src/binned/prebinned.rs @@ -9,7 +9,7 @@ use bytes::Bytes; use err::Error; use futures_core::Stream; use futures_util::StreamExt; -use items::numops::{BoolNum, NumOps}; +use items::numops::{BoolNum, NumOps, StringNum}; use items::{ Appendable, Clearable, EventsNodeProcessor, Framable, FrameType, PushableIndex, Sitemty, TimeBinnableType, }; @@ -161,6 +161,7 @@ fn make_num_pipeline( ScalarType::F32 => match_end!(f32, byte_order, shape, agg_kind, query, node_config), ScalarType::F64 => match_end!(f64, byte_order, shape, agg_kind, query, node_config), ScalarType::BOOL => match_end!(BoolNum, byte_order, shape, agg_kind, query, node_config), + ScalarType::STRING => match_end!(StringNum, byte_order, shape, agg_kind, query, node_config), } } diff --git a/disk/src/channelexec.rs b/disk/src/channelexec.rs index 1ee041f..3732254 100644 --- a/disk/src/channelexec.rs +++ b/disk/src/channelexec.rs @@ -9,7 +9,7 @@ use err::Error; use futures_core::Stream; use futures_util::future::FutureExt; use futures_util::StreamExt; -use items::numops::{BoolNum, NumOps}; +use items::numops::{BoolNum, NumOps, StringNum}; use items::scalarevents::ScalarEvents; use items::streams::{Collectable, Collector}; use items::{ @@ -189,6 +189,7 @@ where ScalarType::F32 => match_end!(f, f32, byte_order, scalar_type, shape, agg_kind, node_config), ScalarType::F64 => match_end!(f, f64, byte_order, scalar_type, shape, agg_kind, node_config), ScalarType::BOOL => match_end!(f, BoolNum, byte_order, scalar_type, shape, agg_kind, node_config), + ScalarType::STRING => match_end!(f, StringNum, byte_order, scalar_type, shape, agg_kind, node_config), } } diff --git a/disk/src/decode.rs b/disk/src/decode.rs index a699976..55cd7c4 100644 --- a/disk/src/decode.rs +++ b/disk/src/decode.rs @@ -5,7 +5,7 @@ use err::Error; use futures_core::Stream; use futures_util::StreamExt; use items::eventsitem::EventsItem; -use items::numops::{BoolNum, NumOps}; +use items::numops::{BoolNum, NumOps, StringNum}; use items::plainevents::{PlainEvents, ScalarPlainEvents}; use items::scalarevents::ScalarEvents; use items::waveevents::{WaveEvents, WaveNBinner, WavePlainProc, WaveXBinner}; @@ -48,6 +48,20 @@ impl NumFromBytes for BoolNum { } } +impl NumFromBytes for StringNum { + fn convert(_buf: &[u8], _big_endian: bool) -> StringNum { + netpod::log::error!("TODO NumFromBytes for StringNum"); + todo!() + } +} + +impl NumFromBytes for StringNum { + fn convert(_buf: &[u8], _big_endian: bool) -> StringNum { + netpod::log::error!("TODO NumFromBytes for StringNum"); + todo!() + } +} + macro_rules! impl_num_from_bytes_end { ($nty:ident, $nl:expr, $end:ident, $ec:ident) => { impl NumFromBytes<$nty, $end> for $nty { @@ -402,6 +416,11 @@ impl EventsItemStream { let cont = ScalarEvents::::empty(); ret = Some(EventsItem::Plain(PlainEvents::Scalar(ScalarPlainEvents::I8(cont)))); } + ScalarType::STRING => { + // TODO + let cont = ScalarEvents::::empty(); + ret = Some(EventsItem::Plain(PlainEvents::Scalar(ScalarPlainEvents::String(cont)))); + } }, Shape::Wave(_) => todo!(), Shape::Image(..) => todo!(), @@ -444,6 +463,7 @@ impl EventsItemStream { } } ScalarType::BOOL => todo!(), + ScalarType::STRING => todo!(), }, Shape::Wave(_) => todo!(), Shape::Image(_, _) => todo!(), diff --git a/disk/src/events.rs b/disk/src/events.rs index f65a075..dc52489 100644 --- a/disk/src/events.rs +++ b/disk/src/events.rs @@ -126,8 +126,8 @@ impl PlainEventsJsonQuery { pub fn from_url(url: &Url) -> Result { let pairs = get_url_query_pairs(url); - let beg_date = pairs.get("begDate").ok_or(Error::with_msg("missing begDate"))?; - let end_date = pairs.get("endDate").ok_or(Error::with_msg("missing endDate"))?; + let beg_date = pairs.get("begDate").ok_or(Error::with_public_msg("missing begDate"))?; + let end_date = pairs.get("endDate").ok_or(Error::with_public_msg("missing endDate"))?; let ret = Self { range: NanoRange { beg: beg_date.parse::>()?.to_nanos(), @@ -138,23 +138,23 @@ impl PlainEventsJsonQuery { .get("diskIoBufferSize") .map_or("4096", |k| k) .parse() - .map_err(|e| Error::with_msg(format!("can not parse diskIoBufferSize {:?}", e)))?, + .map_err(|e| Error::with_public_msg(format!("can not parse diskIoBufferSize {:?}", e)))?, report_error: pairs .get("reportError") .map_or("false", |k| k) .parse() - .map_err(|e| Error::with_msg(format!("can not parse reportError {:?}", e)))?, + .map_err(|e| Error::with_public_msg(format!("can not parse reportError {:?}", e)))?, timeout: pairs .get("timeout") .map_or("10000", |k| k) .parse::() .map(|k| Duration::from_millis(k)) - .map_err(|e| Error::with_msg(format!("can not parse timeout {:?}", e)))?, + .map_err(|e| Error::with_public_msg(format!("can not parse timeout {:?}", e)))?, do_log: pairs .get("doLog") .map_or("false", |k| k) .parse() - .map_err(|e| Error::with_msg(format!("can not parse doLog {:?}", e)))?, + .map_err(|e| Error::with_public_msg(format!("can not parse doLog {:?}", e)))?, }; Ok(ret) } diff --git a/disk/src/raw/conn.rs b/disk/src/raw/conn.rs index 64b4d69..b67f920 100644 --- a/disk/src/raw/conn.rs +++ b/disk/src/raw/conn.rs @@ -7,7 +7,7 @@ use crate::eventchunker::{EventChunkerConf, EventFull}; use err::Error; use futures_core::Stream; use futures_util::StreamExt; -use items::numops::{BoolNum, NumOps}; +use items::numops::{BoolNum, NumOps, StringNum}; use items::{EventsNodeProcessor, Framable, RangeCompletableItem, Sitemty, StreamItem}; use netpod::query::RawEventsQuery; use netpod::{AggKind, ByteOrder, ByteSize, Channel, FileIoBufferSize, NanoRange, NodeConfigCached, ScalarType, Shape}; @@ -133,6 +133,7 @@ macro_rules! pipe1 { ScalarType::F32 => pipe2!(f32, $end, $shape, $agg_kind, $event_blobs), ScalarType::F64 => pipe2!(f64, $end, $shape, $agg_kind, $event_blobs), ScalarType::BOOL => pipe2!(BoolNum, $end, $shape, $agg_kind, $event_blobs), + ScalarType::STRING => pipe2!(StringNum, $end, $shape, $agg_kind, $event_blobs), } }; } @@ -164,8 +165,8 @@ pub async fn make_event_pipe( Err(e) => return Err(e)?, }; let entry = match entry_res { - MatchingConfigEntry::None => return Err(Error::with_msg("no config entry found"))?, - MatchingConfigEntry::Multiple => return Err(Error::with_msg("multiple config entries found"))?, + MatchingConfigEntry::None => return Err(Error::with_public_msg("no config entry found"))?, + MatchingConfigEntry::Multiple => return Err(Error::with_public_msg("multiple config entries found"))?, MatchingConfigEntry::Entry(entry) => entry, }; let shape = match entry.to_shape() { @@ -215,8 +216,8 @@ pub async fn get_applicable_entry( Err(e) => return Err(e)?, }; let entry = match entry_res { - MatchingConfigEntry::None => return Err(Error::with_msg("no config entry found"))?, - MatchingConfigEntry::Multiple => return Err(Error::with_msg("multiple config entries found"))?, + MatchingConfigEntry::None => return Err(Error::with_public_msg("no config entry found"))?, + MatchingConfigEntry::Multiple => return Err(Error::with_public_msg("multiple config entries found"))?, MatchingConfigEntry::Entry(entry) => entry, }; Ok(entry.clone()) diff --git a/err/src/lib.rs b/err/src/lib.rs index 2c238de..3920986 100644 --- a/err/src/lib.rs +++ b/err/src/lib.rs @@ -51,6 +51,20 @@ impl Error { } } + pub fn with_public_msg>(s: S) -> Self { + let s = s.into(); + let ret = Self::with_msg(&s); + let ret = ret.add_public_msg(s); + ret + } + + pub fn with_public_msg_no_trace>(s: S) -> Self { + let s = s.into(); + let ret = Self::with_msg_no_trace(&s); + let ret = ret.add_public_msg(s); + ret + } + pub fn from_string(e: E) -> Self where E: ToString, @@ -94,7 +108,7 @@ fn fmt_backtrace(trace: &backtrace::Backtrace) -> String { None => false, Some(s) => { let s = s.to_str().unwrap(); - s.contains("/dev/daqbuffer/") || s.contains("/data_meta/build/") + true || s.contains("/dev/daqbuffer/") || s.contains("/data_meta/build/") } }; let name = match sy.name() { @@ -113,7 +127,7 @@ fn fmt_backtrace(trace: &backtrace::Backtrace) -> String { _ => 0, }; if is_ours { - write!(&mut buf, "\n {}\n {} {}", name, filename, lineno).unwrap(); + write!(&mut buf, "\n {name}\n {filename} {lineno}").unwrap(); c1 += 1; if c1 >= 10 { break 'outer; diff --git a/httpret/src/api1.rs b/httpret/src/api1.rs index acc46f2..6362f7c 100644 --- a/httpret/src/api1.rs +++ b/httpret/src/api1.rs @@ -678,9 +678,11 @@ impl Stream for DataApiPython3DataStream { Err(e) => return Err(e)?, }; let entry = match entry_res { - MatchingConfigEntry::None => return Err(Error::with_msg("no config entry found"))?, + MatchingConfigEntry::None => { + return Err(Error::with_public_msg("no config entry found"))? + } MatchingConfigEntry::Multiple => { - return Err(Error::with_msg("multiple config entries found"))? + return Err(Error::with_public_msg("multiple config entries found"))? } MatchingConfigEntry::Entry(entry) => entry.clone(), }; diff --git a/httpret/src/err.rs b/httpret/src/err.rs index 1239b10..34f1b09 100644 --- a/httpret/src/err.rs +++ b/httpret/src/err.rs @@ -12,6 +12,14 @@ impl Error { Self(::err::Error::with_msg_no_trace(s)) } + pub fn with_public_msg>(s: S) -> Self { + Self(::err::Error::with_public_msg(s)) + } + + pub fn with_public_msg_no_trace>(s: S) -> Self { + Self(::err::Error::with_public_msg_no_trace(s)) + } + pub fn msg(&self) -> &str { self.0.msg() } @@ -27,7 +35,7 @@ impl Error { impl fmt::Display for Error { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - fmt::Debug::fmt(self, fmt) + fmt::Display::fmt(&self.0, fmt) } } diff --git a/httpret/src/httpret.rs b/httpret/src/httpret.rs index cc866a8..5362c91 100644 --- a/httpret/src/httpret.rs +++ b/httpret/src/httpret.rs @@ -21,10 +21,10 @@ use hyper::server::conn::AddrStream; use hyper::service::{make_service_fn, service_fn}; use hyper::{server::Server, Body, Request, Response}; use net::SocketAddr; -use netpod::log::*; use netpod::query::BinnedQuery; use netpod::timeunits::SEC; use netpod::{channel_from_pairs, get_url_query_pairs, AggKind, ChannelConfigQuery, FromUrl, NodeConfigCached}; +use netpod::{log::*, ACCEPT_ALL}; use netpod::{APP_JSON, APP_JSON_LINES, APP_OCTET}; use nodenet::conn::events_service; use panic::{AssertUnwindSafe, UnwindSafe}; @@ -81,7 +81,7 @@ async fn http_service(req: Request, node_config: NodeConfigCached) -> Resu match http_service_try(req, &node_config).await { Ok(k) => Ok(k), Err(e) => { - error!("daqbuffer node http_service sees error: {:?}", e); + error!("daqbuffer node http_service sees error: {}", e); Err(e) } } @@ -448,30 +448,34 @@ trait ToPublicResponse { impl ToPublicResponse for Error { fn to_public_response(&self) -> Response { + use std::fmt::Write; let status = match self.reason() { Some(::err::Reason::BadRequest) => StatusCode::BAD_REQUEST, Some(::err::Reason::InternalError) => StatusCode::INTERNAL_SERVER_ERROR, _ => StatusCode::INTERNAL_SERVER_ERROR, }; - let msg = match self.public_msg() { + let mut msg = match self.public_msg() { Some(v) => v.join("\n"), _ => String::new(), }; + write!(msg, "\n\nhttps://data-api.psi.ch/api/4/documentation\n").unwrap(); response(status).body(Body::from(msg)).unwrap() } } impl ToPublicResponse for ::err::Error { fn to_public_response(&self) -> Response { + use std::fmt::Write; let status = match self.reason() { Some(::err::Reason::BadRequest) => StatusCode::BAD_REQUEST, Some(::err::Reason::InternalError) => StatusCode::INTERNAL_SERVER_ERROR, _ => StatusCode::INTERNAL_SERVER_ERROR, }; - let msg = match self.public_msg() { + let mut msg = match self.public_msg() { Some(v) => v.join("\n"), _ => String::new(), }; + write!(msg, "\n\nhttps://data-api.psi.ch/api/4/documentation\n").unwrap(); response(status).body(Body::from(msg)).unwrap() } } @@ -494,7 +498,7 @@ async fn binned_inner(req: Request, node_config: &NodeConfigCached) -> Res }); match head.headers.get(http::header::ACCEPT) { Some(v) if v == APP_OCTET => binned_binary(query, node_config).await, - Some(v) if v == APP_JSON => binned_json(query, node_config).await, + Some(v) if v == APP_JSON || v == ACCEPT_ALL => binned_json(query, node_config).await, _ => Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?), } } @@ -573,13 +577,13 @@ async fn plain_events(req: Request, node_config: &NodeConfigCached) -> Res } async fn plain_events_inner(req: Request, node_config: &NodeConfigCached) -> Result, Error> { - debug!("httpret plain_events_inner headers: {:?}", req.headers()); + info!("httpret plain_events_inner req: {:?}", req); let accept_def = APP_JSON; let accept = req .headers() .get(http::header::ACCEPT) .map_or(accept_def, |k| k.to_str().unwrap_or(accept_def)); - if accept == APP_JSON { + if accept == APP_JSON || accept == ACCEPT_ALL { Ok(plain_events_json(req, node_config).await?) } else if accept == APP_OCTET { Ok(plain_events_binary(req, node_config).await?) diff --git a/items/src/binnedevents.rs b/items/src/binnedevents.rs index bcf45af..7694e9d 100644 --- a/items/src/binnedevents.rs +++ b/items/src/binnedevents.rs @@ -18,6 +18,7 @@ pub enum SingleBinWaveEvents { I64(XBinnedScalarEvents), F32(XBinnedScalarEvents), F64(XBinnedScalarEvents), + String(XBinnedScalarEvents), } impl SingleBinWaveEvents { @@ -108,6 +109,7 @@ pub enum MultiBinWaveEvents { I64(XBinnedWaveEvents), F32(XBinnedWaveEvents), F64(XBinnedWaveEvents), + String(XBinnedWaveEvents), } impl MultiBinWaveEvents { diff --git a/items/src/lib.rs b/items/src/lib.rs index 1c9d7fb..471cde3 100644 --- a/items/src/lib.rs +++ b/items/src/lib.rs @@ -22,6 +22,7 @@ use err::Error; use netpod::timeunits::{MS, SEC}; use netpod::{log::Level, AggKind, EventDataReadStats, EventQueryJsonStringFrame, NanoRange, Shape}; use netpod::{DiskStats, RangeFilterStats}; +use numops::StringNum; use serde::de::{self, DeserializeOwned, Visitor}; use serde::{Deserialize, Serialize, Serializer}; use std::fmt; @@ -199,10 +200,14 @@ impl SubFrId for f64 { const SUB: u32 = 12; } -impl SubFrId for BoolNum { +impl SubFrId for StringNum { const SUB: u32 = 13; } +impl SubFrId for BoolNum { + const SUB: u32 = 14; +} + pub trait SitemtyFrameType { const FRAME_TYPE_ID: u32; } diff --git a/items/src/minmaxavgbins.rs b/items/src/minmaxavgbins.rs index dd46d6f..33a6786 100644 --- a/items/src/minmaxavgbins.rs +++ b/items/src/minmaxavgbins.rs @@ -364,28 +364,28 @@ where if item.ts2s[i1] <= self.range.beg { } else if item.ts1s[i1] >= self.range.end { } else { - self.min = match self.min { - None => item.mins[i1], - Some(min) => match item.mins[i1] { - None => Some(min), + self.min = match &self.min { + None => item.mins[i1].clone(), + Some(min) => match &item.mins[i1] { + None => Some(min.clone()), Some(v) => { - if v < min { - Some(v) + if v < &min { + Some(v.clone()) } else { - Some(min) + Some(min.clone()) } } }, }; - self.max = match self.max { - None => item.maxs[i1], - Some(max) => match item.maxs[i1] { - None => Some(max), + self.max = match &self.max { + None => item.maxs[i1].clone(), + Some(max) => match &item.maxs[i1] { + None => Some(max.clone()), Some(v) => { - if v > max { - Some(v) + if v > &max { + Some(v.clone()) } else { - Some(max) + Some(max.clone()) } } }, @@ -415,8 +415,8 @@ where ts1s: vec![self.range.beg], ts2s: vec![self.range.end], counts: vec![self.count], - mins: vec![self.min], - maxs: vec![self.max], + mins: vec![self.min.clone()], + maxs: vec![self.max.clone()], avgs: vec![avg], }; self.count = 0; diff --git a/items/src/minmaxavgdim1bins.rs b/items/src/minmaxavgdim1bins.rs index 82d094e..42bf309 100644 --- a/items/src/minmaxavgdim1bins.rs +++ b/items/src/minmaxavgdim1bins.rs @@ -369,8 +369,8 @@ where None => {} Some(v) => { for (a, b) in min.iter_mut().zip(v.iter()) { - if *b < *a { - *a = *b; + if b < a { + *a = b.clone(); } } } @@ -382,8 +382,8 @@ where None => {} Some(v) => { for (a, b) in max.iter_mut().zip(v.iter()) { - if *b > *a { - *a = *b; + if b > a { + *a = b.clone(); } } } diff --git a/items/src/minmaxavgwavebins.rs b/items/src/minmaxavgwavebins.rs index 15de521..52c1974 100644 --- a/items/src/minmaxavgwavebins.rs +++ b/items/src/minmaxavgwavebins.rs @@ -376,7 +376,7 @@ where Some(inp) => { for (a, b) in self.min.iter_mut().zip(inp.iter()) { if *b < *a || a.is_nan() { - *a = *b; + *a = b.clone(); } } } @@ -386,7 +386,7 @@ where Some(inp) => { for (a, b) in self.max.iter_mut().zip(inp.iter()) { if *b > *a || a.is_nan() { - *a = *b; + *a = b.clone(); } } } diff --git a/items/src/numops.rs b/items/src/numops.rs index 6be3913..8123ba3 100644 --- a/items/src/numops.rs +++ b/items/src/numops.rs @@ -1,5 +1,5 @@ use crate::SubFrId; -use num_traits::{AsPrimitive, Bounded, Float, Zero}; +use num_traits::{Bounded, Float, Zero}; use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; use std::cmp::Ordering; @@ -60,14 +60,64 @@ impl PartialOrd for BoolNum { } } +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct StringNum(pub String); + +impl StringNum { + pub const MIN: Self = Self(String::new()); + pub const MAX: Self = Self(String::new()); +} + +impl Add for StringNum { + type Output = StringNum; + + fn add(self, _rhs: StringNum) -> Self::Output { + todo!() + } +} + +impl num_traits::Zero for StringNum { + fn zero() -> Self { + Self(String::new()) + } + + fn is_zero(&self) -> bool { + self.0.is_empty() + } +} + +impl num_traits::Bounded for StringNum { + fn min_value() -> Self { + Self(String::new()) + } + + fn max_value() -> Self { + Self(String::new()) + } +} + +impl PartialEq for StringNum { + fn eq(&self, other: &Self) -> bool { + PartialEq::eq(&self.0, &other.0) + } +} + +impl PartialOrd for StringNum { + fn partial_cmp(&self, other: &Self) -> Option { + PartialOrd::partial_cmp(&self.0, &other.0) + } +} + pub trait NumOps: Sized - + Copy + //+ Copy + + Clone + + AsPrimF32 + Send + Unpin + Debug + Zero - + AsPrimitive + //+ AsPrimitive + Bounded + PartialOrd + SubFrId @@ -103,6 +153,44 @@ fn is_nan_float(x: &T) -> bool { x.is_nan() } +pub trait AsPrimF32 { + fn as_prim_f32(&self) -> f32; +} + +macro_rules! impl_as_prim_f32 { + ($ty:ident) => { + impl AsPrimF32 for $ty { + fn as_prim_f32(&self) -> f32 { + *self as f32 + } + } + }; +} + +impl_as_prim_f32!(u8); +impl_as_prim_f32!(u16); +impl_as_prim_f32!(u32); +impl_as_prim_f32!(u64); +impl_as_prim_f32!(i8); +impl_as_prim_f32!(i16); +impl_as_prim_f32!(i32); +impl_as_prim_f32!(i64); +impl_as_prim_f32!(f32); +impl_as_prim_f32!(f64); + +impl AsPrimF32 for BoolNum { + fn as_prim_f32(&self) -> f32 { + self.0 as f32 + } +} + +impl AsPrimF32 for StringNum { + fn as_prim_f32(&self) -> f32 { + netpod::log::error!("TODO impl AsPrimF32 for StringNum"); + todo!() + } +} + impl_num_ops!(u8, MIN, MAX, is_nan_int); impl_num_ops!(u16, MIN, MAX, is_nan_int); impl_num_ops!(u32, MIN, MAX, is_nan_int); @@ -114,3 +202,4 @@ impl_num_ops!(i64, MIN, MAX, is_nan_int); impl_num_ops!(f32, NAN, NAN, is_nan_float); impl_num_ops!(f64, NAN, NAN, is_nan_float); impl_num_ops!(BoolNum, MIN, MAX, is_nan_int); +impl_num_ops!(StringNum, MIN, MAX, is_nan_int); diff --git a/items/src/plainevents.rs b/items/src/plainevents.rs index 012f9bc..169eec6 100644 --- a/items/src/plainevents.rs +++ b/items/src/plainevents.rs @@ -22,6 +22,7 @@ pub enum ScalarPlainEvents { I64(ScalarEvents), F32(ScalarEvents), F64(ScalarEvents), + String(ScalarEvents), } impl ScalarPlainEvents { @@ -100,6 +101,7 @@ pub enum WavePlainEvents { I64(WaveEvents), F32(WaveEvents), F64(WaveEvents), + String(WaveEvents), } impl WavePlainEvents { diff --git a/items/src/scalarevents.rs b/items/src/scalarevents.rs index 7fe6e1f..eace776 100644 --- a/items/src/scalarevents.rs +++ b/items/src/scalarevents.rs @@ -13,6 +13,8 @@ use serde::{Deserialize, Serialize}; use std::fmt; use tokio::fs::File; +// TODO in this module reduce clones. + // TODO add pulse. #[derive(Serialize, Deserialize)] pub struct ScalarEvents { @@ -147,7 +149,7 @@ where { fn push_index(&mut self, src: &Self, ix: usize) { self.tss.push(src.tss[ix]); - self.values.push(src.values[ix]); + self.values.push(src.values[ix].clone()); } } @@ -316,32 +318,33 @@ where } } + // TODO reduce clone.. optimize via more traits to factor the trade-offs? fn apply_min_max(&mut self, val: NTY) { - self.min = match self.min { - None => Some(val), + self.min = match &self.min { + None => Some(val.clone()), Some(min) => { - if val < min { - Some(val) + if &val < min { + Some(val.clone()) } else { - Some(min) + Some(min.clone()) } } }; - self.max = match self.max { + self.max = match &self.max { None => Some(val), Some(max) => { - if val > max { + if &val > max { Some(val) } else { - Some(max) + Some(max.clone()) } } }; } fn apply_event_unweight(&mut self, val: NTY) { + let vf = val.as_prim_f32(); self.apply_min_max(val); - let vf = val.as_(); if vf.is_nan() { } else { self.sum += vf; @@ -350,15 +353,15 @@ where } fn apply_event_time_weight(&mut self, ts: u64) { - if let Some(v) = self.last_val { - debug!("apply_event_time_weight"); - self.apply_min_max(v); + if let Some(v) = &self.last_val { + let vf = v.as_prim_f32(); + let v2 = v.clone(); + self.apply_min_max(v2); let w = if self.do_time_weight { (ts - self.int_ts) as f32 * 1e-9 } else { 1. }; - let vf = v.as_(); if vf.is_nan() { } else { self.sum += vf * w; @@ -376,7 +379,7 @@ where fn ingest_unweight(&mut self, item: &::Input) { for i1 in 0..item.tss.len() { let ts = item.tss[i1]; - let val = item.values[i1]; + let val = item.values[i1].clone(); if ts < self.range.beg { } else if ts >= self.range.end { } else { @@ -389,7 +392,7 @@ where fn ingest_time_weight(&mut self, item: &::Input) { for i1 in 0..item.tss.len() { let ts = item.tss[i1]; - let val = item.values[i1]; + let val = item.values[i1].clone(); if ts < self.int_ts { debug!("just set int_ts"); self.last_ts = ts; @@ -417,8 +420,8 @@ where ts1s: vec![self.range.beg], ts2s: vec![self.range.end], counts: vec![self.count], - mins: vec![self.min], - maxs: vec![self.max], + mins: vec![self.min.clone()], + maxs: vec![self.max.clone()], avgs: vec![avg], }; self.int_ts = range.beg; @@ -447,8 +450,8 @@ where ts1s: vec![self.range.beg], ts2s: vec![self.range.end], counts: vec![self.count], - mins: vec![self.min], - maxs: vec![self.max], + mins: vec![self.min.clone()], + maxs: vec![self.max.clone()], avgs: vec![avg], }; self.int_ts = range.beg; diff --git a/items/src/waveevents.rs b/items/src/waveevents.rs index 3c54879..b3d6d5d 100644 --- a/items/src/waveevents.rs +++ b/items/src/waveevents.rs @@ -246,7 +246,7 @@ where Some(min) => { for (a, b) in min.iter_mut().zip(item.vals[i1].iter()) { if b < a { - *a = *b; + *a = b.clone(); } } } @@ -256,18 +256,18 @@ where Some(max) => { for (a, b) in max.iter_mut().zip(item.vals[i1].iter()) { if b < a { - *a = *b; + *a = b.clone(); } } } }; match self.sum.as_mut() { None => { - self.sum = Some(item.vals[i1].iter().map(|k| k.as_()).collect()); + self.sum = Some(item.vals[i1].iter().map(|k| k.as_prim_f32()).collect()); } Some(sum) => { for (a, b) in sum.iter_mut().zip(item.vals[i1].iter()) { - let vf = b.as_(); + let vf = b.as_prim_f32(); if vf.is_nan() { } else { *a += vf; @@ -356,14 +356,14 @@ where let mut sum = 0f32; let mut sumc = 0; let vals = &inp.vals[i1]; - for &v in vals { - if v < min || min.is_nan() { - min = v; + for v in vals.iter() { + if v < &min || min.is_nan() { + min = v.clone(); } - if v > max || max.is_nan() { - max = v; + if v > &max || max.is_nan() { + max = v.clone(); } - let vf = v.as_(); + let vf = v.as_prim_f32(); if vf.is_nan() { } else { sum += vf; @@ -420,17 +420,17 @@ where let mut max = vec![NTY::min_or_nan(); self.x_bin_count]; let mut sum = vec![0f32; self.x_bin_count]; let mut sumc = vec![0u64; self.x_bin_count]; - for (i2, &v) in inp.vals[i1].iter().enumerate() { + for (i2, v) in inp.vals[i1].iter().enumerate() { let i3 = i2 * self.x_bin_count / self.shape_bin_count; - if v < min[i3] || min[i3].is_nan() { - min[i3] = v; + if v < &min[i3] || min[i3].is_nan() { + min[i3] = v.clone(); } - if v > max[i3] || max[i3].is_nan() { - max[i3] = v; + if v > &max[i3] || max[i3].is_nan() { + max[i3] = v.clone(); } if v.is_nan() { } else { - sum[i3] += v.as_(); + sum[i3] += v.as_prim_f32(); sumc[i3] += 1; } } diff --git a/items/src/xbinnedscalarevents.rs b/items/src/xbinnedscalarevents.rs index 52ce8ec..709412b 100644 --- a/items/src/xbinnedscalarevents.rs +++ b/items/src/xbinnedscalarevents.rs @@ -12,6 +12,8 @@ use netpod::NanoRange; use serde::{Deserialize, Serialize}; use tokio::fs::File; +// TODO in this module reduce clones + // TODO rename Scalar -> Dim0 #[derive(Debug, Serialize, Deserialize)] pub struct XBinnedScalarEvents { @@ -124,8 +126,8 @@ where { fn push_index(&mut self, src: &Self, ix: usize) { self.tss.push(src.tss[ix]); - self.mins.push(src.mins[ix]); - self.maxs.push(src.maxs[ix]); + self.mins.push(src.mins[ix].clone()); + self.maxs.push(src.maxs[ix].clone()); self.avgs.push(src.avgs[ix]); } } @@ -225,23 +227,23 @@ where } fn apply_min_max(&mut self, min: NTY, max: NTY) { - self.min = match self.min { + self.min = match &self.min { None => Some(min), Some(cmin) => { - if min < cmin { + if &min < cmin { Some(min) } else { - Some(cmin) + Some(cmin.clone()) } } }; - self.max = match self.max { + self.max = match &self.max { None => Some(max), Some(cmax) => { - if max > cmax { + if &max > cmax { Some(max) } else { - Some(cmax) + Some(cmax.clone()) } } }; @@ -260,8 +262,10 @@ where fn apply_event_time_weight(&mut self, ts: u64) { //debug!("apply_event_time_weight"); - if let (Some(avg), Some(min), Some(max)) = (self.last_avg, self.last_min, self.last_max) { - self.apply_min_max(min, max); + if let (Some(avg), Some(min), Some(max)) = (self.last_avg, &self.last_min, &self.last_max) { + let min2 = min.clone(); + let max2 = max.clone(); + self.apply_min_max(min2, max2); let w = (ts - self.int_ts) as f32 / self.range.delta() as f32; if avg.is_nan() { } else { @@ -276,8 +280,8 @@ where for i1 in 0..item.tss.len() { let ts = item.tss[i1]; let avg = item.avgs[i1]; - let min = item.mins[i1]; - let max = item.maxs[i1]; + let min = item.mins[i1].clone(); + let max = item.maxs[i1].clone(); if ts < self.range.beg { } else if ts >= self.range.end { } else { @@ -291,8 +295,8 @@ where for i1 in 0..item.tss.len() { let ts = item.tss[i1]; let avg = item.avgs[i1]; - let min = item.mins[i1]; - let max = item.maxs[i1]; + let min = item.mins[i1].clone(); + let max = item.maxs[i1].clone(); if ts < self.int_ts { self.last_ts = ts; self.last_avg = Some(avg); @@ -321,8 +325,8 @@ where ts1s: vec![self.range.beg], ts2s: vec![self.range.end], counts: vec![self.count], - mins: vec![self.min], - maxs: vec![self.max], + mins: vec![self.min.clone()], + maxs: vec![self.max.clone()], avgs: vec![avg], }; self.int_ts = range.beg; @@ -348,8 +352,8 @@ where ts1s: vec![self.range.beg], ts2s: vec![self.range.end], counts: vec![self.count], - mins: vec![self.min], - maxs: vec![self.max], + mins: vec![self.min.clone()], + maxs: vec![self.max.clone()], avgs: vec![avg], }; self.int_ts = range.beg; diff --git a/items/src/xbinnedwaveevents.rs b/items/src/xbinnedwaveevents.rs index 2891687..d60b4c1 100644 --- a/items/src/xbinnedwaveevents.rs +++ b/items/src/xbinnedwaveevents.rs @@ -228,6 +228,7 @@ where } } + // TODO get rid of clones. fn apply_min_max(&mut self, min: &Vec, max: &Vec) { self.min = match self.min.take() { None => Some(min.clone()), @@ -235,7 +236,7 @@ where let a = cmin .into_iter() .zip(min) - .map(|(a, b)| if a < *b { a } else { *b }) + .map(|(a, b)| if a < *b { a } else { b.clone() }) .collect(); Some(a) } @@ -246,7 +247,7 @@ where let a = cmax .into_iter() .zip(min) - .map(|(a, b)| if a > *b { a } else { *b }) + .map(|(a, b)| if a > *b { a } else { b.clone() }) .collect(); Some(a) } diff --git a/items_proc/src/items_proc.rs b/items_proc/src/items_proc.rs index 5d57ea3..2ce8331 100644 --- a/items_proc/src/items_proc.rs +++ b/items_proc/src/items_proc.rs @@ -48,6 +48,7 @@ pub fn tycases1(ts: TokenStream) -> TokenStream { let s = format!("{}::{}{} => {},", enum_1_pre, id, enum_1_suff, rhs); arms.push(s); } + arms.push(format!("{}::{}{} => {}", enum_1_pre, "String", enum_1_suff, "todo!()")); let gen = format!("match {} {{\n{}\n}}", match_val, arms.join("\n")); //panic!("GENERATED: {}", gen); gen.parse().unwrap() diff --git a/netpod/src/netpod.rs b/netpod/src/netpod.rs index 9325110..f32cfa8 100644 --- a/netpod/src/netpod.rs +++ b/netpod/src/netpod.rs @@ -54,6 +54,7 @@ pub enum ScalarType { F32, F64, BOOL, + STRING, } pub trait HasScalarType { @@ -76,8 +77,9 @@ impl ScalarType { 9 => I64, 11 => F32, 12 => F64, + 13 => STRING, + //13 => return Err(Error::with_msg(format!("STRING not supported"))), 6 => return Err(Error::with_msg(format!("CHARACTER not supported"))), - 13 => return Err(Error::with_msg(format!("STRING not supported"))), _ => return Err(Error::with_msg(format!("unknown dtype code: {}", ix))), }; Ok(g) @@ -139,6 +141,7 @@ impl ScalarType { F32 => 4, F64 => 8, BOOL => 1, + STRING => 0, } } @@ -156,6 +159,7 @@ impl ScalarType { F32 => 11, F64 => 12, BOOL => 0, + STRING => 13, } } @@ -172,6 +176,7 @@ impl ScalarType { ScalarType::F32 => "float32", ScalarType::F64 => "float64", ScalarType::BOOL => "bool", + ScalarType::STRING => "string", } } } @@ -1236,11 +1241,11 @@ pub fn channel_from_pairs(pairs: &BTreeMap) -> Result Result { let pairs = get_url_query_pairs(url); - let beg_date = pairs.get("begDate").ok_or(Error::with_msg("missing begDate"))?; - let end_date = pairs.get("endDate").ok_or(Error::with_msg("missing endDate"))?; + let beg_date = pairs.get("begDate").ok_or(Error::with_public_msg("missing begDate"))?; + let end_date = pairs.get("endDate").ok_or(Error::with_public_msg("missing endDate"))?; let expand = pairs.get("expand").map(|s| s == "true").unwrap_or(false); let ret = Self { channel: channel_from_pairs(&pairs)?, diff --git a/parse/src/channelconfig.rs b/parse/src/channelconfig.rs index 9903425..e7036cf 100644 --- a/parse/src/channelconfig.rs +++ b/parse/src/channelconfig.rs @@ -297,8 +297,8 @@ pub async fn channel_config(q: &ChannelConfigQuery, node: &Node) -> Result return Err(Error::with_msg("no config entry found")), - MatchingConfigEntry::Multiple => return Err(Error::with_msg("multiple config entries found")), + MatchingConfigEntry::None => return Err(Error::with_public_msg("no config entry found")), + MatchingConfigEntry::Multiple => return Err(Error::with_public_msg("multiple config entries found")), MatchingConfigEntry::Entry(entry) => entry, }; let ret = ChannelConfigResponse {