From 6b5c245319bc4bf388bd338817aae5d86bb94318 Mon Sep 17 00:00:00 2001
From: Dominik Werder
Date: Fri, 12 Nov 2021 12:15:21 +0100
Subject: [PATCH] Process waveform on event fetch

---
 archapp/src/archeng/indexfiles.rs      |  31 +--
 archapp/src/archeng/pipe.rs            | 117 +++++++++-
 daqbufp2/src/test/binnedjson.rs        | 129 ++++++++++-
 disk/src/binned/pbv.rs                 |   9 +-
 err/src/lib.rs                         |  48 +++-
 httpret/src/httpret.rs                 |  30 ++-
 httpret/src/proxy.rs                   |  13 ++
 httpret/static/documentation/api4.html | 297 +++++++++++++------------
 httpret/static/documentation/style.css |  10 +-
 items/src/eventvalues.rs               |   4 -
 items/src/xbinnedscalarevents.rs       |   4 +-
 netpod/src/netpod.rs                   |  29 +++
 taskrun/src/taskrun.rs                 |  20 +-
 13 files changed, 539 insertions(+), 202 deletions(-)

diff --git a/archapp/src/archeng/indexfiles.rs b/archapp/src/archeng/indexfiles.rs
index c3cac64..be72feb 100644
--- a/archapp/src/archeng/indexfiles.rs
+++ b/archapp/src/archeng/indexfiles.rs
@@ -529,9 +529,22 @@ fn categorize_index_files(
     }
 }
 
+pub async fn index_file_path_list(channel: Channel, dbconf: Database) -> Result<Vec<PathBuf>, Error> {
+    let dbc = database_connect(&dbconf).await?;
+    let sql = "select i.path from indexfiles i, channels c, channel_index_map m where c.name = $1 and m.channel = c.rowid and i.rowid = m.index";
+    let rows = dbc.query(sql, &[&channel.name()]).await?;
+    let mut index_paths = vec![];
+    for row in rows {
+        index_paths.push(row.try_get(0)?);
+    }
+    let list = categorize_index_files(&index_paths)?;
+    let ret = list.into_iter().map(|k| k.path).collect();
+    Ok(ret)
+}
+
 static INDEX_JSON: Mutex>>> = Mutex::const_new(None);
 
-pub async fn index_files_index_ref<P: AsRef<Path> + Send>(
+async fn index_files_index_ref<P: AsRef<Path> + Send>(
     key: &str,
     index_files_index_path: P,
     stats: &StatsChannel,
@@ -572,21 +585,9 @@ async fn index_files_index_ref(
     }
 }
 
-pub async fn index_file_path_list(channel: Channel, dbconf: Database) -> Result<Vec<PathBuf>, Error> {
-    let dbc = database_connect(&dbconf).await?;
-    let sql = "select i.path from indexfiles i, channels c, channel_index_map m where c.name = $1 and m.channel = c.rowid and i.rowid = m.index";
-    let rows = dbc.query(sql, &[&channel.name()]).await?;
-    let mut index_paths = vec![];
-    for row in rows {
-        index_paths.push(row.try_get(0)?);
-    }
-    let list = categorize_index_files(&index_paths)?;
-    let ret = list.into_iter().map(|k| k.path).collect();
-    Ok(ret)
-}
-
 // TODO using the json index is currently no longer needed, but maybe as alternative for tests.
-async fn _index_file_path_list_old(
+#[allow(unused)]
+async fn index_file_path_list_old(
     channel: Channel,
     index_files_index_path: PathBuf,
     stats: &StatsChannel,

diff --git a/archapp/src/archeng/pipe.rs b/archapp/src/archeng/pipe.rs
index 51f2fa2..d42596b 100644
--- a/archapp/src/archeng/pipe.rs
+++ b/archapp/src/archeng/pipe.rs
@@ -3,10 +3,11 @@ use crate::archeng::blockstream::BlockStream;
 use crate::events::{FrameMaker, FrameMakerTrait};
 use err::Error;
 use futures_util::{Stream, StreamExt};
-use items::binnedevents::XBinnedEvents;
+use items::binnedevents::{SingleBinWaveEvents, XBinnedEvents};
 use items::eventsitem::EventsItem;
-use items::plainevents::PlainEvents;
-use items::{Framable, LogItem, RangeCompletableItem, StreamItem};
+use items::plainevents::{PlainEvents, WavePlainEvents};
+use items::waveevents::WaveXBinner;
+use items::{EventsNodeProcessor, Framable, LogItem, RangeCompletableItem, StreamItem};
 use netpod::query::RawEventsQuery;
 use netpod::{log::*, AggKind, Shape};
 use netpod::{ChannelArchiver, ChannelConfigQuery};
@@ -28,8 +29,14 @@ pub async fn make_event_pipe(
     };
     debug!("Channel config: {:?}", channel_config);
     let ixpaths = crate::archeng::indexfiles::index_file_path_list(evq.channel.clone(), conf.database.clone()).await?;
-    info!("got categorized ixpaths: {:?}", ixpaths);
-    let ixpath = ixpaths.first().unwrap().clone();
+    debug!("got categorized ixpaths: {:?}", ixpaths);
+    let ixpath = if let Some(x) = ixpaths.first() {
+        x.clone()
+    } else {
+        return Err(Error::with_msg_no_trace("no index file for channel")
+            .mark_bad_request()
+            .add_public_msg(format!("No index file for {}", evq.channel.name)));
+    };
     use crate::archeng::blockstream::BlockItem;
     let refs = blockref_stream(
         evq.channel.clone(),
@@ -45,6 +52,8 @@ pub async fn make_event_pipe(
         },
         Err(e) => Err(e),
     });
+    let cfgshape = channel_config.shape.clone();
+    let q_agg_kind = evq.agg_kind.clone();
     let filtered = RangeFilter::new(blocks, evq.range.clone(), evq.agg_kind.need_expand());
     let xtrans = match channel_config.shape {
         Shape::Scalar => match evq.agg_kind {
@@ -78,9 +87,105 @@ pub async fn make_event_pipe(
             AggKind::DimXBinsN(_) => err::todoval(),
             AggKind::EventBlobs => err::todoval(),
         },
+        Shape::Wave(_n1) => match evq.agg_kind {
+            AggKind::Plain => Box::pin(filtered) as Pin<Box<dyn Stream<Item = Sitemty<EventsItem>> + Send>>,
+            AggKind::TimeWeightedScalar | AggKind::DimXBins1 => {
+                let tr = filtered.map(move |j| match j {
+                    Ok(j) => match j {
+                        StreamItem::DataItem(j) => match j {
+                            RangeCompletableItem::RangeComplete => {
+                                Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))
+                            }
+                            RangeCompletableItem::Data(j) => match j {
+                                EventsItem::Plain(j) => match j {
+                                    PlainEvents::Scalar(_) => {
+                                        warn!("EventsItem::Plain Scalar for {:?} {:?}", cfgshape, q_agg_kind);
+                                        panic!()
+                                    }
+                                    PlainEvents::Wave(j) => {
+                                        trace!("EventsItem::Plain Wave for {:?} {:?}", cfgshape, q_agg_kind);
+                                        match j {
+                                            WavePlainEvents::Byte(j) => {
+                                                let binner =
+                                                    WaveXBinner::<i8>::create(cfgshape.clone(), q_agg_kind.clone());
+                                                let out = binner.process(j);
+                                                let item = SingleBinWaveEvents::Byte(out);
+                                                let item = XBinnedEvents::SingleBinWave(item);
+                                                let item = EventsItem::XBinnedEvents(item);
+                                                Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)))
+                                            }
+                                            WavePlainEvents::Short(j) => {
+                                                let binner =
+                                                    WaveXBinner::<i16>::create(cfgshape.clone(), q_agg_kind.clone());
+                                                let out = binner.process(j);
+                                                let item = SingleBinWaveEvents::Short(out);
+                                                let item = XBinnedEvents::SingleBinWave(item);
+                                                let item = EventsItem::XBinnedEvents(item);
+                                                Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)))
+                                            }
+                                            WavePlainEvents::Int(j) => {
+                                                let binner =
+                                                    WaveXBinner::<i32>::create(cfgshape.clone(), q_agg_kind.clone());
+                                                let out = binner.process(j);
+                                                let item = SingleBinWaveEvents::Int(out);
+                                                let item = XBinnedEvents::SingleBinWave(item);
+                                                let item = EventsItem::XBinnedEvents(item);
+                                                Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)))
+                                            }
+                                            WavePlainEvents::Float(j) => {
+                                                let binner =
+                                                    WaveXBinner::<f32>::create(cfgshape.clone(), q_agg_kind.clone());
+                                                let out = binner.process(j);
+                                                let item = SingleBinWaveEvents::Float(out);
+                                                let item = XBinnedEvents::SingleBinWave(item);
+                                                let item = EventsItem::XBinnedEvents(item);
+                                                Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)))
+                                            }
+                                            WavePlainEvents::Double(j) => {
+                                                let binner =
+                                                    WaveXBinner::<f64>::create(cfgshape.clone(), q_agg_kind.clone());
+                                                let out = binner.process(j);
+                                                let item = SingleBinWaveEvents::Double(out);
+                                                let item = XBinnedEvents::SingleBinWave(item);
+                                                let item = EventsItem::XBinnedEvents(item);
+                                                Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)))
+                                            }
+                                        }
+                                    }
+                                },
+                                EventsItem::XBinnedEvents(j) => match j {
+                                    XBinnedEvents::Scalar(j) => {
+                                        warn!("XBinnedEvents::Scalar for {:?} {:?}", cfgshape, q_agg_kind);
+                                        let item = XBinnedEvents::Scalar(j);
+                                        let item = EventsItem::XBinnedEvents(item);
+                                        Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)))
+                                    }
+                                    XBinnedEvents::SingleBinWave(j) => {
+                                        warn!("XBinnedEvents::SingleBinWave for {:?} {:?}", cfgshape, q_agg_kind);
+                                        let item = XBinnedEvents::SingleBinWave(j);
+                                        let item = EventsItem::XBinnedEvents(item);
+                                        Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)))
+                                    }
+                                    XBinnedEvents::MultiBinWave(_) => todo!(),
+                                },
+                            },
+                        },
+                        StreamItem::Log(j) => Ok(StreamItem::Log(j)),
+                        StreamItem::Stats(j) => Ok(StreamItem::Stats(j)),
+                    },
+                    Err(e) => Err(e),
+                });
+                Box::pin(tr) as _
+            }
+            AggKind::DimXBinsN(_) => err::todoval(),
+            AggKind::EventBlobs => err::todoval(),
+        },
         _ => {
             error!("TODO shape {:?}", channel_config.shape);
-            panic!()
+            let err = Error::with_msg_no_trace(format!("TODO shape {:?}", channel_config.shape))
+                .mark_bad_request()
+                .add_public_msg(format!("can not yet handle shape {:?}", channel_config.shape));
+            Box::pin(futures_util::stream::iter([Err(err)]))
         }
     };
     let mut frame_maker = Box::new(FrameMaker::with_item_type(

diff --git a/daqbufp2/src/test/binnedjson.rs b/daqbufp2/src/test/binnedjson.rs
index 2cc1b8e..2711e8f 100644
--- a/daqbufp2/src/test/binnedjson.rs
+++ b/daqbufp2/src/test/binnedjson.rs
@@ -3,8 +3,9 @@ use chrono::{DateTime, Utc};
 use err::Error;
 use http::StatusCode;
 use hyper::Body;
+use netpod::log::*;
 use netpod::query::{BinnedQuery, CacheUsage};
-use netpod::{log::*, AppendToUrl};
+use netpod::{f64_close, AppendToUrl};
 use netpod::{AggKind, Channel, Cluster, NanoRange, APP_JSON};
 use serde::{Deserialize, Serialize};
 use std::time::Duration;
@@ -85,11 +86,89 @@ fn get_sls_archive_1() -> Result<(), Error> {
         let begstr = "2021-11-10T01:00:00Z";
         let endstr = "2021-11-10T01:01:00Z";
         let res = get_binned_json_common_res(channel, begstr, endstr, 10, AggKind::TimeWeightedScalar, cluster).await?;
-        assert_eq!(res.finalised_range, true);
-        assert_eq!(res.ts_anchor, 1636506000);
-        //assert!((res.avgs[3].unwrap() - 1.01470947265625).abs() < 1e-4);
-        //assert!((res.avgs[4].unwrap() - 24.06792449951172).abs() < 1e-4);
-        //assert!((res.avgs[11].unwrap() - 0.00274658203125).abs() < 1e-4);
+        let exp =
r##"{"avgs":[24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875],"counts":[0,0,0,0,0,0,0,0,0,0,0,0],"finalisedRange":true,"maxs":[24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875],"mins":[24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875],"tsAnchor":1636506000,"tsMs":[0,5000,10000,15000,20000,25000,30000,35000,40000,45000,50000,55000,60000],"tsNs":[0,0,0,0,0,0,0,0,0,0,0,0,0]}"##; + let exp: BinnedResponse = serde_json::from_str(exp).unwrap(); + res.is_close(&exp)?; + Ok(()) + }; + taskrun::run(fut) +} + +#[test] +fn get_sls_archive_2() -> Result<(), Error> { + let fut = async move { + let rh = require_sls_test_host_running()?; + let cluster = &rh.cluster; + let channel = Channel { + backend: "sls-archive".into(), + name: "ARIDI-PCT:CURRENT".into(), + }; + let begstr = "2021-11-10T00:00:00Z"; + let endstr = "2021-11-10T00:10:00Z"; + let res = get_binned_json_common_res(channel, begstr, endstr, 10, AggKind::TimeWeightedScalar, cluster).await?; + let exp = r##"{"avgs":[401.1745910644531,401.5135498046875,400.8823547363281,400.66156005859375,401.8301086425781,401.19305419921875,400.5584411621094,401.4371337890625,401.4137268066406,400.77880859375],"counts":[19,6,6,19,6,6,6,19,6,6],"finalisedRange":true,"maxs":[402.04977411361034,401.8439029736943,401.22628955394583,402.1298351124666,402.1298351124666,401.5084092642013,400.8869834159359,402.05358654212733,401.74477983225313,401.1271664125047],"mins":[400.08256099885625,401.22628955394583,400.60867613419754,400.0939982844072,401.5084092642013,400.8869834159359,400.2693699961876,400.05968642775446,401.1271664125047,400.50574056423943],"tsAnchor":1636502400,"tsMs":[0,60000,120000,180000,240000,300000,360000,420000,480000,540000,600000],"tsNs":[0,0,0,0,0,0,0,0,0,0,0]}"##; + let exp: BinnedResponse = serde_json::from_str(exp).unwrap(); + res.is_close(&exp)?; + Ok(()) + }; + taskrun::run(fut) +} + +#[test] +fn get_sls_archive_3() -> Result<(), Error> { + let fut = async move { + let rh = require_sls_test_host_running()?; + let cluster = &rh.cluster; + let channel = Channel { + backend: "sls-archive".into(), + name: "ARIDI-PCT:CURRENT".into(), + }; + let begstr = "2021-11-09T00:00:00Z"; + let endstr = "2021-11-11T00:10:00Z"; + let res = get_binned_json_common_res(channel, begstr, endstr, 10, AggKind::TimeWeightedScalar, cluster).await?; + let exp = 
r##"{"avgs":[401.1354675292969,401.1296081542969,401.1314392089844,401.134765625,401.1371154785156,376.5816345214844,401.13775634765625,209.2684783935547,-0.06278431415557861,-0.06278431415557861,-0.06278431415557861,-0.047479934990406036,0.0],"counts":[2772,2731,2811,2689,2803,2203,2355,1232,0,0,0,2,0],"maxs":[402.1717718261533,402.18702154022117,402.1908339687381,402.198458825772,402.17939668318724,402.194646397255,402.1908339687381,402.1908339687381,-0.06278431346925281,-0.06278431346925281,-0.06278431346925281,0.0,0.0],"mins":[400.0291869996188,400.02537457110185,400.0291869996188,400.0329994281358,400.0291869996188,0.0,400.0444367136866,-0.06278431346925281,-0.06278431346925281,-0.06278431346925281,-0.06278431346925281,-0.06278431346925281,0.0],"tsAnchor":1636416000,"tsMs":[0,14400000,28800000,43200000,57600000,72000000,86400000,100800000,115200000,129600000,144000000,158400000,172800000,187200000],"tsNs":[0,0,0,0,0,0,0,0,0,0,0,0,0,0]}"##; + let exp: BinnedResponse = serde_json::from_str(exp).unwrap(); + res.is_close(&exp)?; + Ok(()) + }; + taskrun::run(fut) +} + +#[test] +fn get_sls_archive_wave_1() -> Result<(), Error> { + let fut = async move { + let rh = require_sls_test_host_running()?; + let cluster = &rh.cluster; + let channel = Channel { + backend: "sls-archive".into(), + name: "ARIDI-MBF-X:CBM-IN".into(), + }; + let begstr = "2021-11-09T00:00:00Z"; + let endstr = "2021-11-11T00:10:00Z"; + let res = get_binned_json_common_res(channel, begstr, endstr, 10, AggKind::TimeWeightedScalar, cluster).await?; + let exp = r##"{"avgs":[401.1354675292969,401.1296081542969,401.1314392089844,401.134765625,401.1371154785156,376.5816345214844,401.13775634765625,209.2684783935547,-0.06278431415557861,-0.06278431415557861,-0.06278431415557861,-0.047479934990406036,0.0],"counts":[2772,2731,2811,2689,2803,2203,2355,1232,0,0,0,2,0],"maxs":[402.1717718261533,402.18702154022117,402.1908339687381,402.198458825772,402.17939668318724,402.194646397255,402.1908339687381,402.1908339687381,-0.06278431346925281,-0.06278431346925281,-0.06278431346925281,0.0,0.0],"mins":[400.0291869996188,400.02537457110185,400.0291869996188,400.0329994281358,400.0291869996188,0.0,400.0444367136866,-0.06278431346925281,-0.06278431346925281,-0.06278431346925281,-0.06278431346925281,-0.06278431346925281,0.0],"tsAnchor":1636416000,"tsMs":[0,14400000,28800000,43200000,57600000,72000000,86400000,100800000,115200000,129600000,144000000,158400000,172800000,187200000],"tsNs":[0,0,0,0,0,0,0,0,0,0,0,0,0,0]}"##; + let exp: BinnedResponse = serde_json::from_str(exp).unwrap(); + res.is_close(&exp)?; + Ok(()) + }; + taskrun::run(fut) +} + +#[test] +fn get_sls_archive_wave_2() -> Result<(), Error> { + let fut = async move { + let rh = require_sls_test_host_running()?; + let cluster = &rh.cluster; + let channel = Channel { + backend: "sls-archive".into(), + name: "ARIDI-MBF-X:CBM-IN".into(), + }; + let begstr = "2021-11-09T10:00:00Z"; + let endstr = "2021-11-10T06:00:00Z"; + let res = get_binned_json_common_res(channel, begstr, endstr, 10, AggKind::TimeWeightedScalar, cluster).await?; + let exp = 
r##"{"avgs":[0.00014690558600705117,0.00014207433559931815,0.0001436264137737453,0.00014572929649148136,0.00015340493700932711,0.00014388437557499856,0.00012792187044396996,0.00014416234625969082,0.0001486341789131984,0.000145719779538922],"counts":[209,214,210,219,209,192,171,307,285,232],"maxs":[0.001784245832823217,0.0016909628175199032,0.0017036109929904342,0.0016926786629483104,0.001760474289767444,0.0018568832892924547,0.001740367733873427,0.0017931810580193996,0.0017676990246400237,0.002342566382139921],"mins":[0.000040829672798281536,0.00004028259718324989,0.000037641591916326433,0.000039788486901670694,0.00004028418697998859,0.00003767738598980941,0.0,0.00004095739495824091,0.00004668773908633739,0.00003859612115775235],"tsAnchor":1636452000,"tsMs":[0,7200000,14400000,21600000,28800000,36000000,43200000,50400000,57600000,64800000,72000000],"tsNs":[0,0,0,0,0,0,0,0,0,0,0]}"##; + let exp: BinnedResponse = serde_json::from_str(exp).unwrap(); + res.is_close(&exp)?; Ok(()) }; taskrun::run(fut) @@ -189,6 +268,44 @@ struct BinnedResponse { finalised_range: bool, } +impl BinnedResponse { + pub fn is_close(&self, other: &Self) -> Result { + let reterr = || -> Result { + Err(Error::with_msg_no_trace(format!( + "Mismatch\n{:?}\nVS\n{:?}", + self, other + ))) + }; + if self.ts_anchor != other.ts_anchor { + return reterr(); + } + if self.finalised_range != other.finalised_range { + return reterr(); + } + if self.counts != other.counts { + return reterr(); + } + let pairs = [ + (&self.mins, &other.mins), + (&self.maxs, &other.maxs), + (&self.avgs, &other.avgs), + ]; + for (t, u) in pairs { + for (&a, &b) in t.iter().zip(u) { + if let (Some(a), Some(b)) = (a, b) { + if !f64_close(a, b) { + return reterr(); + } + } else if let (None, None) = (a, b) { + } else { + return reterr(); + } + } + } + Ok(true) + } +} + fn bool_false() -> bool { false } diff --git a/disk/src/binned/pbv.rs b/disk/src/binned/pbv.rs index 46da970..5e8cf62 100644 --- a/disk/src/binned/pbv.rs +++ b/disk/src/binned/pbv.rs @@ -126,14 +126,7 @@ where let range = BinnedRange::covering_range(evq.range.clone(), count as u32)? .ok_or(Error::with_msg("covering_range returns None"))?; let perf_opts = PerfOpts { inmem_bufcap: 512 }; - let s = MergedFromRemotes::::new(evq, perf_opts, self.node_config.node_config.cluster.clone()).map(|k| { - info!( - "setup_merged_from_remotes, MergedFromRemotes yields {:?}", - //show_event_basic_info(&k) - "TODO show_event_basic_info" - ); - k - }); + let s = MergedFromRemotes::::new(evq, perf_opts, self.node_config.node_config.cluster.clone()); let ret = TBinnerStream::<_, ::Output>::new( s, range, diff --git a/err/src/lib.rs b/err/src/lib.rs index 34c4ce1..86f0b9f 100644 --- a/err/src/lib.rs +++ b/err/src/lib.rs @@ -7,7 +7,7 @@ use http::uri::InvalidUri; use nom::error::ErrorKind; use serde::{Deserialize, Serialize}; use std::array::TryFromSliceError; -use std::fmt::Debug; +use std::fmt; use std::net::AddrParseError; use std::num::{ParseFloatError, ParseIntError}; use std::string::FromUtf8Error; @@ -15,6 +15,13 @@ use std::sync::PoisonError; use tokio::task::JoinError; use tokio::time::error::Elapsed; +#[derive(Clone, Serialize, Deserialize)] +pub enum Reason { + InternalError, + BadRequest, + IoError, +} + /** The common error type for this application. 
*/
 #[derive(Serialize, Deserialize)]
 pub struct Error {
     msg: String,
     #[serde(skip)]
     trace: Option<backtrace::Backtrace>,
     trace_str: Option<String>,
+    public_msg: Option<Vec<String>>,
+    reason: Option<Reason>,
 }
 
 impl Error {
     pub fn with_msg<S: Into<String>>(s: S) -> Self {
         Self {
             msg: s.into(),
             trace: None,
             trace_str: Some(fmt_backtrace(&backtrace::Backtrace::new())),
+            public_msg: None,
+            reason: None,
         }
     }
 
@@ -40,12 +51,35 @@ impl Error {
             msg: s.into(),
             trace: None,
             trace_str: None,
+            public_msg: None,
+            reason: None,
         }
     }
 
+    pub fn mark_bad_request(mut self) -> Self {
+        self.reason = Some(Reason::BadRequest);
+        self
+    }
+
+    pub fn add_public_msg(mut self, msg: impl Into<String>) -> Self {
+        if self.public_msg.is_none() {
+            self.public_msg = Some(vec![]);
+        }
+        self.public_msg.as_mut().unwrap().push(msg.into());
+        self
+    }
+
     pub fn msg(&self) -> &str {
         &self.msg
     }
+
+    pub fn public_msg(&self) -> Option<&Vec<String>> {
+        self.public_msg.as_ref()
+    }
+
+    pub fn reason(&self) -> Option<Reason> {
+        self.reason.clone()
+    }
 }
 
 fn fmt_backtrace(trace: &backtrace::Backtrace) -> String {
@@ -88,8 +122,8 @@ fn fmt_backtrace(trace: &backtrace::Backtrace) -> String {
     String::from_utf8(buf).unwrap()
 }
 
-impl std::fmt::Debug for Error {
-    fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
+impl fmt::Debug for Error {
+    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
         let trace_str = if let Some(trace) = &self.trace {
             fmt_backtrace(trace)
         } else if let Some(s) = &self.trace_str {
@@ -101,9 +135,9 @@ impl fmt::Debug for Error {
     }
 }
 
-impl std::fmt::Display for Error {
-    fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
-        std::fmt::Debug::fmt(self, fmt)
+impl fmt::Display for Error {
+    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+        fmt::Debug::fmt(self, fmt)
     }
 }
 
@@ -181,7 +215,7 @@ impl From for Error {
     }
 }
 
-impl<T: Debug> From<nom::Err<T>> for Error {
+impl<T: fmt::Debug> From<nom::Err<T>> for Error {
     fn from(k: nom::Err<T>) -> Self {
         Self::with_msg(format!("nom::Err {:?}", k))
     }
 }

diff --git a/httpret/src/httpret.rs b/httpret/src/httpret.rs
index 603701b..540da17 100644
--- a/httpret/src/httpret.rs
+++ b/httpret/src/httpret.rs
@@ -183,7 +183,10 @@ async fn http_service_try(req: Request<Body>, node_config: &NodeConfigCached) ->
     if req.method() == Method::GET {
         let ret = serde_json::json!({
             //"data_api_version": "4.0.0-beta",
-            "data_api_version_major": 4,
+            "data_api_version": {
+                "major": 4,
+                "minor": 0,
+            },
         });
         Ok(response(StatusCode::OK).body(Body::from(serde_json::to_vec(&ret)?))?)
     } else {
@@ -429,10 +432,29 @@
     }
 }
 
+trait ToPublicResponse {
+    fn to_public_response(&self) -> Response<Body>;
+}
+
+impl ToPublicResponse for Error {
+    fn to_public_response(&self) -> Response<Body> {
+        let status = match self.reason() {
+            Some(err::Reason::BadRequest) => StatusCode::BAD_REQUEST,
+            Some(err::Reason::InternalError) => StatusCode::INTERNAL_SERVER_ERROR,
+            _ => StatusCode::INTERNAL_SERVER_ERROR,
+        };
+        let msg = match self.public_msg() {
+            Some(v) => v.join("\n"),
+            _ => String::new(),
+        };
+        response(status).body(Body::from(msg)).unwrap()
+    }
+}
+
 async fn binned(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
     match binned_inner(req, node_config).await {
         Ok(ret) => Ok(ret),
-        Err(e) => Ok(response(StatusCode::BAD_REQUEST).body(Body::from(e.msg().to_string()))?),
+        Err(e) => Ok(e.to_public_response()),
     }
 }
 
@@ -519,13 +541,13 @@ async fn prebinned_inner(req: Request<Body>, node_config: &NodeConfigCached) ->
 async fn plain_events(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
     match plain_events_inner(req, node_config).await {
         Ok(ret) => Ok(ret),
-        Err(e) => Ok(response(StatusCode::BAD_REQUEST).body(Body::from(e.msg().to_string()))?),
+        Err(e) => Ok(e.to_public_response()),
     }
 }
 
 async fn plain_events_inner(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
     debug!("httpret plain_events_inner headers: {:?}", req.headers());
-    let accept_def = "";
+    let accept_def = APP_JSON;
     let accept = req
         .headers()
         .get(http::header::ACCEPT)

diff --git a/httpret/src/proxy.rs b/httpret/src/proxy.rs
index f843f34..e77ee82 100644
--- a/httpret/src/proxy.rs
+++ b/httpret/src/proxy.rs
@@ -77,6 +77,19 @@ async fn proxy_http_service_try(req: Request<Body>, proxy_config: &ProxyConfig)
         Ok(proxy_api1_map_pulse(req, proxy_config).await?)
     } else if path.starts_with("/api/1/gather/") {
         Ok(gather_json_2_v1(req, "/api/1/gather/", proxy_config).await?)
+    } else if path == "/api/4/version" {
+        if req.method() == Method::GET {
+            let ret = serde_json::json!({
+                //"data_api_version": "4.0.0-beta",
+                "data_api_version": {
+                    "major": 4,
+                    "minor": 0,
+                },
+            });
+            Ok(response(StatusCode::OK).body(Body::from(serde_json::to_vec(&ret)?))?)
+        } else {
+            Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
+        }
     } else if path == "/api/4/backends" {
         Ok(backends(req, proxy_config).await?)
     } else if path == "/api/4/search/channel" {

diff --git a/httpret/static/documentation/api4.html b/httpret/static/documentation/api4.html
index af5dd19..03622a6 100644
--- a/httpret/static/documentation/api4.html
+++ b/httpret/static/documentation/api4.html
@@ -2,71 +2,75 @@
Databuffer API 4 Documentation

Documented here is the databuffer http api 4. The "original" unversioned api is documented at this location.

API version 1: https://data-api.psi.ch/api/1/documentation/

In order to keep the api surface as small as possible in comparison to api 0, we add functionality on demand,
so please feel free to create some Jira ticket!


API functions

Currently available functionality:

- List available backends
- Get version details
- Search channel
- Query event data
- Query binned data


Timestamp format

The result encodes timestamps in the form:

{
  "tsAnchor": 1623909860,                    // Time-anchor of this result in UNIX epoch seconds.
  "tsOffMs": [173, 472, 857, ...],           // Millisecond-offset to tsAnchor for each event/bin-edge.
  "tsOffNs": [422901, 422902, 422903, ...],  // Nanosecond-offset to tsAnchor in addition to tsOffMs for each event/bin-edge.
}

which results in these nanosecond-timestamps:

1623909860573422901
1623909875671422902
1623909897932422903

Formally: tsAbsolute = tsAnchor * 10^9 + tsOffMs * 10^6 + tsOffNs

Two reasons lead to this choice of timestamp format:

- Javascript can not represent the full nanosecond-resolution timestamps in a single numeric variable.
- The lowest 6 digits of the nanosecond timestamp are anyway abused by the timing system to emit a pulse-id.
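As an illustration only (not part of the patch), a minimal Rust sketch of the recombination formula above; the function name is hypothetical, and the test value is the first sample timestamp from the list:

// Sketch: recombine the split representation into absolute nanosecond timestamps.
// tsAbsolute = tsAnchor * 10^9 + tsOffMs * 10^6 + tsOffNs
fn abs_timestamps_ns(ts_anchor: u64, ts_off_ms: &[u64], ts_off_ns: &[u64]) -> Vec<u64> {
    ts_off_ms
        .iter()
        .zip(ts_off_ns)
        .map(|(&ms, &ns)| ts_anchor * 1_000_000_000 + ms * 1_000_000 + ns)
        .collect()
}

#[test]
fn recombine_timestamps() {
    assert_eq!(
        abs_timestamps_ns(1623909860, &[573], &[422901]),
        vec![1623909860573422901]
    );
}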

List available backends

Method: GET

URL: https://data-api.psi.ch/api/4/backends

Request header: "Accept" should be "application/json" for forward compatibility.

CURL example:

curl -H 'Accept: application/json' 'https://data-api.psi.ch/api/4/backends'

Example response:

{
  "backends": [
    "sf-databuffer",
    "hipa-archive",
    ...
  ]
}


Get version details

Method: GET

URL: https://data-api.psi.ch/api/4/version

Request header: "Accept" should be "application/json" for forward compatibility.

CURL example:

curl -H 'Accept: application/json' 'https://data-api.psi.ch/api/4/version'

Example response:

{ "data_api_version": { "major": 4, "minor": 0 } }


Search channel

Method: GET

URL: https://data-api.psi.ch/api/4/search/channel

Request header: "Accept" should be "application/json" for forward compatibility, but it can be
omitted for e.g. a quick manual search using CURL.

Query parameters: (all optional)

- backend (e.g. "sf-databuffer", "sls-archive", ... any from the list-backends API)
- nameRegex (e.g. "LSCP.*6")
- sourceRegex (e.g. "178:9999")
- descriptionRegex (e.g. "celsius")

The full channel list is long, so it's encouraged to provide a search string of some minimal length.

CURL example:

curl -H 'Accept: application/json' 'https://data-api.psi.ch/api/4/search/channel?sourceRegex=CV.E.+37&nameRegex=120.+y2$'

Example response:

Keys always present: name, backend.

Other keys are optional, it depends on the data found on disk: sometimes there is an empty string on disk,
sometimes that key is missing.

{
  "channels": [
    {
      "name": "S10MA01-DBPM120:Y2",
      ...
    }
  ]
}

The search constraints are AND'd.


Query event data

Returns the full event values in a given time range.

Method: GET

URL: https://data-api.psi.ch/api/4/events

Query parameters:

- channelBackend (e.g. "sf-databuffer")
- channelName (e.g. "S10CB02-RBOC-DCP10:FOR-AMPLT-AVG")
- begDate (e.g. "2021-05-26T07:10:00.000Z")
- endDate (e.g. "2021-05-26T07:16:00.000Z")

Request header: "Accept" should be "application/json" for forward compatibility.

Timeout

If the requested range takes too long to retrieve, then the flag timedOut: true will be set.

CURL example:

curl -H 'Accept: application/json' 'https://data-api.psi.ch/api/4/events?channelBackend=sf-databuffer
  &channelName=S10CB02-RBOC-DCP10:FOR-AMPLT-AVG&begDate=2021-05-26T07:10:00.000Z&endDate=2021-05-26T07:16:00.000Z'

Example response:

{
  "finalisedRange": true,
  "tsAnchor": 1623763172,
  ...
}

Finalised range

If the server can determine that no more data will be added to the requested time range,
then it will add the flag finalisedRange: true to the response.


Query binned data

Method: GET

URL: https://data-api.psi.ch/api/4/binned

Query parameters:

- channelBackend (e.g. "sf-databuffer")
- channelName (e.g. "SLAAR-LSCP4-LAS6891:CH7:1")
- begDate (e.g. "2021-05-26T07:10:00.000Z")
- endDate (e.g. "2021-05-26T07:16:00.000Z")
- binCount (number of requested bins in the time-dimension, e.g. "6". The actual number of returned bins
  depends on the bin-cache-grid-resolution. The server tries to find the best match.)
- binningScheme (optional)
  - if not specified: the default is "binningScheme=unweightedScalar".
  - "binningScheme=unweightedScalar": non-weighted binning, the waveform first gets averaged to a scalar.
  - "binningScheme=timeWeightedScalar": time-weighted binning, the waveform first gets averaged to a scalar.
  - "binningScheme=binnedX&binnedXcount=13": the waveform first gets binned to 13 bins in the X-dimension
    (waveform-dimension).
  - "binningScheme=binnedX&binnedXcount=0": the waveform is not binned in the X-dimension but kept at full length.

Request header: "Accept" should be "application/json" for forward compatibility.

CURL example:

curl -H 'Accept: application/json' 'https://data-api.psi.ch/api/4/binned?channelBackend=sf-databuffer
  &channelName=SLAAR-LSCP4-LAS6891:CH7:1&begDate=2021-05-25T00:00:00.000Z&endDate=2021-05-26T00:00:00.000Z&binCount=3'

Partial result

If the requested range takes a long time to retrieve, then a partial result with at least one bin is returned.
The partial result will contain the necessary information to send another request with a range that
starts with the first not-yet-retrieved bin.
This information is provided by the continueAt and missingBins fields.
This enables the user agent to start the presentation to the user while updating the user interface
as new bins are received.
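To illustrate the intended client behaviour, a minimal sketch (not part of the patch; it uses the reqwest crate with its json feature plus serde_json, takes the URL from the CURL example above, and assumes continueAt carries the date at which the follow-up range should begin):

use serde_json::Value;

// Sketch: request bins repeatedly until the response no longer carries a
// continueAt key, handing each partial result to the UI as it arrives.
async fn fetch_all_bins(mut beg_date: String, end_date: String) -> Result<(), Box<dyn std::error::Error>> {
    loop {
        let url = format!(
            "https://data-api.psi.ch/api/4/binned?channelBackend=sf-databuffer\
             &channelName=SLAAR-LSCP4-LAS6891:CH7:1&begDate={}&endDate={}&binCount=3",
            beg_date, end_date
        );
        let resp: Value = reqwest::get(&url).await?.json().await?;
        // ...hand the bins received so far to the user interface here...
        match resp.get("continueAt").and_then(Value::as_str) {
            // Resume at the first not-yet-retrieved bin.
            Some(c) => beg_date = c.to_string(),
            // No continueAt key: the result is complete.
            None => break Ok(()),
        }
    }
}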
Example response (without usage of binningScheme):

{
  "avgs": [
    16204.087890625,
    16204.3798828125,
    ...
  ]
}

Example response (waveform channel and usage of binningScheme):

{
  "tsAnchor": 1623769950,
  "tsMs": [
    0,
    ...
  ]
}

Complete result

If the result does not contain a continueAt key then the result is complete.

Finalised range

If the server can determine that no more data will be added to the requested time range,
then it will add the flag finalisedRange: true to the response.


Feedback and comments very much appreciated!

dominik.werder@psi.ch

or please assign me a JIRA ticket.
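For completeness, a possible client-side compatibility check against the version endpoint documented above (a sketch, not part of the patch; it assumes the reqwest and serde_json crates):

use serde_json::Value;

// Sketch: refuse to talk to a server whose major api version is unexpected.
async fn check_api_version() -> Result<(), Box<dyn std::error::Error>> {
    let resp: Value = reqwest::get("https://data-api.psi.ch/api/4/version")
        .await?
        .json()
        .await?;
    let major = resp["data_api_version"]["major"].as_u64();
    if major != Some(4) {
        return Err(format!("unexpected api major version: {:?}", major).into());
    }
    Ok(())
}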
diff --git a/httpret/static/documentation/style.css b/httpret/static/documentation/style.css
index 8129850..d9c455a 100644
--- a/httpret/static/documentation/style.css
+++ b/httpret/static/documentation/style.css
@@ -21,9 +21,17 @@ p {
     margin-bottom: 0.4em;
 }
 
+ul {
+    margin-bottom: 2.0em;
+}
+
+li {
+    margin-top: 0.3em;
+}
+
 body {
     font-family: monospace;
-    font-size: 100%;
+    font-size: 140%;
     line-height: 1.4;
     color: #000;
 }

diff --git a/items/src/eventvalues.rs b/items/src/eventvalues.rs
index b18befa..e2cbcd2 100644
--- a/items/src/eventvalues.rs
+++ b/items/src/eventvalues.rs
@@ -248,8 +248,6 @@ where
     type Output = EventValuesCollectorOutput<NTY>;
 
     fn ingest(&mut self, src: &Self::Input) {
-        // TODO should be able to remove this
-        err::todo();
         self.vals.append(src);
     }
 
@@ -262,8 +260,6 @@ where
     }
 
     fn result(self) -> Result<Self::Output, Error> {
-        // TODO should be able to remove this
-        err::todo();
         let tst = ts_offs_from_abs(&self.vals.tss);
         let ret = Self::Output {
             ts_anchor_sec: tst.0,

diff --git a/items/src/xbinnedscalarevents.rs b/items/src/xbinnedscalarevents.rs
index 6fcb0df..85daad3 100644
--- a/items/src/xbinnedscalarevents.rs
+++ b/items/src/xbinnedscalarevents.rs
@@ -249,7 +249,7 @@ where
     }
 
     fn apply_event_unweight(&mut self, avg: f32, min: NTY, max: NTY) {
-        debug!("apply_event_unweight");
+        //debug!("apply_event_unweight");
         self.apply_min_max(min, max);
         let vf = avg;
         if vf.is_nan() {
@@ -260,7 +260,7 @@ where
     }
 
     fn apply_event_time_weight(&mut self, ts: u64) {
-        debug!("apply_event_time_weight");
+        //debug!("apply_event_time_weight");
         if let (Some(v), Some(min), Some(max)) = (self.last_avg, self.last_min, self.last_max) {
             self.apply_min_max(min, max);
             let w = if self.do_time_weight {

diff --git a/netpod/src/netpod.rs b/netpod/src/netpod.rs
index 0452733..04f0e55 100644
--- a/netpod/src/netpod.rs
+++ b/netpod/src/netpod.rs
@@ -511,6 +511,15 @@ impl Shape {
                 )))
             }
         }
+        JsVal::Object(j) => match j.get("Wave") {
+            Some(JsVal::Number(j)) => Ok(Shape::Wave(j.as_u64().ok_or_else(|| {
+                Error::with_msg_no_trace(format!("Shape from_db_jsval can not understand {:?}", v))
+            })? as u32)),
+            _ => Err(Error::with_msg_no_trace(format!(
+                "Shape from_db_jsval can not understand {:?}",
+                v
+            ))),
+        },
         _ => Err(Error::with_msg_no_trace(format!(
             "Shape from_db_jsval can not understand {:?}",
             v
         ))),
@@ -1417,3 +1426,23 @@ pub struct ChannelInfo {
     pub shape: Shape,
     pub msg: serde_json::Value,
 }
+
+pub fn f32_close(a: f32, b: f32) -> bool {
+    if (a - b).abs() < 1e-5 {
+        true
+    } else if a / b > 0.9999 && a / b < 1.0001 {
+        true
+    } else {
+        false
+    }
+}
+
+pub fn f64_close(a: f64, b: f64) -> bool {
+    if (a - b).abs() < 1e-5 {
+        true
+    } else if a / b > 0.9999 && a / b < 1.0001 {
+        true
+    } else {
+        false
+    }
+}

diff --git a/taskrun/src/taskrun.rs b/taskrun/src/taskrun.rs
index 86ab0f8..b34b454 100644
--- a/taskrun/src/taskrun.rs
+++ b/taskrun/src/taskrun.rs
@@ -96,18 +96,18 @@ pub fn tracing_init() {
                 "info",
                 "archapp::archeng=info",
                 "archapp::archeng::datablockstream=info",
-                "archapp::archeng::indextree=debug",
-                "archapp::archeng::blockrefstream=debug",
+                "archapp::archeng::indextree=info",
+                "archapp::archeng::blockrefstream=info",
                 "archapp::archeng::blockstream=info",
                 "archapp::archeng::ringbuf=info",
                 "archapp::archeng::backreadbuf=info",
-                "archapp::archeng::pipe=info",
+                "archapp::archeng::pipe=debug",
                 "archapp::storagemerge=info",
                 "streams::rangefilter=info",
-                "items::eventvalues=debug",
-                "items::xbinnedscalarevents=debug",
+                "items::eventvalues=info",
+                "items::xbinnedscalarevents=info",
                 "disk::binned=info",
-                "nodenet::conn=debug",
+                "nodenet::conn=info",
                 "daqbuffer::test=info",
             ]
             .join(","),
@@ -132,8 +132,8 @@ pub fn test_cluster() -> netpod::Cluster {
         .map(|id| netpod::Node {
             host: "localhost".into(),
             listen: "0.0.0.0".into(),
-            port: 8360 + id as u16,
-            port_raw: 8360 + id as u16 + 100,
+            port: 6170 + id as u16,
+            port_raw: 6170 + id as u16 + 100,
             data_base_path: format!("../tmpdata/node{:02}", id).into(),
             cache_base_path: format!("../tmpdata/node{:02}", id).into(),
             ksprefix: "ks".into(),
@@ -163,8 +163,8 @@ pub fn sls_test_cluster() -> netpod::Cluster {
         .map(|id| netpod::Node {
             host: "localhost".into(),
             listen: "0.0.0.0".into(),
-            port: 8362 + id as u16,
-            port_raw: 8362 + id as u16 + 100,
+            port: 6190 + id as u16,
+            port_raw: 6190 + id as u16 + 100,
             data_base_path: format!("NOdatapath{}", id).into(),
             cache_base_path: format!("../tmpdata/node{:02}", id).into(),
             ksprefix: "NOKS".into(),
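For reference, the tolerance that the new netpod::f64_close helper implements: two values compare as close when either the absolute difference stays below 1e-5 or their ratio stays within 0.9999..1.0001. A small test sketch (not part of the patch) showing both criteria:

#[test]
fn f64_close_tolerance() {
    use netpod::f64_close;
    // Absolute criterion: difference below 1e-5.
    assert!(f64_close(0.0000301, 0.0000302));
    // Relative criterion: ratio within 0.9999..1.0001.
    assert!(f64_close(401.1354, 401.1360));
    // Neither criterion holds.
    assert!(!f64_close(400.0, 401.0));
}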