diff --git a/disk/Cargo.toml b/disk/Cargo.toml index 49e047f..28f4a25 100644 --- a/disk/Cargo.toml +++ b/disk/Cargo.toml @@ -18,6 +18,7 @@ futures-core = "0.3.14" futures-util = "0.3.14" async-stream = "0.3.0" tracing = "0.1.25" +fs2 = "0.4.3" libc = "0.2.93" hex = "0.4.3" tiny-keccak = { version = "2.0", features = ["sha3"] } diff --git a/disk/src/cache.rs b/disk/src/cache.rs index c3e3ed6..a463a98 100644 --- a/disk/src/cache.rs +++ b/disk/src/cache.rs @@ -121,13 +121,7 @@ impl PreBinnedQuery { pub fn from_request(req: &http::request::Parts) -> Result { let params = netpod::query_params(req.uri.query()); let ret = PreBinnedQuery { - patch: PreBinnedPatchCoord { - spec: PreBinnedPatchGridSpec { - patch_t_len: params.get("patch_t_len").ok_or(Error::with_msg("missing patch_t_len"))?.parse()?, - bin_t_len: params.get("bin_t_len").ok_or(Error::with_msg("missing bin_t_len"))?.parse()?, - }, - ix: params.get("patch_ix").ok_or(Error::with_msg("missing patch_ix"))?.parse()?, - }, + patch: PreBinnedPatchCoord::from_query_params(&params), agg_kind: AggKind::DimXBins1, channel: Channel { backend: params.get("channel_backend").unwrap().into(), @@ -216,19 +210,19 @@ impl PreBinnedValueStream { fn try_setup_fetch_prebinned_higher_res(&mut self) { info!("try to find a next better granularity for {:?}", self.patch_coord); - let g = self.patch_coord.spec.bin_t_len; + let g = self.patch_coord.bin_t_len(); let range = NanoRange { beg: self.patch_coord.patch_beg(), end: self.patch_coord.patch_end(), }; match PreBinnedPatchRange::covering_range(range, 2, 0) { Some(range) => { - let h = range.grid_spec.bin_t_len; + let h = range.grid_spec.bin_t_len(); assert!(g / h > 1); assert!(g / h < 20); assert!(g % h == 0); info!("FOUND NEXT GRAN g {} h {} ratio {} mod {} {:?}", g, h, g/h, g%h, range); - let bin_size = range.grid_spec.bin_t_len; + let bin_size = range.grid_spec.bin_t_len(); let channel = self.channel.clone(); let agg_kind = self.agg_kind.clone(); let node_config = 
self.node_config.clone(); @@ -463,8 +457,9 @@ pub fn node_ix_for_patch(patch_coord: &PreBinnedPatchCoord, channel: &Channel, c let mut hash = tiny_keccak::Sha3::v256(); hash.update(channel.backend.as_bytes()); hash.update(channel.name.as_bytes()); - hash.update(&patch_coord.range.beg.to_le_bytes()); - hash.update(&patch_coord.range.end.to_le_bytes()); + hash.update(&patch_coord.patch_beg().to_le_bytes()); + hash.update(&patch_coord.patch_end().to_le_bytes()); + hash.update(&patch_coord.bin_t_len().to_le_bytes()); let mut out = [0; 32]; hash.finalize(&mut out); let mut a = [out[0], out[1], out[2], out[3]]; diff --git a/netpod/Cargo.toml b/netpod/Cargo.toml index e542c27..66fbf88 100644 --- a/netpod/Cargo.toml +++ b/netpod/Cargo.toml @@ -11,4 +11,5 @@ bytes = "1.0.1" chrono = { version = "0.4.19", features = ["serde"] } futures-core = "0.3.12" tracing = "0.1.25" +url = "2.2" err = { path = "../err" } diff --git a/netpod/src/lib.rs b/netpod/src/lib.rs index e28649a..0e0eee4 100644 --- a/netpod/src/lib.rs +++ b/netpod/src/lib.rs @@ -5,6 +5,8 @@ use err::Error; use std::path::PathBuf; use chrono::{DateTime, Utc, TimeZone}; use std::sync::Arc; +use std::collections::BTreeMap; +use timeunits::*; #[derive(Clone, Debug, Serialize, Deserialize)] @@ -212,7 +214,7 @@ impl BinSpecDimT { let dt = ts2 - ts1; assert!(dt <= DAY * 14); let bs = dt / count; - let thresholds = [ + let BIN_THRESHOLDS = [ 2, 10, 100, 1000, 10_000, 100_000, MU, MU * 10, MU * 100, @@ -225,10 +227,10 @@ impl BinSpecDimT { ]; let mut i1 = 0; let bs = loop { - if i1 >= thresholds.len() { - break *thresholds.last().unwrap(); + if i1 >= BIN_THRESHOLDS.len() { + break *BIN_THRESHOLDS.last().unwrap(); } - let t = thresholds[i1]; + let t = BIN_THRESHOLDS[i1]; if bs <= t { break t; } @@ -259,13 +261,85 @@ impl BinSpecDimT { #[derive(Clone, Debug)] pub struct PreBinnedPatchGridSpec { - pub bin_t_len: u64, - pub patch_t_len: u64, + bin_t_len: u64, } impl PreBinnedPatchGridSpec { + + pub fn new(bin_t_len: u64) -> 
Self { + let mut ok = false; + for &j in PATCH_T_LEN_OPTIONS.iter() { + if bin_t_len == j { + ok = true; + break; + } + } + if !ok { + panic!("invalid bin_t_len for PreBinnedPatchGridSpec {}", bin_t_len); + } + Self { + bin_t_len, + } + } + + pub fn from_query_params(params: &BTreeMap) -> Self { + let bin_t_len = params.get("bin_t_len").unwrap().parse().unwrap(); + if !Self::is_valid_bin_t_len(bin_t_len) { + panic!("invalid bin_t_len {}", bin_t_len); + } + Self { + bin_t_len: bin_t_len, + } + } + + pub fn bin_t_len(&self) -> u64 { + self.bin_t_len + } + + pub fn is_valid_bin_t_len(bin_t_len: u64) -> bool { + for &j in PATCH_T_LEN_OPTIONS.iter() { + if bin_t_len == j { + return true; + } + } + return false; + } + + pub fn patch_t_len(&self) -> u64 { + if self.bin_t_len == PATCH_T_LEN_OPTIONS[0] { + PATCH_T_LEN_OPTIONS[1] + } + else if self.bin_t_len == PATCH_T_LEN_OPTIONS[1] { + PATCH_T_LEN_OPTIONS[3] + } + else if self.bin_t_len == PATCH_T_LEN_OPTIONS[2] { + PATCH_T_LEN_OPTIONS[4] + } + else if self.bin_t_len == PATCH_T_LEN_OPTIONS[3] { + PATCH_T_LEN_OPTIONS[5] + } + else if self.bin_t_len == PATCH_T_LEN_OPTIONS[4] { + PATCH_T_LEN_OPTIONS[5] * 64 + } + else if self.bin_t_len == PATCH_T_LEN_OPTIONS[5] { + PATCH_T_LEN_OPTIONS[5] * 1024 + } + else { + panic!() + } + } + } +const PATCH_T_LEN_OPTIONS: [u64; 6] = [ + SEC * 10, + MIN * 10, + HOUR, + HOUR * 4, + DAY, + DAY * 4, +]; + #[derive(Clone, Debug)] pub struct PreBinnedPatchRange { @@ -277,30 +351,21 @@ pub struct PreBinnedPatchRange { impl PreBinnedPatchRange { pub fn covering_range(range: NanoRange, min_bin_count: u64, finer: u64) -> Option { - use timeunits::*; assert!(min_bin_count >= 1); assert!(min_bin_count <= 2000); let dt = range.delta(); assert!(dt <= DAY * 14); let bs = dt / min_bin_count; //info!("BASIC bs {}", bs); - let thresholds = [ - SEC * 10, - MIN * 10, - HOUR, - HOUR * 4, - DAY, - DAY * 4, - ]; let mut found_count = 0; - let mut i1 = thresholds.len(); + let mut i1 = PATCH_T_LEN_OPTIONS.len(); 
loop { if i1 <= 0 { break None; } else { i1 -= 1; - let t = thresholds[i1]; + let t = PATCH_T_LEN_OPTIONS[i1]; //info!("look at threshold {} bs {}", t, bs); if t <= bs { found_count += 1; @@ -309,17 +374,16 @@ impl PreBinnedPatchRange { let ts1 = range.beg / bs * bs; let ts2 = (range.end + bs - 1) / bs * bs; let count = range.delta() / bs; - let patch_t_len = if i1 >= thresholds.len() - 1 { + let patch_t_len = if i1 >= PATCH_T_LEN_OPTIONS.len() - 1 { bs * 8 } else { - thresholds[i1 + 1] * 8 + PATCH_T_LEN_OPTIONS[i1 + 1] * 8 }; let offset = ts1 / bs; break Some(Self { grid_spec: PreBinnedPatchGridSpec { bin_t_len: bs, - patch_t_len, }, count, offset, @@ -339,8 +403,8 @@ impl PreBinnedPatchRange { #[derive(Clone, Debug)] pub struct PreBinnedPatchCoord { - pub spec: PreBinnedPatchGridSpec, - pub ix: u64, + spec: PreBinnedPatchGridSpec, + ix: u64, } impl PreBinnedPatchCoord { @@ -350,15 +414,35 @@ impl PreBinnedPatchCoord { } pub fn patch_t_len(&self) -> u64 { - self.spec.patch_t_len + self.spec.patch_t_len() } pub fn patch_beg(&self) -> u64 { - self.spec.patch_t_len * self.ix + self.spec.patch_t_len() * self.ix } pub fn patch_end(&self) -> u64 { - self.spec.patch_t_len * (self.ix + 1) + self.spec.patch_t_len() * (self.ix + 1) + } + + pub fn spec(&self) -> &PreBinnedPatchGridSpec { + &self.spec + } + + pub fn ix(&self) -> u64 { + self.ix + } + + pub fn to_url_params_strings(&self) -> String { + format!("patch_t_len={}&bin_t_len={}&patch_ix={}", self.spec.patch_t_len(), self.spec.bin_t_len(), self.ix()) + } + + pub fn from_query_params(params: &BTreeMap) -> Self { + let patch_ix = params.get("patch_ix").unwrap().parse().unwrap(); + Self { + spec: PreBinnedPatchGridSpec::from_query_params(params), + ix: patch_ix, + } } }