This commit is contained in:
Dominik Werder
2023-03-07 20:34:56 +01:00
parent 617c21cdc3
commit 412512a43b
2 changed files with 214 additions and 131 deletions

View File

@@ -1111,30 +1111,150 @@ pub mod timeunits {
pub const DAY: u64 = HOUR * 24;
}
pub trait Dim0Index: Clone + fmt::Debug + ops::Add + ops::Sub + PartialOrd {
pub trait Dim0Index: Clone + fmt::Debug + PartialOrd {
fn add(&self, v: &Self) -> Self;
fn sub(&self, v: &Self) -> Self;
fn sub_n(&self, v: u64) -> Self;
fn times(&self, x: u64) -> Self;
fn div_n(&self, n: u64) -> Self;
fn div_v(&self, v: &Self) -> u64;
fn as_u64(&self) -> u64;
fn series_range(a: Self, b: Self) -> SeriesRange;
fn prebin_bin_len_opts() -> &'static [Self];
fn prebin_patch_len_for(i: usize) -> Self;
fn to_pre_binned_patch_range_enum(
bin_len: Self,
bin_len: &Self,
bin_count: u64,
patch_offset: u64,
patch_count: u64,
) -> PreBinnedPatchRangeEnum;
fn binned_bin_len_opts() -> &'static [Self];
fn to_binned_range_enum(bin_len: &Self, bin_off: u64, bin_cnt: u64) -> BinnedRangeEnum;
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, PartialOrd)]
pub struct TsNano(u64);
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, PartialOrd)]
pub struct PulseId(u64);
impl Dim0Index for TsNano {}
impl Dim0Index for TsNano {
fn add(&self, v: &Self) -> Self {
todo!()
}
impl Dim0Index for PulseId {}
fn sub(&self, v: &Self) -> Self {
todo!()
}
fn sub_n(&self, v: u64) -> Self {
todo!()
}
fn times(&self, x: u64) -> Self {
todo!()
}
fn div_n(&self, n: u64) -> Self {
todo!()
}
fn div_v(&self, v: &Self) -> u64 {
todo!()
}
fn as_u64(&self) -> u64 {
todo!()
}
fn series_range(a: Self, b: Self) -> SeriesRange {
todo!()
}
fn prebin_bin_len_opts() -> &'static [Self] {
todo!()
}
fn prebin_patch_len_for(i: usize) -> Self {
todo!()
}
fn to_pre_binned_patch_range_enum(
bin_len: &Self,
bin_count: u64,
patch_offset: u64,
patch_count: u64,
) -> PreBinnedPatchRangeEnum {
todo!()
}
fn binned_bin_len_opts() -> &'static [Self] {
todo!()
}
fn to_binned_range_enum(bin_len: &Self, bin_off: u64, bin_cnt: u64) -> BinnedRangeEnum {
todo!()
}
}
impl Dim0Index for PulseId {
fn add(&self, v: &Self) -> Self {
todo!()
}
fn sub(&self, v: &Self) -> Self {
todo!()
}
fn sub_n(&self, v: u64) -> Self {
todo!()
}
fn times(&self, x: u64) -> Self {
todo!()
}
fn div_n(&self, n: u64) -> Self {
todo!()
}
fn div_v(&self, v: &Self) -> u64 {
todo!()
}
fn as_u64(&self) -> u64 {
todo!()
}
fn series_range(a: Self, b: Self) -> SeriesRange {
todo!()
}
fn prebin_bin_len_opts() -> &'static [Self] {
todo!()
}
fn prebin_patch_len_for(i: usize) -> Self {
todo!()
}
fn to_pre_binned_patch_range_enum(
bin_len: &Self,
bin_count: u64,
patch_offset: u64,
patch_count: u64,
) -> PreBinnedPatchRangeEnum {
todo!()
}
fn binned_bin_len_opts() -> &'static [Self] {
todo!()
}
fn to_binned_range_enum(bin_len: &Self, bin_off: u64, bin_cnt: u64) -> BinnedRangeEnum {
todo!()
}
}
const PREBIN_TIME_BIN_LEN_VAR0: [TsNano; 3] = [TsNano(MIN * 1), TsNano(HOUR * 1), TsNano(DAY)];
@@ -1196,7 +1316,7 @@ const TIME_BIN_THRESHOLDS: [u64; 39] = [
DAY * 64,
];
const PULSE_BIN_THRESHOLDS: [u64; 10] = [
const PULSE_BIN_THRESHOLDS: [u64; 25] = [
10, 20, 40, 80, 100, 200, 400, 800, 1000, 2000, 4000, 8000, 10000, 20000, 40000, 80000, 100000, 200000, 400000,
800000, 1000000, 2000000, 4000000, 8000000, 10000000,
];
@@ -1234,7 +1354,7 @@ where
}
}
pub fn bin_len(&self) -> T {
self.bin_len
self.bin_len.clone()
}
pub fn patch_len(&self) -> T {
@@ -1264,16 +1384,16 @@ where
pub fn edges(&self) -> Vec<T> {
let mut ret = Vec::new();
let mut t = self.patch_beg();
ret.push(t);
ret.push(t.clone());
for _ in 0..self.bin_count() {
t += self.bin_t_len();
ret.push(t);
t = t.add(&self.bin_len);
ret.push(t.clone());
}
ret
}
pub fn next(&self) -> Self {
Self::new(self.bin_len, self.bin_count, 1 + self.patch_offset)
Self::new(self.bin_len.clone(), self.bin_count, 1 + self.patch_offset)
}
}
@@ -1290,6 +1410,28 @@ where
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum PreBinnedPatchCoordEnum {
Time(PreBinnedPatchCoord<TsNano>),
Pulse(PreBinnedPatchCoord<PulseId>),
}
impl FromUrl for PreBinnedPatchCoordEnum {
fn from_url(url: &Url) -> Result<Self, Error> {
todo!()
}
fn from_pairs(pairs: &BTreeMap<String, String>) -> Result<Self, Error> {
todo!()
}
}
impl AppendToUrl for PreBinnedPatchCoordEnum {
fn append_to_url(&self, url: &mut Url) {
todo!()
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PreBinnedPatchRange<T>
where
@@ -1305,38 +1447,24 @@ where
{
pub fn edges(&self) -> Vec<u64> {
let mut ret = Vec::new();
let mut t = self.grid_spec.patch_t_len() * self.offset;
ret.push(t);
let bin_count = self.grid_spec.patch_t_len() / self.grid_spec.bin_t_len() * self.count;
let bin_len = self.grid_spec.bin_t_len();
for _ in 0..bin_count {
t += bin_len;
ret.push(t);
}
if ret.len() as u64 != self.bin_count() + 1 {
error!("edges() yields wrong number {} vs {}", ret.len(), self.bin_count());
panic!();
}
err::todo();
ret
}
pub fn range(&self) -> NanoRange {
let pl = self.grid_spec.patch_t_len;
NanoRange {
beg: pl * self.offset,
end: pl * (self.offset + self.count),
}
pub fn series_range(&self) -> SeriesRange {
T::series_range(err::todoval(), err::todoval())
}
pub fn patch_count(&self) -> u64 {
self.count
self.patch_count
}
pub fn bin_count(&self) -> u64 {
self.grid_spec.patch_t_len() / self.grid_spec.bin_t_len() * self.patch_count()
err::todoval()
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum PreBinnedPatchRangeEnum {
Time(PreBinnedPatchRange<TsNano>),
Pulse(PreBinnedPatchRange<PulseId>),
@@ -1345,7 +1473,7 @@ pub enum PreBinnedPatchRangeEnum {
impl PreBinnedPatchRangeEnum {
fn covering_range_ty<T>(a: T, b: T, min_bin_count: u32) -> Result<Self, Error>
where
T: Dim0Index,
T: Dim0Index + 'static,
{
let opts = T::prebin_bin_len_opts();
if min_bin_count < 1 {
@@ -1354,17 +1482,16 @@ impl PreBinnedPatchRangeEnum {
if min_bin_count > 20000 {
Err(Error::with_msg(format!("min_bin_count > 20000: {}", min_bin_count)))?;
}
let du = b - a;
let max_bin_len = du.div_n(min_bin_count);
for (i1, bl) in opts.enumerate().rev() {
if bl <= du {
let patch_len = bl.prebin_patch_len_for(i1);
let du = b.sub(&a);
let max_bin_len = du.div_n(min_bin_count as u64);
for (i1, bl) in opts.iter().enumerate().rev() {
if bl <= &du {
let patch_len = <T as Dim0Index>::prebin_patch_len_for(i1);
let bin_count = patch_len.div_v(bl);
let patch_off_1 = a.div_v(&patch_len);
let patch_off_2 = b.div_v(&patch_len.add(patch_len).sub(1));
//patch_off_2.sub(patch_off_1);
let patch_off_2 = b.div_v(&patch_len.add(&patch_len).sub_n(1));
let patch_count = patch_off_2 - patch_off_1;
let ret = T::to_pre_binned_patch_range_enum(bl, bin_count, patch_off_1, patch_count);
let ret = T::to_pre_binned_patch_range_enum(&bl, bin_count, patch_off_1, patch_count);
return Ok(ret);
}
}
@@ -1380,7 +1507,7 @@ impl PreBinnedPatchRangeEnum {
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Clone, Serialize, Deserialize)]
pub struct BinnedRange<T>
where
T: Dim0Index,
@@ -1399,6 +1526,7 @@ where
.field("bin_len", &self.bin_len)
.field("bin_off", &self.bin_off)
.field("bin_cnt", &self.bin_cnt)
.finish()
}
}
@@ -1406,68 +1534,28 @@ impl<T> BinnedRange<T>
where
T: Dim0Index,
{
pub fn covering_range(range: NanoRange, min_bin_count: u32) -> Result<Self, Error> {
let thresholds = &BIN_THRESHOLDS;
if min_bin_count < 1 {
Err(Error::with_msg("min_bin_count < 1"))?;
}
if min_bin_count > 20000 {
Err(Error::with_msg(format!("min_bin_count > 20000: {}", min_bin_count)))?;
}
let dt = range.delta();
if dt > DAY * 200 {
Err(Error::with_msg("dt > DAY * 200"))?;
}
let bs = dt / min_bin_count as u64;
let mut i1 = thresholds.len();
loop {
if i1 == 0 {
let msg = format!("covering_range thresholds bad i {i1}");
return Err(Error::with_msg_no_trace(msg));
} else {
i1 -= 1;
let t = thresholds[i1];
if t <= bs || i1 == 0 {
let grid_spec = BinnedGridSpec { bin_t_len: t };
let bl = grid_spec.bin_t_len();
let ts1 = range.beg / bl * bl;
let ts2 = (range.end + bl - 1) / bl * bl;
let count = (ts2 - ts1) / bl;
let offset = ts1 / bl;
let ret = Self {
grid_spec,
bin_count: count,
offset,
};
break Ok(ret);
}
}
}
}
pub fn bin_count(&self) -> u64 {
self.bin_count
}
pub fn grid_spec(&self) -> &BinnedGridSpec {
&self.grid_spec
self.bin_cnt
}
pub fn get_range(&self, ix: u32) -> NanoRange {
NanoRange {
/*NanoRange {
beg: (self.offset + ix as u64) * self.grid_spec.bin_t_len,
end: (self.offset + ix as u64 + 1) * self.grid_spec.bin_t_len,
}
}*/
err::todoval()
}
pub fn full_range(&self) -> NanoRange {
NanoRange {
/*NanoRange {
beg: self.offset * self.grid_spec.bin_t_len,
end: (self.offset + self.bin_count) * self.grid_spec.bin_t_len,
}
}*/
err::todoval()
}
pub fn edges(&self) -> Vec<u64> {
/*
let mut ret = Vec::new();
let mut t = self.offset * self.grid_spec.bin_t_len;
let end = (self.offset + self.bin_count) * self.grid_spec.bin_t_len;
@@ -1475,7 +1563,8 @@ where
ret.push(t);
t += self.grid_spec.bin_t_len;
}
ret
*/
err::todoval()
}
}
@@ -1487,26 +1576,23 @@ pub enum BinnedRangeEnum {
impl BinnedRangeEnum {
fn covering_range_ty<T>(a: T, b: T, min_bin_count: u32) -> Result<Self, Error>
where
T: Dim0Index,
T: Dim0Index + 'static,
{
let opts = T::prebin_bin_len_opts();
let opts = T::binned_bin_len_opts();
if min_bin_count < 1 {
Err(Error::with_msg("min_bin_count < 1"))?;
}
if min_bin_count > 20000 {
Err(Error::with_msg(format!("min_bin_count > 20000: {}", min_bin_count)))?;
}
let du = b - a;
let max_bin_len = du.div_n(min_bin_count);
for (i1, bl) in opts.enumerate().rev() {
if bl <= du {
let patch_len = bl.prebin_patch_len_for(i1);
let bin_count = patch_len.div_v(bl);
let patch_off_1 = a.div_v(&patch_len);
let patch_off_2 = b.div_v(&patch_len.add(patch_len).sub(1));
//patch_off_2.sub(patch_off_1);
let patch_count = patch_off_2 - patch_off_1;
let ret = T::to_pre_binned_patch_range_enum(bl, bin_count, patch_off_1, patch_count);
let du = b.sub(&a);
let max_bin_len = du.div_n(min_bin_count as u64);
for (i1, bl) in opts.iter().enumerate().rev() {
if bl <= &du {
let off_1 = a.div_v(&bl);
let off_2 = b.div_v(&bl.add(&bl).sub_n(1));
let bin_cnt = off_2 - off_1;
let ret = T::to_binned_range_enum(bl, off_1, bin_cnt);
return Ok(ret);
}
}
@@ -1532,10 +1618,15 @@ mod test_binned_range {
beg: HOUR * 72,
end: HOUR * 73,
};
let range = BinnedRange::covering_range(range, 10).unwrap();
assert_eq!(range.bin_count(), 12);
assert_eq!(range.edges()[0], HOUR * 72);
assert_eq!(range.edges()[2], HOUR * 72 + MIN * 5 * 2);
let range = BinnedRangeEnum::covering_range(range.into(), 10).unwrap();
match range {
BinnedRangeEnum::Time(range) => {
assert_eq!(range.bin_count(), 12);
assert_eq!(range.edges()[0], HOUR * 72);
assert_eq!(range.edges()[2], HOUR * 72 + MIN * 5 * 2);
}
BinnedRangeEnum::Pulse(_) => panic!(),
}
}
#[test]
@@ -1544,11 +1635,16 @@ mod test_binned_range {
beg: MIN * 20 + SEC * 10,
end: HOUR * 10 + MIN * 20 + SEC * 30,
};
let range = BinnedRange::covering_range(range, 10).unwrap();
assert_eq!(range.bin_count(), 11);
assert_eq!(range.edges()[0], HOUR * 0);
assert_eq!(range.edges()[1], HOUR * 1);
assert_eq!(range.edges()[11], HOUR * 11);
let range = BinnedRangeEnum::covering_range(range.into(), 10).unwrap();
match range {
BinnedRangeEnum::Time(range) => {
assert_eq!(range.bin_count(), 11);
assert_eq!(range.edges()[0], HOUR * 0);
assert_eq!(range.edges()[1], HOUR * 1);
assert_eq!(range.edges()[11], HOUR * 11);
}
BinnedRangeEnum::Pulse(_) => panic!(),
}
}
}

View File

@@ -1,13 +1,12 @@
use super::agg_kind_from_binning_scheme;
use super::binning_scheme_append_to_url;
use super::CacheUsage;
use crate::timeunits::SEC;
use crate::AggKind;
use crate::AppendToUrl;
use crate::ByteSize;
use crate::Channel;
use crate::FromUrl;
use crate::PreBinnedPatchCoord;
use crate::PreBinnedPatchCoordEnum;
use crate::ScalarType;
use crate::Shape;
use err::Error;
@@ -16,7 +15,7 @@ use url::Url;
#[derive(Clone, Debug)]
pub struct PreBinnedQuery {
patch: PreBinnedPatchCoord,
patch: PreBinnedPatchCoordEnum,
channel: Channel,
scalar_type: ScalarType,
shape: Shape,
@@ -28,7 +27,7 @@ pub struct PreBinnedQuery {
impl PreBinnedQuery {
pub fn new(
patch: PreBinnedPatchCoord,
patch: PreBinnedPatchCoordEnum,
channel: Channel,
scalar_type: ScalarType,
shape: Shape,
@@ -55,18 +54,6 @@ impl PreBinnedQuery {
pairs.insert(j.to_string(), k.to_string());
}
let pairs = pairs;
let bin_t_len: u64 = pairs
.get("binTlen")
.ok_or_else(|| Error::with_msg("missing binTlen"))?
.parse()?;
let patch_t_len: u64 = pairs
.get("patchTlen")
.ok_or_else(|| Error::with_msg("missing patchTlen"))?
.parse()?;
let patch_ix = pairs
.get("patchIx")
.ok_or_else(|| Error::with_msg("missing patchIx"))?
.parse()?;
let scalar_type = pairs
.get("scalarType")
.ok_or_else(|| Error::with_msg("missing scalarType"))
@@ -76,7 +63,7 @@ impl PreBinnedQuery {
.ok_or_else(|| Error::with_msg("missing shape"))
.map(|x| Shape::from_url_str(&x))??;
let ret = Self {
patch: PreBinnedPatchCoord::new(bin_t_len * SEC, patch_t_len * SEC, patch_ix),
patch: PreBinnedPatchCoordEnum::from_pairs(&pairs)?,
channel: Channel::from_pairs(&pairs)?,
scalar_type,
shape,
@@ -94,7 +81,7 @@ impl PreBinnedQuery {
Ok(ret)
}
pub fn patch(&self) -> &PreBinnedPatchCoord {
pub fn patch(&self) -> &PreBinnedPatchCoordEnum {
&self.patch
}