From ef803a45a23936a78e1445575c9da02216d36423 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Thu, 24 Jun 2021 08:46:56 +0200 Subject: [PATCH] Add BoolNum type and test on slow GLS --- daqbuffer/src/nodes.rs | 1 + disk/src/aggtest.rs | 1 + disk/src/binned.rs | 86 ++++++++++++----------- disk/src/binned/pbv.rs | 18 +++-- disk/src/binned/prebinned.rs | 3 +- disk/src/cache.rs | 9 ++- disk/src/channelexec.rs | 3 +- disk/src/dataopen.rs | 8 +++ disk/src/decode.rs | 14 +++- disk/src/eventchunker.rs | 4 ++ disk/src/frame/makeframe.rs | 5 ++ disk/src/gen.rs | 1 + disk/src/index.rs | 27 +------- disk/src/lib.rs | 6 +- disk/src/raw/conn.rs | 7 +- httpret/src/lib.rs | 26 ++++--- httpret/src/proxy.rs | 16 ++++- netpod/Cargo.toml | 1 + netpod/src/lib.rs | 128 ++++++++++++++++++++++++++++------- 19 files changed, 249 insertions(+), 115 deletions(-) diff --git a/daqbuffer/src/nodes.rs b/daqbuffer/src/nodes.rs index 2535a90..7afb5bd 100644 --- a/daqbuffer/src/nodes.rs +++ b/daqbuffer/src/nodes.rs @@ -43,6 +43,7 @@ fn test_cluster() -> Cluster { ksprefix: "ks".into(), split: id, backend: "testbackend".into(), + bin_grain_kind: 0, }) .collect(); Cluster { diff --git a/disk/src/aggtest.rs b/disk/src/aggtest.rs index 10a1ac0..88dd360 100644 --- a/disk/src/aggtest.rs +++ b/disk/src/aggtest.rs @@ -15,6 +15,7 @@ pub fn make_test_node(id: u32) -> Node { split: id, ksprefix: "ks".into(), backend: "testbackend".into(), + bin_grain_kind: 0, } } diff --git a/disk/src/binned.rs b/disk/src/binned.rs index 6686711..56420e2 100644 --- a/disk/src/binned.rs +++ b/disk/src/binned.rs @@ -9,8 +9,8 @@ use crate::binned::query::BinnedQuery; use crate::binnedstream::BoxedStream; use crate::channelexec::{channel_exec, collect_plain_events_json, ChannelExecFunction}; use crate::decode::{ - BigEndian, Endianness, EventValueFromBytes, EventValueShape, EventValues, EventValuesDim0Case, EventValuesDim1Case, - LittleEndian, NumFromBytes, + Endianness, EventValueFromBytes, EventValueShape, EventValues, EventValuesDim0Case, EventValuesDim1Case, + NumFromBytes, }; use crate::frame::makeframe::{Framable, FrameType, SubFrId}; use crate::merge::mergedfromremotes::MergedFromRemotes; @@ -24,8 +24,8 @@ use futures_util::{FutureExt, StreamExt}; use netpod::log::*; use netpod::timeunits::SEC; use netpod::{ - x_bin_count, AggKind, BinnedRange, ByteOrder, NanoRange, NodeConfigCached, PerfOpts, PreBinnedPatchIterator, - PreBinnedPatchRange, ScalarType, Shape, + x_bin_count, AggKind, BinnedRange, BoolNum, NanoRange, NodeConfigCached, PerfOpts, PreBinnedPatchIterator, + PreBinnedPatchRange, Shape, }; use num_traits::{AsPrimitive, Bounded, Float, Zero}; use parse::channelconfig::{extract_matching_config_entry, read_local_config, MatchingConfigEntry}; @@ -84,11 +84,20 @@ where <::Output as TimeBinnableType>::Output: Sized, { let _ = event_value_shape; - let range = BinnedRange::covering_range(query.range().clone(), query.bin_count())?.ok_or(Error::with_msg( - format!("binned_bytes_for_http BinnedRange::covering_range returned None"), - ))?; + let range = BinnedRange::covering_range( + query.range().clone(), + query.bin_count(), + node_config.node.bin_grain_kind, + )? + .ok_or(Error::with_msg(format!( + "binned_bytes_for_http BinnedRange::covering_range returned None" + )))?; let perf_opts = PerfOpts { inmem_bufcap: 512 }; - match PreBinnedPatchRange::covering_range(query.range().clone(), query.bin_count()) { + match PreBinnedPatchRange::covering_range( + query.range().clone(), + query.bin_count(), + node_config.node.bin_grain_kind, + ) { Ok(Some(pre_range)) => { info!("binned_bytes_for_http found pre_range: {:?}", pre_range); if range.grid_spec.bin_t_len() < pre_range.grid_spec.bin_t_len() { @@ -151,6 +160,7 @@ pub struct BinnedResponseDyn { stream: Pin> + Send>>, } +// TODO remove after refactor of PPP: fn make_num_pipeline_nty_end_evs_enp( shape: Shape, _agg_kind: AggKind, @@ -187,6 +197,8 @@ where Ok(ret) } +// TODO remove after refactor of PPP: +#[allow(dead_code)] fn make_num_pipeline_nty_end( shape: Shape, query: BinnedQuery, @@ -249,8 +261,8 @@ where ) } AggKind::DimXBinsN(_) => { - /*let events_node_proc = < as EventValueShape>::NumXAggToNBins as EventsNodeProcessor>::create(shape.clone(), agg_kind.clone()); - let yo = make_num_pipeline_nty_end_evs_enp::( + let _events_node_proc = < as EventValueShape>::NumXAggToNBins as EventsNodeProcessor>::create(shape.clone(), agg_kind.clone()); + /*let yo = make_num_pipeline_nty_end_evs_enp::( shape, agg_kind, evs, @@ -269,6 +281,7 @@ where } } +// TODO remove after refactor of PPP: #[allow(dead_code)] fn make_num_pipeline_nty_end_old( shape: Shape, @@ -306,6 +319,8 @@ where } } +// TODO remove after refactor of PPP: +#[allow(unused_macros)] macro_rules! match_end { ($nty:ident, $end:expr, $shape:expr, $query:expr, $ppp:expr, $node_config:expr) => { match $end { @@ -315,7 +330,8 @@ macro_rules! match_end { }; } -fn make_num_pipeline_entry( +// TODO remove after refactor of PPP +/*fn make_num_pipeline_entry( scalar_type: ScalarType, byte_order: ByteOrder, shape: Shape, @@ -347,27 +363,15 @@ where ScalarType::I64 => match_end!(i64, byte_order, shape, query, ppp, node_config), ScalarType::F32 => match_end!(f32, byte_order, shape, query, ppp, node_config), ScalarType::F64 => match_end!(f64, byte_order, shape, query, ppp, node_config), + ScalarType::BOOL => match_end!(BoolNum, byte_order, shape, query, ppp, node_config), } -} +}*/ async fn make_num_pipeline( query: &BinnedQuery, - ppp: PPP, + _ppp: PPP, node_config: &NodeConfigCached, -) -> Result -where - PPP: PipelinePostProcessA, - PPP: PipelinePostProcessB>, - PPP: PipelinePostProcessB>, - PPP: PipelinePostProcessB>, - PPP: PipelinePostProcessB>, - PPP: PipelinePostProcessB>, - PPP: PipelinePostProcessB>, - PPP: PipelinePostProcessB>, - PPP: PipelinePostProcessB>, - PPP: PipelinePostProcessB>, - PPP: PipelinePostProcessB>, -{ +) -> Result { if query.channel().backend != node_config.node.backend { let err = Error::with_msg(format!( "backend mismatch node: {} requested: {}", @@ -400,7 +404,7 @@ where MatchingConfigEntry::Entry(entry) => { // TODO make this a stream log: info!("binned_bytes_for_http found config entry {:?}", entry); - let ret = make_num_pipeline_entry( + /*let ret = make_num_pipeline_entry( entry.scalar_type.clone(), entry.byte_order.clone(), entry.to_shape()?, @@ -408,7 +412,8 @@ where ppp, node_config, )?; - Ok(ret) + Ok(ret)*/ + err::todoval() } } } @@ -747,13 +752,21 @@ impl ChannelExecFunction for BinnedJsonChannelExec { FrameType + Framable + DeserializeOwned, { let _ = event_value_shape; - let range = - BinnedRange::covering_range(self.query.range().clone(), self.query.bin_count())?.ok_or(Error::with_msg( - format!("binned_bytes_for_http BinnedRange::covering_range returned None"), - ))?; + let range = BinnedRange::covering_range( + self.query.range().clone(), + self.query.bin_count(), + self.node_config.node.bin_grain_kind, + )? + .ok_or(Error::with_msg(format!( + "binned_bytes_for_http BinnedRange::covering_range returned None" + )))?; let t_bin_count = range.count as u32; let perf_opts = PerfOpts { inmem_bufcap: 512 }; - let souter = match PreBinnedPatchRange::covering_range(self.query.range().clone(), self.query.bin_count()) { + let souter = match PreBinnedPatchRange::covering_range( + self.query.range().clone(), + self.query.bin_count(), + self.node_config.node.bin_grain_kind, + ) { Ok(Some(pre_range)) => { info!("binned_bytes_for_http found pre_range: {:?}", pre_range); if range.grid_spec.bin_t_len() < pre_range.grid_spec.bin_t_len() { @@ -982,7 +995,6 @@ pub trait NumOps: fn min_or_nan() -> Self; fn max_or_nan() -> Self; fn is_nan(&self) -> bool; - fn fourty_two() -> Self; } macro_rules! impl_num_ops { @@ -997,9 +1009,6 @@ macro_rules! impl_num_ops { fn is_nan(&self) -> bool { $is_nan(self) } - fn fourty_two() -> Self { - 42 as Self - } } }; } @@ -1022,6 +1031,7 @@ impl_num_ops!(i32, MIN, MAX, is_nan_int); impl_num_ops!(i64, MIN, MAX, is_nan_int); impl_num_ops!(f32, NAN, NAN, is_nan_float); impl_num_ops!(f64, NAN, NAN, is_nan_float); +impl_num_ops!(BoolNum, MIN, MAX, is_nan_int); pub trait EventsDecoder { type Output; diff --git a/disk/src/binned/pbv.rs b/disk/src/binned/pbv.rs index 5b981c5..b592cb2 100644 --- a/disk/src/binned/pbv.rs +++ b/disk/src/binned/pbv.rs @@ -126,7 +126,7 @@ where } // TODO do I need to set up more transformations or binning to deliver the requested data? let count = self.query.patch().patch_t_len() / self.query.patch().bin_t_len(); - let range = BinnedRange::covering_range(evq.range.clone(), count as u32)? + let range = BinnedRange::covering_range(evq.range.clone(), count as u32, self.node_config.node.bin_grain_kind)? .ok_or(Error::with_msg("covering_range returns None"))?; let perf_opts = PerfOpts { inmem_bufcap: 512 }; let s = MergedFromRemotes::::new(evq, perf_opts, self.node_config.node_config.cluster.clone()); @@ -204,7 +204,11 @@ where fn try_setup_fetch_prebinned_higher_res(&mut self) -> Result<(), Error> { let range = self.query.patch().patch_range(); - match PreBinnedPatchRange::covering_range(range, self.query.patch().bin_count() + 1) { + match PreBinnedPatchRange::covering_range( + range, + self.query.patch().bin_count() + 1, + self.node_config.node.bin_grain_kind, + ) { Ok(Some(range)) => { self.fut2 = Some(self.setup_from_higher_res_prebinned(range)?); } @@ -247,8 +251,14 @@ where self.write_fut = None; match item { Ok(res) => { - self.streamlog - .append(Level::INFO, format!("cache file written bytes: {}", res.bytes)); + self.streamlog.append( + Level::INFO, + format!( + "cache file written bytes: {} duration {} ms", + res.bytes, + res.duration.as_millis() + ), + ); continue 'outer; } Err(e) => { diff --git a/disk/src/binned/prebinned.rs b/disk/src/binned/prebinned.rs index cf6dc9f..43a7f30 100644 --- a/disk/src/binned/prebinned.rs +++ b/disk/src/binned/prebinned.rs @@ -14,7 +14,7 @@ use bytes::Bytes; use err::Error; use futures_core::Stream; use futures_util::StreamExt; -use netpod::{AggKind, ByteOrder, NodeConfigCached, ScalarType, Shape}; +use netpod::{AggKind, BoolNum, ByteOrder, NodeConfigCached, ScalarType, Shape}; use parse::channelconfig::{extract_matching_config_entry, read_local_config, MatchingConfigEntry}; use serde::de::DeserializeOwned; use serde::Serialize; @@ -145,6 +145,7 @@ fn make_num_pipeline( ScalarType::I64 => match_end!(i64, byte_order, shape, agg_kind, query, node_config), ScalarType::F32 => match_end!(f32, byte_order, shape, agg_kind, query, node_config), ScalarType::F64 => match_end!(f64, byte_order, shape, agg_kind, query, node_config), + ScalarType::BOOL => match_end!(BoolNum, byte_order, shape, agg_kind, query, node_config), } } diff --git a/disk/src/cache.rs b/disk/src/cache.rs index 0d7150a..aba778f 100644 --- a/disk/src/cache.rs +++ b/disk/src/cache.rs @@ -12,6 +12,7 @@ use std::io; use std::path::PathBuf; use std::pin::Pin; use std::task::{Context, Poll}; +use std::time::{Duration, Instant}; use tiny_keccak::Hasher; use tokio::io::{AsyncRead, ReadBuf}; @@ -154,6 +155,7 @@ impl CacheFileDesc { pub struct WrittenPbCache { pub bytes: u64, + pub duration: Duration, } pub async fn write_pb_cache_min_max_avg_scalar( @@ -173,6 +175,7 @@ where }; let path = cfd.path(&node_config); let enc = serde_cbor::to_vec(&values)?; + let ts1 = Instant::now(); tokio::fs::create_dir_all(path.parent().unwrap()).await?; let now = Utc::now(); let mut h = crc32fast::Hasher::new(); @@ -205,7 +208,11 @@ where }) .await??; tokio::fs::rename(&tmp_path, &path).await?; - let ret = WrittenPbCache { bytes: res as u64 }; + let ts2 = Instant::now(); + let ret = WrittenPbCache { + bytes: res as u64, + duration: ts2.duration_since(ts1), + }; Ok(ret) } diff --git a/disk/src/channelexec.rs b/disk/src/channelexec.rs index 9e21070..d342619 100644 --- a/disk/src/channelexec.rs +++ b/disk/src/channelexec.rs @@ -16,7 +16,7 @@ use futures_core::Stream; use futures_util::future::FutureExt; use futures_util::StreamExt; use netpod::log::*; -use netpod::{AggKind, ByteOrder, Channel, NanoRange, NodeConfigCached, PerfOpts, ScalarType, Shape}; +use netpod::{AggKind, BoolNum, ByteOrder, Channel, NanoRange, NodeConfigCached, PerfOpts, ScalarType, Shape}; use parse::channelconfig::{extract_matching_config_entry, read_local_config, MatchingConfigEntry}; use serde::de::DeserializeOwned; use serde_json::Value as JsonValue; @@ -161,6 +161,7 @@ where ScalarType::I64 => match_end!(f, i64, byte_order, shape, agg_kind, node_config), ScalarType::F32 => match_end!(f, f32, byte_order, shape, agg_kind, node_config), ScalarType::F64 => match_end!(f, f64, byte_order, shape, agg_kind, node_config), + ScalarType::BOOL => match_end!(f, BoolNum, byte_order, shape, agg_kind, node_config), } } diff --git a/disk/src/dataopen.rs b/disk/src/dataopen.rs index 4f0631a..d00a308 100644 --- a/disk/src/dataopen.rs +++ b/disk/src/dataopen.rs @@ -5,6 +5,7 @@ use futures_util::StreamExt; use netpod::log::*; use netpod::{ChannelConfig, NanoRange, Nanos, Node}; use std::path::PathBuf; +use std::time::Instant; use tokio::fs::{File, OpenOptions}; use tokio::io::{AsyncReadExt, AsyncSeekExt, ErrorKind, SeekFrom}; @@ -125,7 +126,14 @@ async fn open_files_inner( } Err(e) => match e.kind() { ErrorKind::NotFound => { + let ts1 = Instant::now(); let res = super::index::position_static_len_datafile(file, range.beg).await?; + let ts2 = Instant::now(); + if false { + // TODO collect for stats: + let dur = ts2.duration_since(ts1); + info!("position_static_len_datafile took ms {}", dur.as_millis()); + } file = res.0; if res.1 { OpenedFile { diff --git a/disk/src/decode.rs b/disk/src/decode.rs index bf3487b..86b4c34 100644 --- a/disk/src/decode.rs +++ b/disk/src/decode.rs @@ -11,7 +11,7 @@ use crate::eventchunker::EventFull; use err::Error; use futures_core::Stream; use futures_util::StreamExt; -use netpod::NanoRange; +use netpod::{BoolNum, NanoRange}; use serde::{Deserialize, Serialize}; use std::marker::PhantomData; use std::mem::size_of; @@ -39,6 +39,18 @@ pub trait NumFromBytes { fn convert(buf: &[u8], big_endian: bool) -> NTY; } +impl NumFromBytes for BoolNum { + fn convert(buf: &[u8], _big_endian: bool) -> BoolNum { + BoolNum(buf[0]) + } +} + +impl NumFromBytes for BoolNum { + fn convert(buf: &[u8], _big_endian: bool) -> BoolNum { + BoolNum(buf[0]) + } +} + macro_rules! impl_num_from_bytes_end { ($nty:ident, $nl:expr, $end:ident, $ec:ident) => { impl NumFromBytes<$nty, $end> for $nty { diff --git a/disk/src/eventchunker.rs b/disk/src/eventchunker.rs index 055bd13..30c0027 100644 --- a/disk/src/eventchunker.rs +++ b/disk/src/eventchunker.rs @@ -385,6 +385,10 @@ impl Stream for EventChunker { } else { match self.inp.poll_next_unpin(cx) { Ready(Some(Ok(mut fcr))) => { + if false { + // TODO collect for stats: + info!("file read bytes {} ms {}", fcr.buf.len(), fcr.duration.as_millis()); + } let r = self.parse_buf(&mut fcr.buf); match r { Ok(res) => { diff --git a/disk/src/frame/makeframe.rs b/disk/src/frame/makeframe.rs index 7faa120..939f4a4 100644 --- a/disk/src/frame/makeframe.rs +++ b/disk/src/frame/makeframe.rs @@ -10,6 +10,7 @@ use crate::raw::EventQueryJsonStringFrame; use crate::Sitemty; use bytes::{BufMut, BytesMut}; use err::Error; +use netpod::BoolNum; use serde::{de::DeserializeOwned, Serialize}; pub const INMEM_FRAME_ENCID: u32 = 0x12121212; @@ -61,6 +62,10 @@ impl SubFrId for f64 { const SUB: u32 = 12; } +impl SubFrId for BoolNum { + const SUB: u32 = 13; +} + pub trait FrameType { const FRAME_TYPE_ID: u32; } diff --git a/disk/src/gen.rs b/disk/src/gen.rs index 5122e88..71f6ab8 100644 --- a/disk/src/gen.rs +++ b/disk/src/gen.rs @@ -80,6 +80,7 @@ pub async fn gen_test_data() -> Result<(), Error> { data_base_path: data_base_path.join(format!("node{:02}", i1)), ksprefix: ksprefix.clone(), backend: "testbackend".into(), + bin_grain_kind: 0, }; ensemble.nodes.push(node); } diff --git a/disk/src/index.rs b/disk/src/index.rs index eb8d954..ddfec6d 100644 --- a/disk/src/index.rs +++ b/disk/src/index.rs @@ -1,31 +1,10 @@ use arrayref::array_ref; use err::Error; use netpod::log::*; -use netpod::{ChannelConfig, Nanos, Node}; +use netpod::Nanos; use std::mem::size_of; -use tokio::fs::{File, OpenOptions}; -use tokio::io::{AsyncReadExt, AsyncSeekExt, ErrorKind, SeekFrom}; - -pub async fn find_start_pos_for_config( - ts: Nanos, - channel_config: &ChannelConfig, - node: &Node, -) -> Result, Error> { - let index_path = super::paths::index_path(ts, channel_config, node)?; - let ret = match OpenOptions::new().open(&index_path).await { - Ok(_file) => { - info!("opened index file"); - error!("??????????????? TODO search index for start"); - err::todoval::(); - None - } - Err(e) => match e.kind() { - ErrorKind::NotFound => None, - _ => Err(e)?, - }, - }; - Ok(ret) -} +use tokio::fs::File; +use tokio::io::{AsyncReadExt, AsyncSeekExt, SeekFrom}; pub fn find_ge(h: u64, buf: &[u8]) -> Result, Error> { const N: usize = 2 * size_of::(); diff --git a/disk/src/lib.rs b/disk/src/lib.rs index 54c0a9d..2deb45b 100644 --- a/disk/src/lib.rs +++ b/disk/src/lib.rs @@ -316,10 +316,10 @@ pub fn file_content_stream( async_stream::stream! { use tokio::io::AsyncReadExt; loop { + let ts1 = Instant::now(); let mut buf = BytesMut::with_capacity(buffer_size); - let inst1 = Instant::now(); let n1 = file.read_buf(&mut buf).await?; - let inst2 = Instant::now(); + let ts2 = Instant::now(); if n1 == 0 { info!("file EOF"); break; @@ -327,7 +327,7 @@ pub fn file_content_stream( else { let ret = FileChunkRead { buf, - duration: inst2.duration_since(inst1), + duration: ts2.duration_since(ts1), }; yield Ok(ret); } diff --git a/disk/src/raw/conn.rs b/disk/src/raw/conn.rs index 2f5309d..34744c4 100644 --- a/disk/src/raw/conn.rs +++ b/disk/src/raw/conn.rs @@ -15,7 +15,7 @@ use err::Error; use futures_core::Stream; use futures_util::StreamExt; use netpod::log::*; -use netpod::{AggKind, ByteOrder, ByteSize, NodeConfigCached, PerfOpts, ScalarType, Shape}; +use netpod::{AggKind, BoolNum, ByteOrder, ByteSize, NodeConfigCached, PerfOpts, ScalarType, Shape}; use parse::channelconfig::{extract_matching_config_entry, read_local_config, MatchingConfigEntry}; use std::io; use std::net::SocketAddr; @@ -201,6 +201,7 @@ macro_rules! pipe1 { ScalarType::I64 => pipe2!(i64, $end, $shape, $agg_kind, $event_blobs), ScalarType::F32 => pipe2!(f32, $end, $shape, $agg_kind, $event_blobs), ScalarType::F64 => pipe2!(f64, $end, $shape, $agg_kind, $event_blobs), + ScalarType::BOOL => pipe2!(BoolNum, $end, $shape, $agg_kind, $event_blobs), } }; } @@ -298,10 +299,6 @@ async fn events_conn_handler_inner_try( event_chunker_conf, ); let shape = entry.to_shape().unwrap(); - info!( - "+++++--- conn.rs call pipe1 shape {:?} agg_kind {:?}", - shape, evq.agg_kind - ); let mut p1 = pipe1!(entry.scalar_type, entry.byte_order, shape, evq.agg_kind, event_blobs); while let Some(item) = p1.next().await { //info!("conn.rs encode frame typeid {:x}", item.typeid()); diff --git a/httpret/src/lib.rs b/httpret/src/lib.rs index 0b874d5..a30ac17 100644 --- a/httpret/src/lib.rs +++ b/httpret/src/lib.rs @@ -225,20 +225,13 @@ async fn http_service_try(req: Request, node_config: &NodeConfigCached) -> } } else if path.starts_with("/api/1/documentation/") { if req.method() == Method::GET { - static_http_api1!(path, "", "api1.html", "text/html"); - static_http_api1!(path, "style.css", "text/css"); - static_http_api1!(path, "script.js", "text/javascript"); - Ok(response(StatusCode::NOT_FOUND).body(Body::empty())?) + api_1_docs(path) } else { Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) } } else if path.starts_with("/api/4/documentation/") { if req.method() == Method::GET { - static_http!(path, "", "api4.html", "text/html"); - static_http!(path, "style.css", "text/css"); - static_http!(path, "script.js", "text/javascript"); - static_http!(path, "status-main.html", "text/html"); - Ok(response(StatusCode::NOT_FOUND).body(Body::empty())?) + api_4_docs(path) } else { Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) } @@ -252,6 +245,21 @@ async fn http_service_try(req: Request, node_config: &NodeConfigCached) -> } } +pub fn api_4_docs(path: &str) -> Result, Error> { + static_http!(path, "", "api4.html", "text/html"); + static_http!(path, "style.css", "text/css"); + static_http!(path, "script.js", "text/javascript"); + static_http!(path, "status-main.html", "text/html"); + Ok(response(StatusCode::NOT_FOUND).body(Body::empty())?) +} + +pub fn api_1_docs(path: &str) -> Result, Error> { + static_http_api1!(path, "", "api1.html", "text/html"); + static_http_api1!(path, "style.css", "text/css"); + static_http_api1!(path, "script.js", "text/javascript"); + Ok(response(StatusCode::NOT_FOUND).body(Body::empty())?) +} + fn response(status: T) -> http::response::Builder where http::StatusCode: std::convert::TryFrom, diff --git a/httpret/src/proxy.rs b/httpret/src/proxy.rs index 5163352..43b40e5 100644 --- a/httpret/src/proxy.rs +++ b/httpret/src/proxy.rs @@ -1,10 +1,10 @@ use crate::api1::{channel_search_configs_v1, channel_search_list_v1, gather_json_2_v1, proxy_distribute_v1}; use crate::gather::{gather_get_json_generic, SubRes}; -use crate::{response, Cont}; +use crate::{api_4_docs, response, Cont}; use disk::binned::query::BinnedQuery; use disk::events::PlainEventsJsonQuery; use err::Error; -use http::StatusCode; +use http::{Method, StatusCode}; use hyper::service::{make_service_fn, service_fn}; use hyper::{Body, Request, Response, Server}; use itertools::Itertools; @@ -76,6 +76,18 @@ async fn proxy_http_service_try(req: Request, proxy_config: &ProxyConfig) Ok(proxy_single_backend_query::(req, proxy_config).await?) } else if path.starts_with("/distribute") { proxy_distribute_v1(req).await + } else if path.starts_with("/api/1/documentation/") { + if req.method() == Method::GET { + Ok(response(StatusCode::NOT_FOUND).body(Body::empty())?) + } else { + Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) + } + } else if path.starts_with("/api/4/documentation/") { + if req.method() == Method::GET { + api_4_docs(path) + } else { + Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) + } } else { Ok(response(StatusCode::NOT_FOUND).body(Body::from(format!( "Sorry, not found: {:?} {:?} {:?}", diff --git a/netpod/Cargo.toml b/netpod/Cargo.toml index 45f0cfc..63f3dfb 100644 --- a/netpod/Cargo.toml +++ b/netpod/Cargo.toml @@ -14,4 +14,5 @@ futures-util = "0.3.14" tracing = "0.1.25" url = "2.2" lazy_static = "1.4.0" +num-traits = "0.2" err = { path = "../err" } diff --git a/netpod/src/lib.rs b/netpod/src/lib.rs index a7b8915..cce5ef9 100644 --- a/netpod/src/lib.rs +++ b/netpod/src/lib.rs @@ -3,9 +3,11 @@ use err::Error; use futures_core::Stream; use futures_util::StreamExt; use serde::{Deserialize, Serialize}; +use std::cmp::Ordering; use std::collections::BTreeMap; -use std::fmt::{Debug, Display, Formatter}; +use std::fmt::{self, Debug, Display, Formatter}; use std::iter::FromIterator; +use std::ops::Add; use std::path::PathBuf; use std::pin::Pin; use std::str::FromStr; @@ -23,6 +25,60 @@ pub const APP_JSON: &'static str = "application/json"; pub const APP_JSON_LINES: &'static str = "application/jsonlines"; pub const APP_OCTET: &'static str = "application/octet-stream"; +#[derive(Copy, Clone, Debug, Serialize, Deserialize)] +pub struct BoolNum(pub u8); + +impl BoolNum { + pub const MIN: Self = Self(0); + pub const MAX: Self = Self(1); +} + +impl Add for BoolNum { + type Output = BoolNum; + + fn add(self, rhs: BoolNum) -> Self::Output { + Self(self.0 + rhs.0) + } +} + +impl num_traits::Zero for BoolNum { + fn zero() -> Self { + Self(0) + } + + fn is_zero(&self) -> bool { + self.0 == 0 + } +} + +impl num_traits::AsPrimitive for BoolNum { + fn as_(self) -> f32 { + self.0 as f32 + } +} + +impl num_traits::Bounded for BoolNum { + fn min_value() -> Self { + Self(0) + } + + fn max_value() -> Self { + Self(1) + } +} + +impl PartialEq for BoolNum { + fn eq(&self, other: &Self) -> bool { + PartialEq::eq(&self.0, &other.0) + } +} + +impl PartialOrd for BoolNum { + fn partial_cmp(&self, other: &Self) -> Option { + PartialOrd::partial_cmp(&self.0, &other.0) + } +} + #[derive(Clone, Debug, Serialize, Deserialize)] pub struct AggQuerySingleChannel { pub channel_config: ChannelConfig, @@ -48,14 +104,15 @@ pub enum ScalarType { I64, F32, F64, + BOOL, } impl ScalarType { pub fn from_dtype_index(ix: u8) -> Result { use ScalarType::*; let g = match ix { - 0 => return Err(Error::with_msg(format!("BOOL not supported"))), - 1 => return Err(Error::with_msg(format!("BOOL8 not supported"))), + 0 => BOOL, + 1 => BOOL, 3 => U8, 5 => U16, 8 => U32, @@ -86,6 +143,7 @@ impl ScalarType { I64 => 8, F32 => 4, F64 => 8, + BOOL => 1, } } @@ -102,6 +160,7 @@ impl ScalarType { I64 => 9, F32 => 11, F64 => 12, + BOOL => 0, } } } @@ -116,6 +175,8 @@ pub struct Node { pub data_base_path: PathBuf, pub ksprefix: String, pub backend: String, + #[serde(default)] + pub bin_grain_kind: u32, } #[derive(Clone, Debug, Serialize, Deserialize)] @@ -326,11 +387,18 @@ pub mod timeunits { pub const DAY: u64 = HOUR * 24; } -const BIN_T_LEN_OPTIONS: [u64; 4] = [SEC, MIN * 10, HOUR * 2, DAY]; +const BIN_T_LEN_OPTIONS_0: [u64; 4] = [SEC, MIN * 10, HOUR * 2, DAY]; -const PATCH_T_LEN_OPTIONS_SCALAR: [u64; 4] = [MIN * 60, HOUR * 2, DAY * 4, DAY * 32]; +const BIN_T_LEN_OPTIONS_1: [u64; 4] = [SEC, MIN * 10, HOUR * 2, DAY]; -const PATCH_T_LEN_OPTIONS_WAVE: [u64; 4] = [MIN * 20, HOUR * 2, DAY * 4, DAY * 32]; +const PATCH_T_LEN_KEY: [u64; 4] = [SEC, MIN * 10, HOUR * 2, DAY]; + +//const PATCH_T_LEN_OPTIONS_SCALAR: [u64; 4] = [MIN * 60, HOUR * 4, DAY * 4, DAY * 32]; + +// Testing this for GLS: +const PATCH_T_LEN_OPTIONS_SCALAR: [u64; 4] = [HOUR * 4, DAY * 4, DAY * 16, DAY * 32]; + +const PATCH_T_LEN_OPTIONS_WAVE: [u64; 4] = [MIN * 10, HOUR * 2, DAY * 4, DAY * 32]; const BIN_THRESHOLDS: [u64; 31] = [ 2, @@ -385,7 +453,7 @@ impl PreBinnedPatchGridSpec { } pub fn is_valid_bin_t_len(bin_t_len: u64) -> bool { - for &j in BIN_T_LEN_OPTIONS.iter() { + for &j in PATCH_T_LEN_KEY.iter() { if bin_t_len == j { return true; } @@ -421,14 +489,14 @@ fn get_patch_t_len(bin_t_len: u64) -> u64 { let shape = Shape::Scalar; match shape { Shape::Scalar => { - for (i1, &j) in BIN_T_LEN_OPTIONS.iter().enumerate() { + for (i1, &j) in PATCH_T_LEN_KEY.iter().enumerate() { if bin_t_len == j { return PATCH_T_LEN_OPTIONS_SCALAR[i1]; } } } Shape::Wave(_) => { - for (i1, &j) in BIN_T_LEN_OPTIONS.iter().enumerate() { + for (i1, &j) in PATCH_T_LEN_KEY.iter().enumerate() { if bin_t_len == j { return PATCH_T_LEN_OPTIONS_WAVE[i1]; } @@ -440,25 +508,30 @@ fn get_patch_t_len(bin_t_len: u64) -> u64 { impl PreBinnedPatchRange { /// Cover at least the given range with at least as many as the requested number of bins. - pub fn covering_range(range: NanoRange, min_bin_count: u32) -> Result, Error> { + pub fn covering_range(range: NanoRange, min_bin_count: u32, bin_grain_kind: u32) -> Result, Error> { + let bin_t_len_options = if bin_grain_kind == 1 { + &BIN_T_LEN_OPTIONS_1 + } else { + &BIN_T_LEN_OPTIONS_0 + }; if min_bin_count < 1 { Err(Error::with_msg("min_bin_count < 1"))?; } - if min_bin_count > 4000 { - Err(Error::with_msg("min_bin_count > 4000"))?; + if min_bin_count > 6000 { + Err(Error::with_msg("min_bin_count > 6000"))?; } let dt = range.delta(); - if dt > DAY * 14 { - Err(Error::with_msg("dt > DAY * 14"))?; + if dt > DAY * 200 { + Err(Error::with_msg("dt > DAY * 200"))?; } let bs = dt / min_bin_count as u64; - let mut i1 = BIN_T_LEN_OPTIONS.len(); + let mut i1 = bin_t_len_options.len(); loop { if i1 <= 0 { break Ok(None); } else { i1 -= 1; - let t = BIN_T_LEN_OPTIONS[i1]; + let t = bin_t_len_options[i1]; if t <= bs { let bin_t_len = t; let patch_t_len = get_patch_t_len(bin_t_len); @@ -585,7 +658,7 @@ impl BinnedGridSpec { } pub fn is_valid_bin_t_len(bin_t_len: u64) -> bool { - for &j in BIN_T_LEN_OPTIONS.iter() { + for &j in PATCH_T_LEN_KEY.iter() { if bin_t_len == j { return true; } @@ -594,8 +667,8 @@ impl BinnedGridSpec { } } -impl std::fmt::Debug for BinnedGridSpec { - fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result { +impl fmt::Debug for BinnedGridSpec { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { if self.bin_t_len < SEC * 90 { write!(fmt, "BinnedGridSpec {{ bin_t_len: {:?} ms }}", self.bin_t_len / MS,) } else { @@ -612,25 +685,26 @@ pub struct BinnedRange { } impl BinnedRange { - pub fn covering_range(range: NanoRange, min_bin_count: u32) -> Result, Error> { + pub fn covering_range(range: NanoRange, min_bin_count: u32, _bin_grain_kind: u32) -> Result, Error> { + let thresholds = &BIN_THRESHOLDS; if min_bin_count < 1 { Err(Error::with_msg("min_bin_count < 1"))?; } - if min_bin_count > 4000 { - Err(Error::with_msg("min_bin_count > 4000"))?; + if min_bin_count > 6000 { + Err(Error::with_msg("min_bin_count > 6000"))?; } let dt = range.delta(); - if dt > DAY * 14 { - Err(Error::with_msg("dt > DAY * 14"))?; + if dt > DAY * 200 { + Err(Error::with_msg("dt > DAY * 200"))?; } let bs = dt / min_bin_count as u64; - let mut i1 = BIN_THRESHOLDS.len(); + let mut i1 = thresholds.len(); loop { if i1 <= 0 { break Ok(None); } else { i1 -= 1; - let t = BIN_THRESHOLDS[i1]; + let t = thresholds[i1]; if t <= bs || i1 == 0 { let grid_spec = BinnedGridSpec { bin_t_len: t }; let bl = grid_spec.bin_t_len(); @@ -648,12 +722,14 @@ impl BinnedRange { } } } + pub fn get_range(&self, ix: u32) -> NanoRange { NanoRange { beg: (self.offset + ix as u64) * self.grid_spec.bin_t_len, end: (self.offset + ix as u64 + 1) * self.grid_spec.bin_t_len, } } + pub fn full_range(&self) -> NanoRange { NanoRange { beg: (self.offset + 0) * self.grid_spec.bin_t_len,