WIP
This commit is contained in:
@@ -18,6 +18,7 @@ futures-core = "0.3.14"
|
|||||||
futures-util = "0.3.14"
|
futures-util = "0.3.14"
|
||||||
async-stream = "0.3.0"
|
async-stream = "0.3.0"
|
||||||
tracing = "0.1.25"
|
tracing = "0.1.25"
|
||||||
|
fs2 = "0.4.3"
|
||||||
libc = "0.2.93"
|
libc = "0.2.93"
|
||||||
hex = "0.4.3"
|
hex = "0.4.3"
|
||||||
tiny-keccak = { version = "2.0", features = ["sha3"] }
|
tiny-keccak = { version = "2.0", features = ["sha3"] }
|
||||||
|
|||||||
+7
-12
@@ -121,13 +121,7 @@ impl PreBinnedQuery {
|
|||||||
pub fn from_request(req: &http::request::Parts) -> Result<Self, Error> {
|
pub fn from_request(req: &http::request::Parts) -> Result<Self, Error> {
|
||||||
let params = netpod::query_params(req.uri.query());
|
let params = netpod::query_params(req.uri.query());
|
||||||
let ret = PreBinnedQuery {
|
let ret = PreBinnedQuery {
|
||||||
patch: PreBinnedPatchCoord {
|
patch: PreBinnedPatchCoord::from_query_params(¶ms),
|
||||||
spec: PreBinnedPatchGridSpec {
|
|
||||||
patch_t_len: params.get("patch_t_len").ok_or(Error::with_msg("missing patch_t_len"))?.parse()?,
|
|
||||||
bin_t_len: params.get("bin_t_len").ok_or(Error::with_msg("missing bin_t_len"))?.parse()?,
|
|
||||||
},
|
|
||||||
ix: params.get("patch_ix").ok_or(Error::with_msg("missing patch_ix"))?.parse()?,
|
|
||||||
},
|
|
||||||
agg_kind: AggKind::DimXBins1,
|
agg_kind: AggKind::DimXBins1,
|
||||||
channel: Channel {
|
channel: Channel {
|
||||||
backend: params.get("channel_backend").unwrap().into(),
|
backend: params.get("channel_backend").unwrap().into(),
|
||||||
@@ -216,19 +210,19 @@ impl PreBinnedValueStream {
|
|||||||
|
|
||||||
fn try_setup_fetch_prebinned_higher_res(&mut self) {
|
fn try_setup_fetch_prebinned_higher_res(&mut self) {
|
||||||
info!("try to find a next better granularity for {:?}", self.patch_coord);
|
info!("try to find a next better granularity for {:?}", self.patch_coord);
|
||||||
let g = self.patch_coord.spec.bin_t_len;
|
let g = self.patch_coord.bin_t_len();
|
||||||
let range = NanoRange {
|
let range = NanoRange {
|
||||||
beg: self.patch_coord.patch_beg(),
|
beg: self.patch_coord.patch_beg(),
|
||||||
end: self.patch_coord.patch_end(),
|
end: self.patch_coord.patch_end(),
|
||||||
};
|
};
|
||||||
match PreBinnedPatchRange::covering_range(range, 2, 0) {
|
match PreBinnedPatchRange::covering_range(range, 2, 0) {
|
||||||
Some(range) => {
|
Some(range) => {
|
||||||
let h = range.grid_spec.bin_t_len;
|
let h = range.grid_spec.bin_t_len();
|
||||||
assert!(g / h > 1);
|
assert!(g / h > 1);
|
||||||
assert!(g / h < 20);
|
assert!(g / h < 20);
|
||||||
assert!(g % h == 0);
|
assert!(g % h == 0);
|
||||||
info!("FOUND NEXT GRAN g {} h {} ratio {} mod {} {:?}", g, h, g/h, g%h, range);
|
info!("FOUND NEXT GRAN g {} h {} ratio {} mod {} {:?}", g, h, g/h, g%h, range);
|
||||||
let bin_size = range.grid_spec.bin_t_len;
|
let bin_size = range.grid_spec.bin_t_len();
|
||||||
let channel = self.channel.clone();
|
let channel = self.channel.clone();
|
||||||
let agg_kind = self.agg_kind.clone();
|
let agg_kind = self.agg_kind.clone();
|
||||||
let node_config = self.node_config.clone();
|
let node_config = self.node_config.clone();
|
||||||
@@ -463,8 +457,9 @@ pub fn node_ix_for_patch(patch_coord: &PreBinnedPatchCoord, channel: &Channel, c
|
|||||||
let mut hash = tiny_keccak::Sha3::v256();
|
let mut hash = tiny_keccak::Sha3::v256();
|
||||||
hash.update(channel.backend.as_bytes());
|
hash.update(channel.backend.as_bytes());
|
||||||
hash.update(channel.name.as_bytes());
|
hash.update(channel.name.as_bytes());
|
||||||
hash.update(&patch_coord.range.beg.to_le_bytes());
|
hash.update(&patch_coord.patch_beg().to_le_bytes());
|
||||||
hash.update(&patch_coord.range.end.to_le_bytes());
|
hash.update(&patch_coord.patch_end().to_le_bytes());
|
||||||
|
hash.update(&patch_coord.bin_t_len().to_le_bytes());
|
||||||
let mut out = [0; 32];
|
let mut out = [0; 32];
|
||||||
hash.finalize(&mut out);
|
hash.finalize(&mut out);
|
||||||
let mut a = [out[0], out[1], out[2], out[3]];
|
let mut a = [out[0], out[1], out[2], out[3]];
|
||||||
|
|||||||
@@ -11,4 +11,5 @@ bytes = "1.0.1"
|
|||||||
chrono = { version = "0.4.19", features = ["serde"] }
|
chrono = { version = "0.4.19", features = ["serde"] }
|
||||||
futures-core = "0.3.12"
|
futures-core = "0.3.12"
|
||||||
tracing = "0.1.25"
|
tracing = "0.1.25"
|
||||||
|
url = "2.2"
|
||||||
err = { path = "../err" }
|
err = { path = "../err" }
|
||||||
|
|||||||
+109
-25
@@ -5,6 +5,8 @@ use err::Error;
|
|||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
use chrono::{DateTime, Utc, TimeZone};
|
use chrono::{DateTime, Utc, TimeZone};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
use std::collections::BTreeMap;
|
||||||
|
use timeunits::*;
|
||||||
|
|
||||||
|
|
||||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||||
@@ -212,7 +214,7 @@ impl BinSpecDimT {
|
|||||||
let dt = ts2 - ts1;
|
let dt = ts2 - ts1;
|
||||||
assert!(dt <= DAY * 14);
|
assert!(dt <= DAY * 14);
|
||||||
let bs = dt / count;
|
let bs = dt / count;
|
||||||
let thresholds = [
|
let BIN_THRESHOLDS = [
|
||||||
2, 10, 100,
|
2, 10, 100,
|
||||||
1000, 10_000, 100_000,
|
1000, 10_000, 100_000,
|
||||||
MU, MU * 10, MU * 100,
|
MU, MU * 10, MU * 100,
|
||||||
@@ -225,10 +227,10 @@ impl BinSpecDimT {
|
|||||||
];
|
];
|
||||||
let mut i1 = 0;
|
let mut i1 = 0;
|
||||||
let bs = loop {
|
let bs = loop {
|
||||||
if i1 >= thresholds.len() {
|
if i1 >= BIN_THRESHOLDS.len() {
|
||||||
break *thresholds.last().unwrap();
|
break *BIN_THRESHOLDS.last().unwrap();
|
||||||
}
|
}
|
||||||
let t = thresholds[i1];
|
let t = BIN_THRESHOLDS[i1];
|
||||||
if bs <= t {
|
if bs <= t {
|
||||||
break t;
|
break t;
|
||||||
}
|
}
|
||||||
@@ -259,13 +261,85 @@ impl BinSpecDimT {
|
|||||||
|
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
pub struct PreBinnedPatchGridSpec {
|
pub struct PreBinnedPatchGridSpec {
|
||||||
pub bin_t_len: u64,
|
bin_t_len: u64,
|
||||||
pub patch_t_len: u64,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl PreBinnedPatchGridSpec {
|
impl PreBinnedPatchGridSpec {
|
||||||
|
|
||||||
|
pub fn new(bin_t_len: u64) -> Self {
|
||||||
|
let mut ok = false;
|
||||||
|
for &j in PATCH_T_LEN_OPTIONS.iter() {
|
||||||
|
if bin_t_len == j {
|
||||||
|
ok = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !ok {
|
||||||
|
panic!("invalid bin_t_len for PreBinnedPatchGridSpec {}", bin_t_len);
|
||||||
|
}
|
||||||
|
Self {
|
||||||
|
bin_t_len,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn from_query_params(params: &BTreeMap<String, String>) -> Self {
|
||||||
|
let bin_t_len = params.get("bin_t_len").unwrap().parse().unwrap();
|
||||||
|
if !Self::is_valid_bin_t_len(bin_t_len) {
|
||||||
|
panic!("invalid bin_t_len {}", bin_t_len);
|
||||||
|
}
|
||||||
|
Self {
|
||||||
|
bin_t_len: bin_t_len,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn bin_t_len(&self) -> u64 {
|
||||||
|
self.bin_t_len
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn is_valid_bin_t_len(bin_t_len: u64) -> bool {
|
||||||
|
for &j in PATCH_T_LEN_OPTIONS.iter() {
|
||||||
|
if bin_t_len == j {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn patch_t_len(&self) -> u64 {
|
||||||
|
if self.bin_t_len == PATCH_T_LEN_OPTIONS[0] {
|
||||||
|
PATCH_T_LEN_OPTIONS[1]
|
||||||
|
}
|
||||||
|
else if self.bin_t_len == PATCH_T_LEN_OPTIONS[1] {
|
||||||
|
PATCH_T_LEN_OPTIONS[3]
|
||||||
|
}
|
||||||
|
else if self.bin_t_len == PATCH_T_LEN_OPTIONS[2] {
|
||||||
|
PATCH_T_LEN_OPTIONS[4]
|
||||||
|
}
|
||||||
|
else if self.bin_t_len == PATCH_T_LEN_OPTIONS[3] {
|
||||||
|
PATCH_T_LEN_OPTIONS[5]
|
||||||
|
}
|
||||||
|
else if self.bin_t_len == PATCH_T_LEN_OPTIONS[4] {
|
||||||
|
PATCH_T_LEN_OPTIONS[5] * 64
|
||||||
|
}
|
||||||
|
else if self.bin_t_len == PATCH_T_LEN_OPTIONS[5] {
|
||||||
|
PATCH_T_LEN_OPTIONS[5] * 1024
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
panic!()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const PATCH_T_LEN_OPTIONS: [u64; 6] = [
|
||||||
|
SEC * 10,
|
||||||
|
MIN * 10,
|
||||||
|
HOUR,
|
||||||
|
HOUR * 4,
|
||||||
|
DAY,
|
||||||
|
DAY * 4,
|
||||||
|
];
|
||||||
|
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
pub struct PreBinnedPatchRange {
|
pub struct PreBinnedPatchRange {
|
||||||
@@ -277,30 +351,21 @@ pub struct PreBinnedPatchRange {
|
|||||||
impl PreBinnedPatchRange {
|
impl PreBinnedPatchRange {
|
||||||
|
|
||||||
pub fn covering_range(range: NanoRange, min_bin_count: u64, finer: u64) -> Option<Self> {
|
pub fn covering_range(range: NanoRange, min_bin_count: u64, finer: u64) -> Option<Self> {
|
||||||
use timeunits::*;
|
|
||||||
assert!(min_bin_count >= 1);
|
assert!(min_bin_count >= 1);
|
||||||
assert!(min_bin_count <= 2000);
|
assert!(min_bin_count <= 2000);
|
||||||
let dt = range.delta();
|
let dt = range.delta();
|
||||||
assert!(dt <= DAY * 14);
|
assert!(dt <= DAY * 14);
|
||||||
let bs = dt / min_bin_count;
|
let bs = dt / min_bin_count;
|
||||||
//info!("BASIC bs {}", bs);
|
//info!("BASIC bs {}", bs);
|
||||||
let thresholds = [
|
|
||||||
SEC * 10,
|
|
||||||
MIN * 10,
|
|
||||||
HOUR,
|
|
||||||
HOUR * 4,
|
|
||||||
DAY,
|
|
||||||
DAY * 4,
|
|
||||||
];
|
|
||||||
let mut found_count = 0;
|
let mut found_count = 0;
|
||||||
let mut i1 = thresholds.len();
|
let mut i1 = PATCH_T_LEN_OPTIONS.len();
|
||||||
loop {
|
loop {
|
||||||
if i1 <= 0 {
|
if i1 <= 0 {
|
||||||
break None;
|
break None;
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
i1 -= 1;
|
i1 -= 1;
|
||||||
let t = thresholds[i1];
|
let t = PATCH_T_LEN_OPTIONS[i1];
|
||||||
//info!("look at threshold {} bs {}", t, bs);
|
//info!("look at threshold {} bs {}", t, bs);
|
||||||
if t <= bs {
|
if t <= bs {
|
||||||
found_count += 1;
|
found_count += 1;
|
||||||
@@ -309,17 +374,16 @@ impl PreBinnedPatchRange {
|
|||||||
let ts1 = range.beg / bs * bs;
|
let ts1 = range.beg / bs * bs;
|
||||||
let ts2 = (range.end + bs - 1) / bs * bs;
|
let ts2 = (range.end + bs - 1) / bs * bs;
|
||||||
let count = range.delta() / bs;
|
let count = range.delta() / bs;
|
||||||
let patch_t_len = if i1 >= thresholds.len() - 1 {
|
let patch_t_len = if i1 >= PATCH_T_LEN_OPTIONS.len() - 1 {
|
||||||
bs * 8
|
bs * 8
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
thresholds[i1 + 1] * 8
|
PATCH_T_LEN_OPTIONS[i1 + 1] * 8
|
||||||
};
|
};
|
||||||
let offset = ts1 / bs;
|
let offset = ts1 / bs;
|
||||||
break Some(Self {
|
break Some(Self {
|
||||||
grid_spec: PreBinnedPatchGridSpec {
|
grid_spec: PreBinnedPatchGridSpec {
|
||||||
bin_t_len: bs,
|
bin_t_len: bs,
|
||||||
patch_t_len,
|
|
||||||
},
|
},
|
||||||
count,
|
count,
|
||||||
offset,
|
offset,
|
||||||
@@ -339,8 +403,8 @@ impl PreBinnedPatchRange {
|
|||||||
|
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
pub struct PreBinnedPatchCoord {
|
pub struct PreBinnedPatchCoord {
|
||||||
pub spec: PreBinnedPatchGridSpec,
|
spec: PreBinnedPatchGridSpec,
|
||||||
pub ix: u64,
|
ix: u64,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl PreBinnedPatchCoord {
|
impl PreBinnedPatchCoord {
|
||||||
@@ -350,15 +414,35 @@ impl PreBinnedPatchCoord {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn patch_t_len(&self) -> u64 {
|
pub fn patch_t_len(&self) -> u64 {
|
||||||
self.spec.patch_t_len
|
self.spec.patch_t_len()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn patch_beg(&self) -> u64 {
|
pub fn patch_beg(&self) -> u64 {
|
||||||
self.spec.patch_t_len * self.ix
|
self.spec.patch_t_len() * self.ix
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn patch_end(&self) -> u64 {
|
pub fn patch_end(&self) -> u64 {
|
||||||
self.spec.patch_t_len * (self.ix + 1)
|
self.spec.patch_t_len() * (self.ix + 1)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn spec(&self) -> &PreBinnedPatchGridSpec {
|
||||||
|
&self.spec
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn ix(&self) -> u64 {
|
||||||
|
self.ix
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn to_url_params_strings(&self) -> String {
|
||||||
|
format!("patch_t_len={}&bin_t_len={}&patch_ix={}", self.spec.patch_t_len(), self.spec.bin_t_len(), self.ix())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn from_query_params(params: &BTreeMap<String, String>) -> Self {
|
||||||
|
let patch_ix = params.get("patch_ix").unwrap().parse().unwrap();
|
||||||
|
Self {
|
||||||
|
spec: PreBinnedPatchGridSpec::from_query_params(params),
|
||||||
|
ix: patch_ix,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user