diff --git a/.cargo/config.toml b/.cargo/config.toml index 3530dfa..a00eda8 100644 --- a/.cargo/config.toml +++ b/.cargo/config.toml @@ -1,2 +1,2 @@ [build] -rustflags = ["-C", "force-frame-pointers"] +#rustflags = ["-C", "force-frame-pointers"] diff --git a/disk/src/binned.rs b/disk/src/binned.rs index 56420e2..bd7866c 100644 --- a/disk/src/binned.rs +++ b/disk/src/binned.rs @@ -1,5 +1,5 @@ use crate::agg::binnedt::{TBinnerStream, TimeBinnableType, TimeBinnableTypeAggregator}; -use crate::agg::enp::{ts_offs_from_abs, Identity, WaveXBinner}; +use crate::agg::enp::ts_offs_from_abs; use crate::agg::eventbatch::MinMaxAvgScalarEventBatch; use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch; use crate::agg::streams::{Appendable, Collectable, Collector, StreamItem, ToJsonBytes, ToJsonResult}; @@ -8,10 +8,7 @@ use crate::binned::binnedfrompbv::BinnedFromPreBinned; use crate::binned::query::BinnedQuery; use crate::binnedstream::BoxedStream; use crate::channelexec::{channel_exec, collect_plain_events_json, ChannelExecFunction}; -use crate::decode::{ - Endianness, EventValueFromBytes, EventValueShape, EventValues, EventValuesDim0Case, EventValuesDim1Case, - NumFromBytes, -}; +use crate::decode::{Endianness, EventValueFromBytes, EventValueShape, EventValues, NumFromBytes}; use crate::frame::makeframe::{Framable, FrameType, SubFrId}; use crate::merge::mergedfromremotes::MergedFromRemotes; use crate::raw::RawEventsQuery; @@ -20,7 +17,7 @@ use bytes::Bytes; use chrono::{TimeZone, Utc}; use err::Error; use futures_core::Stream; -use futures_util::{FutureExt, StreamExt}; +use futures_util::StreamExt; use netpod::log::*; use netpod::timeunits::SEC; use netpod::{ @@ -28,7 +25,6 @@ use netpod::{ PreBinnedPatchRange, Shape, }; use num_traits::{AsPrimitive, Bounded, Float, Zero}; -use parse::channelconfig::{extract_matching_config_entry, read_local_config, MatchingConfigEntry}; use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize, Serializer}; use std::fmt; @@ -52,443 +48,6 @@ pub struct BinnedStreamRes { pub range: BinnedRange, } -pub struct BinnedResponseStat { - stream: Pin> + Send>>, - bin_count: u32, -} - -// TODO Can I unify these functions with the ones from prebinned.rs? -// They also must resolve to the same types, so would be good to unify. - -fn make_num_pipeline_nty_end_evs_enp_stat( - shape: Shape, - event_value_shape: EVS, - query: BinnedQuery, - node_config: &NodeConfigCached, -) -> Result::Output as TimeBinnableType>::Output>, Error> -where - NTY: NumOps + NumFromBytes + Serialize + 'static, - END: Endianness + 'static, - EVS: EventValueShape + EventValueFromBytes + 'static, - ENP: EventsNodeProcessor>::Output> + 'static, - ::Output: TimeBinnableType + PushableIndex + Appendable + 'static, - <::Output as TimeBinnableType>::Output: - TimeBinnableType::Output as TimeBinnableType>::Output> + Unpin, - Sitemty< - <<::Output as TimeBinnableType>::Aggregator as TimeBinnableTypeAggregator>::Output, - >: Framable, - // TODO require these things in general? - Sitemty<::Output>: FrameType + Framable + 'static, - Sitemty<<::Output as TimeBinnableType>::Output>: - FrameType + Framable + DeserializeOwned, - <::Output as TimeBinnableType>::Output: Sized, -{ - let _ = event_value_shape; - let range = BinnedRange::covering_range( - query.range().clone(), - query.bin_count(), - node_config.node.bin_grain_kind, - )? - .ok_or(Error::with_msg(format!( - "binned_bytes_for_http BinnedRange::covering_range returned None" - )))?; - let perf_opts = PerfOpts { inmem_bufcap: 512 }; - match PreBinnedPatchRange::covering_range( - query.range().clone(), - query.bin_count(), - node_config.node.bin_grain_kind, - ) { - Ok(Some(pre_range)) => { - info!("binned_bytes_for_http found pre_range: {:?}", pre_range); - if range.grid_spec.bin_t_len() < pre_range.grid_spec.bin_t_len() { - let msg = format!( - "binned_bytes_for_http incompatible ranges:\npre_range: {:?}\nrange: {:?}", - pre_range, range - ); - return Err(Error::with_msg(msg)); - } - let s = BinnedFromPreBinned::<<::Output as TimeBinnableType>::Output>::new( - PreBinnedPatchIterator::from_range(pre_range), - query.channel().clone(), - range.clone(), - shape, - query.agg_kind().clone(), - query.cache_usage().clone(), - node_config, - query.disk_stats_every().clone(), - query.report_error(), - )?; - let ret = BinnedResponseStat { - stream: Box::pin(s), - bin_count: range.count as u32, - }; - Ok(ret) - } - Ok(None) => { - info!( - "binned_bytes_for_http no covering range for prebinned, merge from remotes instead {:?}", - range - ); - let bin_count = range.count as u32; - let evq = RawEventsQuery { - channel: query.channel().clone(), - range: query.range().clone(), - agg_kind: query.agg_kind().clone(), - }; - let x_bin_count = if let AggKind::DimXBinsN(n) = query.agg_kind() { - *n as usize - } else { - 0 - }; - let s = MergedFromRemotes::::new(evq, perf_opts, node_config.node_config.cluster.clone()); - let s = TBinnerStream::<_, ::Output>::new(s, range, x_bin_count); - let ret = BinnedResponseStat { - stream: Box::pin(s), - bin_count, - }; - Ok(ret) - } - Err(e) => Err(e), - } -} - -pub trait BinnedResponseItem: Send + ToJsonResult + Framable {} - -impl BinnedResponseItem for T where T: Send + ToJsonResult + Framable {} - -pub struct BinnedResponseDyn { - stream: Pin> + Send>>, -} - -// TODO remove after refactor of PPP: -fn make_num_pipeline_nty_end_evs_enp( - shape: Shape, - _agg_kind: AggKind, - event_value_shape: EVS, - _events_node_proc: ENP, - query: BinnedQuery, - ppp: PPP, - node_config: &NodeConfigCached, -) -> Result -where - PPP: PipelinePostProcessA, - PPP: PipelinePostProcessB<<::Output as TimeBinnableType>::Output>, - NTY: NumOps + NumFromBytes + 'static, - END: Endianness + 'static, - EVS: EventValueShape + EventValueFromBytes + 'static, - ENP: EventsNodeProcessor>::Output> + 'static, - // TODO require these properties in general: - ::Output: TimeBinnableType + PushableIndex + Appendable + 'static, - <::Output as TimeBinnableType>::Output: - TimeBinnableType::Output as TimeBinnableType>::Output> + Unpin, - Sitemty< - <<::Output as TimeBinnableType>::Aggregator as TimeBinnableTypeAggregator>::Output, - >: Framable, - // TODO require these things in general? - Sitemty<::Output>: FrameType + Framable + 'static, - // TODO is this correct? why do I want the Output to be Framable? - Sitemty<<::Output as TimeBinnableType>::Output>: - FrameType + Framable + DeserializeOwned, - Sitemty<<::Output as TimeBinnableType>::Output>: ToJsonResult + Framable, -{ - let res = make_num_pipeline_nty_end_evs_enp_stat::<_, _, _, ENP>(shape, event_value_shape, query, node_config)?; - let s = ppp.convert(res.stream, res.bin_count); - let ret = BinnedResponseDyn { stream: Box::pin(s) }; - Ok(ret) -} - -// TODO remove after refactor of PPP: -#[allow(dead_code)] -fn make_num_pipeline_nty_end( - shape: Shape, - query: BinnedQuery, - ppp: PPP, - node_config: &NodeConfigCached, -) -> Result -where - PPP: PipelinePostProcessA, - PPP: PipelinePostProcessB>, - NTY: NumOps + NumFromBytes + Serialize + 'static, - END: Endianness + 'static, -{ - let agg_kind = query.agg_kind().clone(); - match shape { - Shape::Scalar => { - let evs = EventValuesDim0Case::new(); - match agg_kind { - AggKind::DimXBins1 => { - let events_node_proc = < as EventValueShape>::NumXAggToSingleBin as EventsNodeProcessor>::create(shape.clone(), agg_kind.clone()); - make_num_pipeline_nty_end_evs_enp::<_, NTY, END, _, _>( - shape, - agg_kind, - evs, - events_node_proc, - query, - ppp, - node_config, - ) - } - AggKind::DimXBinsN(_) => { - let events_node_proc = < as EventValueShape>::NumXAggToNBins as EventsNodeProcessor>::create(shape.clone(), agg_kind.clone()); - make_num_pipeline_nty_end_evs_enp::<_, NTY, END, _, _>( - shape, - agg_kind, - evs, - events_node_proc, - query, - ppp, - node_config, - ) - } - AggKind::Plain => { - panic!(); - } - } - } - Shape::Wave(n) => { - let evs = EventValuesDim1Case::new(n); - match agg_kind { - AggKind::DimXBins1 => { - let events_node_proc = < as EventValueShape>::NumXAggToSingleBin as EventsNodeProcessor>::create(shape.clone(), agg_kind.clone()); - make_num_pipeline_nty_end_evs_enp::( - shape, - agg_kind, - evs, - events_node_proc, - query, - ppp, - node_config, - ) - } - AggKind::DimXBinsN(_) => { - let _events_node_proc = < as EventValueShape>::NumXAggToNBins as EventsNodeProcessor>::create(shape.clone(), agg_kind.clone()); - /*let yo = make_num_pipeline_nty_end_evs_enp::( - shape, - agg_kind, - evs, - events_node_proc, - query, - ppp, - node_config, - );*/ - err::todoval() - } - AggKind::Plain => { - panic!(); - } - } - } - } -} - -// TODO remove after refactor of PPP: -#[allow(dead_code)] -fn make_num_pipeline_nty_end_old( - shape: Shape, - query: BinnedQuery, - ppp: PPP, - node_config: &NodeConfigCached, -) -> Result -where - PPP: PipelinePostProcessA, - PPP: PipelinePostProcessB>, - PPP: PipelinePostProcessB>, - NTY: NumOps + NumFromBytes + 'static, - END: Endianness + 'static, -{ - let agg_kind = query.agg_kind().clone(); - match shape { - Shape::Scalar => make_num_pipeline_nty_end_evs_enp::<_, NTY, _, _, Identity<_>>( - shape.clone(), - agg_kind.clone(), - EventValuesDim0Case::new(), - Identity::create(shape.clone(), agg_kind.clone()), - query, - ppp, - node_config, - ), - Shape::Wave(n) => make_num_pipeline_nty_end_evs_enp::<_, NTY, _, _, WaveXBinner<_>>( - shape.clone(), - agg_kind.clone(), - EventValuesDim1Case::new(n), - WaveXBinner::create(shape.clone(), agg_kind.clone()), - query, - ppp, - node_config, - ), - } -} - -// TODO remove after refactor of PPP: -#[allow(unused_macros)] -macro_rules! match_end { - ($nty:ident, $end:expr, $shape:expr, $query:expr, $ppp:expr, $node_config:expr) => { - match $end { - ByteOrder::LE => make_num_pipeline_nty_end::<_, $nty, LittleEndian>($shape, $query, $ppp, $node_config), - ByteOrder::BE => make_num_pipeline_nty_end::<_, $nty, BigEndian>($shape, $query, $ppp, $node_config), - } - }; -} - -// TODO remove after refactor of PPP -/*fn make_num_pipeline_entry( - scalar_type: ScalarType, - byte_order: ByteOrder, - shape: Shape, - query: BinnedQuery, - ppp: PPP, - node_config: &NodeConfigCached, -) -> Result -where - PPP: PipelinePostProcessA, - PPP: PipelinePostProcessB>, - PPP: PipelinePostProcessB>, - PPP: PipelinePostProcessB>, - PPP: PipelinePostProcessB>, - PPP: PipelinePostProcessB>, - PPP: PipelinePostProcessB>, - PPP: PipelinePostProcessB>, - PPP: PipelinePostProcessB>, - PPP: PipelinePostProcessB>, - PPP: PipelinePostProcessB>, -{ - match scalar_type { - ScalarType::U8 => match_end!(u8, byte_order, shape, query, ppp, node_config), - ScalarType::U16 => match_end!(u16, byte_order, shape, query, ppp, node_config), - ScalarType::U32 => match_end!(u32, byte_order, shape, query, ppp, node_config), - ScalarType::U64 => match_end!(u64, byte_order, shape, query, ppp, node_config), - ScalarType::I8 => match_end!(i8, byte_order, shape, query, ppp, node_config), - ScalarType::I16 => match_end!(i16, byte_order, shape, query, ppp, node_config), - ScalarType::I32 => match_end!(i32, byte_order, shape, query, ppp, node_config), - ScalarType::I64 => match_end!(i64, byte_order, shape, query, ppp, node_config), - ScalarType::F32 => match_end!(f32, byte_order, shape, query, ppp, node_config), - ScalarType::F64 => match_end!(f64, byte_order, shape, query, ppp, node_config), - ScalarType::BOOL => match_end!(BoolNum, byte_order, shape, query, ppp, node_config), - } -}*/ - -async fn make_num_pipeline( - query: &BinnedQuery, - _ppp: PPP, - node_config: &NodeConfigCached, -) -> Result { - if query.channel().backend != node_config.node.backend { - let err = Error::with_msg(format!( - "backend mismatch node: {} requested: {}", - node_config.node.backend, - query.channel().backend - )); - return Err(err); - } - let channel_config = match read_local_config(&query.channel(), &node_config.node).await { - Ok(k) => k, - Err(e) => { - if e.msg().contains("ErrorKind::NotFound") { - let s = futures_util::stream::empty(); - let ret = BinnedResponseDyn { stream: Box::pin(s) }; - return Ok(ret); - } else { - return Err(e); - } - } - }; - match extract_matching_config_entry(query.range(), &channel_config)? { - MatchingConfigEntry::Multiple => Err(Error::with_msg("multiple config entries found"))?, - MatchingConfigEntry::None => { - // TODO can I use the same binned_stream machinery to construct the matching empty result? - // Need the requested range all with empty/nan values and zero counts. - let s = futures_util::stream::empty(); - let ret = BinnedResponseDyn { stream: Box::pin(s) }; - Ok(ret) - } - MatchingConfigEntry::Entry(entry) => { - // TODO make this a stream log: - info!("binned_bytes_for_http found config entry {:?}", entry); - /*let ret = make_num_pipeline_entry( - entry.scalar_type.clone(), - entry.byte_order.clone(), - entry.to_shape()?, - query.clone(), - ppp, - node_config, - )?; - Ok(ret)*/ - err::todoval() - } - } -} - -pub trait PipelinePostProcessA {} - -struct MakeBoxedItems {} - -impl PipelinePostProcessA for MakeBoxedItems {} - -pub trait PipelinePostProcessB { - fn convert( - &self, - inp: Pin> + Send>>, - bin_count_exp: u32, - ) -> Pin> + Send>>; -} - -impl PipelinePostProcessB> for MakeBoxedItems -where - NTY: NumOps, -{ - fn convert( - &self, - inp: Pin>> + Send>>, - _bin_count_exp: u32, - ) -> Pin> + Send>> { - let s = StreamExt::map(inp, |item| Box::new(item) as Box); - Box::pin(s) - } -} - -struct CollectForJson { - timeout: Duration, - abort_after_bin_count: u32, -} - -impl CollectForJson { - #[allow(dead_code)] - pub fn new(timeout: Duration, abort_after_bin_count: u32) -> Self { - Self { - timeout, - abort_after_bin_count, - } - } -} - -impl PipelinePostProcessA for CollectForJson {} - -pub struct JsonCollector { - fut: Pin> + Send>>, - completed: bool, - done: bool, -} - -impl JsonCollector { - pub fn new( - inp: Pin>> + Send>>, - bin_count_exp: u32, - timeout: Duration, - abort_after_bin_count: u32, - ) -> Self - where - NTY: NumOps + Serialize + 'static, - { - let fut = collect_all(inp, bin_count_exp, timeout, abort_after_bin_count); - let fut = Box::pin(fut); - Self { - fut, - completed: false, - done: false, - } - } -} - impl ToJsonBytes for serde_json::Value { fn to_json_bytes(&self) -> Result, Error> { Ok(serde_json::to_vec(self)?) @@ -511,50 +70,109 @@ impl ToJsonResult for Sitemty { } } -impl Stream for JsonCollector { - type Item = Box; +pub struct BinnedBinaryChannelExec { + query: BinnedQuery, + node_config: NodeConfigCached, +} - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - use Poll::*; - loop { - break if self.completed { - panic!("poll_next on completed") - } else if self.done { - self.completed = true; - Ready(None) - } else { - match self.fut.poll_unpin(cx) { - Ready(Ok(item)) => { - let item = Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))); - let item = Box::new(item) as Box; - self.done = true; - Ready(Some(item)) - } - Ready(Err(e)) => { - // TODO don't emit the error as json. - let item = Err::>, _>(e); - let item = Box::new(item) as Box; - self.done = true; - Ready(Some(item)) - } - Pending => Pending, - } - }; - } +impl BinnedBinaryChannelExec { + pub fn new(query: BinnedQuery, node_config: NodeConfigCached) -> Self { + Self { query, node_config } } } -impl PipelinePostProcessB> for CollectForJson -where - NTY: NumOps, -{ - fn convert( - &self, - inp: Pin>> + Send>>, - bin_count_exp: u32, - ) -> Pin> + Send>> { - let s = JsonCollector::new(inp, bin_count_exp, self.timeout, self.abort_after_bin_count); - Box::pin(s) +impl ChannelExecFunction for BinnedBinaryChannelExec { + type Output = Pin> + Send>>; + + fn exec( + self, + _byte_order: END, + shape: Shape, + event_value_shape: EVS, + _events_node_proc: ENP, + ) -> Result + where + NTY: NumOps + NumFromBytes + 'static, + END: Endianness + 'static, + EVS: EventValueShape + EventValueFromBytes + 'static, + ENP: EventsNodeProcessor>::Output> + 'static, + // TODO require these things in general? + ::Output: Collectable + PushableIndex, + <::Output as TimeBinnableType>::Output: Debug + + TimeBinnableType::Output as TimeBinnableType>::Output> + + Collectable + + Unpin, + Sitemty<::Output>: FrameType + Framable + 'static, + Sitemty<<::Output as TimeBinnableType>::Output>: + FrameType + Framable + DeserializeOwned, + { + let _ = event_value_shape; + let range = BinnedRange::covering_range( + self.query.range().clone(), + self.query.bin_count(), + self.node_config.node.bin_grain_kind, + )? + .ok_or(Error::with_msg(format!( + "BinnedBinaryChannelExec BinnedRange::covering_range returned None" + )))?; + let perf_opts = PerfOpts { inmem_bufcap: 512 }; + let souter = match PreBinnedPatchRange::covering_range( + self.query.range().clone(), + self.query.bin_count(), + self.node_config.node.bin_grain_kind, + ) { + Ok(Some(pre_range)) => { + info!("BinnedBinaryChannelExec found pre_range: {:?}", pre_range); + if range.grid_spec.bin_t_len() < pre_range.grid_spec.bin_t_len() { + let msg = format!( + "BinnedBinaryChannelExec incompatible ranges:\npre_range: {:?}\nrange: {:?}", + pre_range, range + ); + return Err(Error::with_msg(msg)); + } + let s = BinnedFromPreBinned::<<::Output as TimeBinnableType>::Output>::new( + PreBinnedPatchIterator::from_range(pre_range), + self.query.channel().clone(), + range.clone(), + shape, + self.query.agg_kind().clone(), + self.query.cache_usage().clone(), + &self.node_config, + self.query.disk_stats_every().clone(), + self.query.report_error(), + )? + .map(|item| match item.make_frame() { + Ok(item) => Ok(item.freeze()), + Err(e) => Err(e), + }); + Ok(Box::pin(s) as Pin> + Send>>) + } + Ok(None) => { + info!( + "BinnedBinaryChannelExec no covering range for prebinned, merge from remotes instead {:?}", + range + ); + let evq = RawEventsQuery { + channel: self.query.channel().clone(), + range: self.query.range().clone(), + agg_kind: self.query.agg_kind().clone(), + }; + let x_bin_count = x_bin_count(&shape, self.query.agg_kind()); + let s = MergedFromRemotes::::new(evq, perf_opts, self.node_config.node_config.cluster.clone()); + let s = TBinnerStream::<_, ::Output>::new(s, range, x_bin_count); + let s = s.map(|item| match item.make_frame() { + Ok(item) => Ok(item.freeze()), + Err(e) => Err(e), + }); + Ok(Box::pin(s) as Pin> + Send>>) + } + Err(e) => Err(e), + }?; + Ok(souter) + } + + fn empty() -> Self::Output { + Box::pin(futures_util::stream::empty()) } } @@ -562,12 +180,14 @@ pub async fn binned_bytes_for_http( query: &BinnedQuery, node_config: &NodeConfigCached, ) -> Result> + Send>>, Error> { - let pl = make_num_pipeline::(query, MakeBoxedItems {}, node_config).await?; - let ret = pl.stream.map(|item| { - let fr = item.make_frame(); - let fr = fr?; - Ok(fr.freeze()) - }); + let ret = channel_exec( + BinnedBinaryChannelExec::new(query.clone(), node_config.clone()), + query.channel(), + query.range(), + query.agg_kind().clone(), + node_config, + ) + .await?; Ok(Box::pin(ret)) } @@ -758,7 +378,7 @@ impl ChannelExecFunction for BinnedJsonChannelExec { self.node_config.node.bin_grain_kind, )? .ok_or(Error::with_msg(format!( - "binned_bytes_for_http BinnedRange::covering_range returned None" + "BinnedJsonChannelExec BinnedRange::covering_range returned None" )))?; let t_bin_count = range.count as u32; let perf_opts = PerfOpts { inmem_bufcap: 512 }; @@ -768,10 +388,10 @@ impl ChannelExecFunction for BinnedJsonChannelExec { self.node_config.node.bin_grain_kind, ) { Ok(Some(pre_range)) => { - info!("binned_bytes_for_http found pre_range: {:?}", pre_range); + info!("BinnedJsonChannelExec found pre_range: {:?}", pre_range); if range.grid_spec.bin_t_len() < pre_range.grid_spec.bin_t_len() { let msg = format!( - "binned_bytes_for_http incompatible ranges:\npre_range: {:?}\nrange: {:?}", + "BinnedJsonChannelExec incompatible ranges:\npre_range: {:?}\nrange: {:?}", pre_range, range ); return Err(Error::with_msg(msg)); @@ -796,7 +416,7 @@ impl ChannelExecFunction for BinnedJsonChannelExec { } Ok(None) => { info!( - "binned_bytes_for_http no covering range for prebinned, merge from remotes instead {:?}", + "BinnedJsonChannelExec no covering range for prebinned, merge from remotes instead {:?}", range ); let evq = RawEventsQuery { diff --git a/netpod/src/lib.rs b/netpod/src/lib.rs index cce5ef9..f42ff5a 100644 --- a/netpod/src/lib.rs +++ b/netpod/src/lib.rs @@ -393,10 +393,9 @@ const BIN_T_LEN_OPTIONS_1: [u64; 4] = [SEC, MIN * 10, HOUR * 2, DAY]; const PATCH_T_LEN_KEY: [u64; 4] = [SEC, MIN * 10, HOUR * 2, DAY]; -//const PATCH_T_LEN_OPTIONS_SCALAR: [u64; 4] = [MIN * 60, HOUR * 4, DAY * 4, DAY * 32]; - -// Testing this for GLS: -const PATCH_T_LEN_OPTIONS_SCALAR: [u64; 4] = [HOUR * 4, DAY * 4, DAY * 16, DAY * 32]; +const PATCH_T_LEN_OPTIONS_SCALAR: [u64; 4] = [MIN * 60, HOUR * 4, DAY * 4, DAY * 32]; +// Maybe alternative for GLS: +//const PATCH_T_LEN_OPTIONS_SCALAR: [u64; 4] = [HOUR * 4, DAY * 4, DAY * 16, DAY * 32]; const PATCH_T_LEN_OPTIONS_WAVE: [u64; 4] = [MIN * 10, HOUR * 2, DAY * 4, DAY * 32]; @@ -517,8 +516,8 @@ impl PreBinnedPatchRange { if min_bin_count < 1 { Err(Error::with_msg("min_bin_count < 1"))?; } - if min_bin_count > 6000 { - Err(Error::with_msg("min_bin_count > 6000"))?; + if min_bin_count > 20000 { + Err(Error::with_msg(format!("min_bin_count > 20000: {}", min_bin_count)))?; } let dt = range.delta(); if dt > DAY * 200 { @@ -690,8 +689,8 @@ impl BinnedRange { if min_bin_count < 1 { Err(Error::with_msg("min_bin_count < 1"))?; } - if min_bin_count > 6000 { - Err(Error::with_msg("min_bin_count > 6000"))?; + if min_bin_count > 20000 { + Err(Error::with_msg(format!("min_bin_count > 20000: {}", min_bin_count)))?; } let dt = range.delta(); if dt > DAY * 200 {