From 19ff08a4bd55812569d07b2c12518b446724ac3c Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Tue, 18 May 2021 16:35:51 +0200 Subject: [PATCH] More generic BinnedBytesForHttpStream, changes for status page --- disk/src/agg/scalarbinbatch.rs | 9 ++ disk/src/binned.rs | 14 ++- disk/src/cache.rs | 60 ++++++---- httpret/src/lib.rs | 6 +- httpret/static/documentation/index.html | 4 +- httpret/static/documentation/page.css | 5 - httpret/static/documentation/script.js | 86 ++++++++++++++ httpret/static/documentation/status-main.html | 21 ++++ httpret/static/documentation/style.css | 109 ++++++++++++++++++ netpod/src/lib.rs | 13 +++ 10 files changed, 289 insertions(+), 38 deletions(-) delete mode 100644 httpret/static/documentation/page.css create mode 100644 httpret/static/documentation/script.js create mode 100644 httpret/static/documentation/status-main.html create mode 100644 httpret/static/documentation/style.css diff --git a/disk/src/agg/scalarbinbatch.rs b/disk/src/agg/scalarbinbatch.rs index ef3ac42..3a15e89 100644 --- a/disk/src/agg/scalarbinbatch.rs +++ b/disk/src/agg/scalarbinbatch.rs @@ -1,7 +1,10 @@ use crate::agg::binnedt::{AggregatableTdim, AggregatorTdim}; use crate::agg::{AggregatableXdim1Bin, Fits, FitsInside}; +use crate::binned::MakeBytesFrame; +use crate::frame::makeframe::make_frame; use crate::streamlog::LogItem; use bytes::{BufMut, Bytes, BytesMut}; +use err::Error; use netpod::log::*; use netpod::timeunits::SEC; use netpod::{EventDataReadStats, NanoRange}; @@ -402,6 +405,12 @@ impl AggregatableXdim1Bin for MinMaxAvgScalarBinBatchStreamItem { } } +impl MakeBytesFrame for Result { + fn make_bytes_frame(&self) -> Result { + Ok(make_frame::(self)?.freeze()) + } +} + pub struct MinMaxAvgScalarBinBatchStreamItemAggregator { agg: MinMaxAvgScalarBinBatchAggregator, } diff --git a/disk/src/binned.rs b/disk/src/binned.rs index b830784..3bc8080 100644 --- a/disk/src/binned.rs +++ b/disk/src/binned.rs @@ -3,7 +3,6 @@ use crate::agg::scalarbinbatch::{MinMaxAvgScalarBinBatch, MinMaxAvgScalarBinBatc use crate::binnedstream::{BinnedStream, BinnedStreamFromPreBinnedPatches}; use crate::cache::{BinnedQuery, MergedFromRemotes}; use crate::channelconfig::{extract_matching_config_entry, read_local_config}; -use crate::frame::makeframe::make_frame; use crate::raw::EventsQuery; use bytes::Bytes; use chrono::{TimeZone, Utc}; @@ -120,9 +119,14 @@ impl BinnedBytesForHttpStream { } } -impl Stream for BinnedBytesForHttpStream +pub trait MakeBytesFrame { + fn make_bytes_frame(&self) -> Result; +} + +impl Stream for BinnedBytesForHttpStream where - S: Stream> + Unpin, + S: Stream + Unpin, + I: MakeBytesFrame, { type Item = Result; @@ -136,8 +140,8 @@ where return Ready(None); } match self.inp.poll_next_unpin(cx) { - Ready(Some(item)) => match make_frame::(&item) { - Ok(buf) => Ready(Some(Ok(buf.freeze()))), + Ready(Some(item)) => match item.make_bytes_frame() { + Ok(buf) => Ready(Some(Ok(buf))), Err(e) => { self.errored = true; Ready(Some(Err(e.into()))) diff --git a/disk/src/cache.rs b/disk/src/cache.rs index e0a8202..5c11a6b 100644 --- a/disk/src/cache.rs +++ b/disk/src/cache.rs @@ -16,6 +16,7 @@ use netpod::{ }; use serde::{Deserialize, Serialize}; use std::collections::BTreeMap; +use std::fmt::{Display, Formatter}; use std::future::Future; use std::path::PathBuf; use std::pin::Pin; @@ -45,6 +46,27 @@ impl CacheUsage { } .into() } + + pub fn from_params(params: &BTreeMap) -> Result { + let ret = params.get("cache_usage").map_or(Ok::<_, Error>(CacheUsage::Use), |k| { + if k == "use" { + Ok(CacheUsage::Use) + } else if k == "ignore" { + Ok(CacheUsage::Ignore) + } else if k == "recreate" { + Ok(CacheUsage::Recreate) + } else { + Err(Error::with_msg(format!("unexpected cache_usage {:?}", k)))? + } + })?; + Ok(ret) + } +} + +impl Display for CacheUsage { + fn fmt(&self, fmt: &mut Formatter) -> std::fmt::Result { + write!(fmt, "{}", self.query_param_value()) + } } #[derive(Clone, Debug)] @@ -76,9 +98,13 @@ impl BinnedQuery { .ok_or(Error::with_msg("missing bin_count"))? .parse() .map_err(|e| Error::with_msg(format!("can not parse bin_count {:?}", e)))?, - agg_kind: AggKind::DimXBins1, + agg_kind: params + .get("agg_kind") + .map_or("DimXBins1", |k| k) + .parse() + .map_err(|e| Error::with_msg(format!("can not parse agg_kind {:?}", e)))?, channel: channel_from_params(¶ms)?, - cache_usage: cache_usage_from_params(¶ms)?, + cache_usage: CacheUsage::from_params(¶ms)?, disk_stats_every: ByteSize::kb(disk_stats_every), }; info!("BinnedQuery::from_request {:?}", ret); @@ -158,27 +184,26 @@ impl PreBinnedQuery { .map_err(|e| Error::with_msg(format!("can not parse disk_stats_every_kb {:?}", e)))?; let ret = PreBinnedQuery { patch: PreBinnedPatchCoord::new(bin_t_len, patch_t_len, patch_ix), - agg_kind: AggKind::DimXBins1, + agg_kind: params + .get("agg_kind") + .map_or(&format!("{:?}", AggKind::DimXBins1), |k| k) + .parse() + .map_err(|e| Error::with_msg(format!("can not parse agg_kind {:?}", e)))?, channel: channel_from_params(¶ms)?, - cache_usage: cache_usage_from_params(¶ms)?, + cache_usage: CacheUsage::from_params(¶ms)?, disk_stats_every: ByteSize::kb(disk_stats_every), }; Ok(ret) } pub fn make_query_string(&self) -> String { - let cache_usage = match self.cache_usage { - CacheUsage::Use => "use", - CacheUsage::Ignore => "ignore", - CacheUsage::Recreate => "recreate", - }; format!( "{}&channel_backend={}&channel_name={}&agg_kind={:?}&cache_usage={}&disk_stats_every_kb={}", self.patch.to_url_params_strings(), self.channel.backend, self.channel.name, self.agg_kind, - cache_usage, + self.cache_usage, self.disk_stats_every.bytes() / 1024, ) } @@ -202,21 +227,6 @@ fn channel_from_params(params: &BTreeMap) -> Result) -> Result { - let ret = params.get("cache_usage").map_or(Ok::<_, Error>(CacheUsage::Use), |k| { - if k == "use" { - Ok(CacheUsage::Use) - } else if k == "ignore" { - Ok(CacheUsage::Ignore) - } else if k == "recreate" { - Ok(CacheUsage::Recreate) - } else { - Err(Error::with_msg(format!("unexpected cache_usage {:?}", k)))? - } - })?; - Ok(ret) -} - // NOTE This answers a request for a single valid pre-binned patch. // A user must first make sure that the grid spec is valid, and that this node is responsible for it. // Otherwise it is an error. diff --git a/httpret/src/lib.rs b/httpret/src/lib.rs index 4ac3eb9..e8caf10 100644 --- a/httpret/src/lib.rs +++ b/httpret/src/lib.rs @@ -87,7 +87,7 @@ impl UnwindSafe for Cont {} macro_rules! static_http { ($path:expr, $tgt:expr, $tgtex:expr) => { if $path == concat!("/api/4/documentation/", $tgt) { - let c = include_bytes!(concat!("../static/documentation/", $tgt, $tgtex)); + let c = include_bytes!(concat!("../static/documentation/", $tgtex)); return Ok(response(StatusCode::OK).body(Body::from(&c[..]))?); } }; @@ -141,7 +141,9 @@ async fn data_api_proxy_try(req: Request, node_config: &NodeConfigCached) } else if path.starts_with("/api/4/documentation/") { if req.method() == Method::GET { static_http!(path, "", "index.html"); - static_http!(path, "page.css"); + static_http!(path, "style.css"); + static_http!(path, "script.js"); + static_http!(path, "status-main.html"); Ok(response(StatusCode::NOT_FOUND).body(Body::empty())?) } else { Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) diff --git a/httpret/static/documentation/index.html b/httpret/static/documentation/index.html index 4dd5d9c..2cd35d3 100644 --- a/httpret/static/documentation/index.html +++ b/httpret/static/documentation/index.html @@ -1,8 +1,10 @@ + Retrieval Documentation - + +

Retrieval 4.0 Documentation

diff --git a/httpret/static/documentation/page.css b/httpret/static/documentation/page.css deleted file mode 100644 index 8885d68..0000000 --- a/httpret/static/documentation/page.css +++ /dev/null @@ -1,5 +0,0 @@ -body {} - -h1 { - color: cadetblue; -} diff --git a/httpret/static/documentation/script.js b/httpret/static/documentation/script.js new file mode 100644 index 0000000..55a73f0 --- /dev/null +++ b/httpret/static/documentation/script.js @@ -0,0 +1,86 @@ +"use strict"; + +function load_status_main(ev) { + const ts1 = Date.now(); + const dom_ev = ev; + const b = ev.target; + b.classList.remove("loaded"); + b.classList.add("loading"); + b.value = b.dataset.btnLabel + ""; + const query = { + hosts: "", + }; + const fetch_init = { + method: "get", + /*headers: { + retrieval_instance: document.getElementById("retrieval_instance").value, + }, + body: JSON.stringify(query),*/ + }; + fetch(g_config.api_base + "node_status", fetch_init) + .then(x => Promise.all([x.json(), Date.now()])) + .then(g_config.ui_delay_test) + .then(g_config.ui_delay_blink) + .then(kk => { + const js = kk[0]; + const ts2 = kk[1]; + if (false) { + const response = document.getElementById("response"); + // Different ways to do the same thing: + //response.querySelectorAll("*").forEach(n => n.remove()); + //response.innerHTML = ""; + response.textContent = ""; + while (response.firstChild) { + response.removeChild(response.lastChild); + response.lastChild.remove(); + } + response.replaceChildren(); + //response.replaceChild(); + //JSON.stringify(js, null, 2); + //for (let machine of js) { + // console.log(typeof(machine)); + //} + const dat2 = js.hosts; + sort_default(dat2); + response.appendChild(render_retrieval_metrics_as_table(dat2)); + response.appendChild(render_host_memory_as_table(dat2)); + //response.appendChild(render_host_memStd_as_table(dat2)); + response.appendChild(render_host_bufferPools_as_table(dat2)); + } + { + let b = document.getElementById("load_status"); + b.innerHTML = "Loaded (" + (ts2 - ts1) + " ms)"; + } + { + let b = dom_ev.target; + b.classList.remove("loading"); + b.classList.add("loaded"); + b.setAttribute("value", b.dataset.btnLabel); + } + }); +} + +var g_config = { + api_base: "http://localhost:8059/api/4/", + ui_delay_test: x => x, + ui_delay_blink: x => new Promise(resolve => setTimeout(() => resolve(x), 50)), + //ui_delay_blink: x => x, +}; + +function config_for_test() { + g_config.api_base = "http://localhost:8059/api/4/"; +} + +function init() { +} + +window.addEventListener("load", ev => { + if (document.location.href.includes("8060")) { + config_for_test(); + } + init(); + const init_load_ele = document.getElementById("btn_load"); + if (init_load_ele != null) { + init_load_ele.click(); + } +}); diff --git a/httpret/static/documentation/status-main.html b/httpret/static/documentation/status-main.html new file mode 100644 index 0000000..46c046f --- /dev/null +++ b/httpret/static/documentation/status-main.html @@ -0,0 +1,21 @@ + + + + + Main Status + + + + + +

daqbuffer - Main Status

+ +

+ + +

+ +

+
+ + diff --git a/httpret/static/documentation/style.css b/httpret/static/documentation/style.css new file mode 100644 index 0000000..26d54cb --- /dev/null +++ b/httpret/static/documentation/style.css @@ -0,0 +1,109 @@ +div, h1, h2, h3, h4, h5, pre, code, p { + margin: 0; + padding: 0; +} + +h1, h2, h3, h4, h5 { + margin-top: 1.2em; + margin-bottom: 0.6em; +} + +input { + outline: none; +} + +h1 { + margin-top: 0.7em; +} + +p { + margin-top: 0.4em; + margin-bottom: 0.4em; +} + +body { + font-family: monospace; + font-size: 80%; + line-height: 1.4; + color: #000; +} + +td { + padding: 0 1em 0 1em; + margin: 0 0em 0 0em; + text-align: right; +} + +input[type=button] { + background-color: #e7e7e7; + border-style: solid; + border-radius: 6px; + border-color: #e0e0e0; + border-width: 2px; + text-decoration: none; + color: #000; + padding: 5px 18px; + cursor: pointer; +} + +input[type=button] { + margin-top: 1.4em; +} + +input[type=button].loading { + background-color: #e7d0d0; +} + +input[type=button].loaded { + border-color: #c0e7c0; +} + +input[type=button].run_example_button { + border-color: #b0e7b0; +} + +input[type=button].running { + border-color: #e7b0b0; +} + +code#output { + display: block; + min-height: 20em; +} + +p#load_status { + margin-top: 10em; +} + +p.buttonrow { + position: fixed; + margin-top: 20px; +} + +div#footer { + width: 2em; + height: 6em; +} + +.prejson { + font-family: monospace; + white-space: pre; +} + +.bytesTB { + color: #8b1f8e; +} + +.bytesGB { + color: #952300; +} + +.bytesMB { + --ccolor: #255111; + color: #12697d; +} + +.highlight_error { + color: red; + font-weight: bold; +} diff --git a/netpod/src/lib.rs b/netpod/src/lib.rs index ad8b125..b7ca410 100644 --- a/netpod/src/lib.rs +++ b/netpod/src/lib.rs @@ -5,6 +5,7 @@ use futures_util::StreamExt; use serde::{Deserialize, Serialize}; use std::path::PathBuf; use std::pin::Pin; +use std::str::FromStr; use std::task::{Context, Poll}; use timeunits::*; #[allow(unused_imports)] @@ -595,6 +596,18 @@ pub enum AggKind { DimXBins1, } +impl FromStr for AggKind { + type Err = Error; + + fn from_str(s: &str) -> Result { + if s == "DimXBins1" { + Ok(AggKind::DimXBins1) + } else { + Err(Error::with_msg(format!("can not parse {} as AggKind", s))) + } + } +} + pub fn query_params(q: Option<&str>) -> std::collections::BTreeMap { let mut map = std::collections::BTreeMap::new(); match q {