Find the next higher res binning
This commit is contained in:
@@ -8,7 +8,7 @@ use futures_core::Stream;
|
||||
use futures_util::{StreamExt, FutureExt, pin_mut, TryFutureExt};
|
||||
use bytes::{Bytes, BytesMut, BufMut};
|
||||
use chrono::{DateTime, Utc};
|
||||
use netpod::{Node, Cluster, AggKind, NanoRange, ToNanos, PreBinnedPatchRange, PreBinnedPatchIterator, PreBinnedPatchCoord, Channel, NodeConfig};
|
||||
use netpod::{Node, Cluster, AggKind, NanoRange, ToNanos, PreBinnedPatchRange, PreBinnedPatchIterator, PreBinnedPatchCoord, Channel, NodeConfig, PreBinnedPatchGridSpec};
|
||||
use crate::agg::MinMaxAvgScalarBinBatch;
|
||||
use http::uri::Scheme;
|
||||
use tiny_keccak::Hasher;
|
||||
@@ -54,7 +54,7 @@ pub fn binned_bytes_for_http(node_config: Arc<NodeConfig>, query: &Query) -> Res
|
||||
|
||||
// TODO
|
||||
// Translate the Query TimeRange + AggKind into an iterator over the pre-binned patches.
|
||||
let grid = PreBinnedPatchRange::covering_range(query.range.clone(), query.count);
|
||||
let grid = PreBinnedPatchRange::covering_range(query.range.clone(), query.count, 0);
|
||||
match grid {
|
||||
Some(spec) => {
|
||||
info!("GOT PreBinnedPatchGridSpec: {:?}", spec);
|
||||
@@ -170,7 +170,6 @@ impl Stream for PreBinnedValueByteStream {
|
||||
type Item = Result<Bytes, Error>;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||
error!("PreBinnedValueByteStream poll_next");
|
||||
use Poll::*;
|
||||
match self.inp.poll_next_unpin(cx) {
|
||||
Ready(Some(Ok(k))) => {
|
||||
@@ -196,6 +195,7 @@ pub struct PreBinnedValueStream {
|
||||
agg_kind: AggKind,
|
||||
node_config: Arc<NodeConfig>,
|
||||
open_check_local_file: Option<Pin<Box<dyn Future<Output=Result<tokio::fs::File, std::io::Error>> + Send>>>,
|
||||
fut2: Option<()>,
|
||||
}
|
||||
|
||||
impl PreBinnedValueStream {
|
||||
@@ -209,6 +209,24 @@ impl PreBinnedValueStream {
|
||||
agg_kind,
|
||||
node_config,
|
||||
open_check_local_file: None,
|
||||
fut2: None,
|
||||
}
|
||||
}
|
||||
|
||||
fn try_setup_fetch_prebinned_higher_res(&mut self) {
|
||||
info!("try to find a next better granularity for {:?}", self.patch_coord);
|
||||
let g = self.patch_coord.range.end - self.patch_coord.range.beg;
|
||||
match PreBinnedPatchRange::covering_range(self.patch_coord.range.clone(), 2, 0) {
|
||||
Some(range) => {
|
||||
let h = range.grid_spec.bin_t_len;
|
||||
assert!(g / h > 1);
|
||||
assert!(g / h < 20);
|
||||
assert!(g % h == 0);
|
||||
info!("FOUND NEXT GRAN g {} h {} ratio {} mod {} {:?}", g, h, g/h, g%h, range);
|
||||
}
|
||||
None => {
|
||||
info!("NO BETTER GRAN FOUND FOR g {}", g);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -221,39 +239,42 @@ impl Stream for PreBinnedValueStream {
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||
use Poll::*;
|
||||
'outer: loop {
|
||||
break match self.open_check_local_file.as_mut() {
|
||||
None => {
|
||||
use std::os::unix::fs::OpenOptionsExt;
|
||||
let mut opts = std::fs::OpenOptions::new();
|
||||
opts.read(true);
|
||||
let fut = async {
|
||||
tokio::fs::OpenOptions::from(opts)
|
||||
.open("/DOESNOTEXIST").await
|
||||
};
|
||||
self.open_check_local_file = Some(Box::pin(fut));
|
||||
continue 'outer;
|
||||
}
|
||||
Some(fut) => {
|
||||
match fut.poll_unpin(cx) {
|
||||
Ready(Ok(file)) => {
|
||||
todo!("IMPLEMENT READ FROM LOCAL CACHE");
|
||||
Pending
|
||||
}
|
||||
Ready(Err(e)) => {
|
||||
match e.kind() {
|
||||
std::io::ErrorKind::NotFound => {
|
||||
warn!("TODO LOCAL CACHE FILE NOT FOUND");
|
||||
}
|
||||
_ => {
|
||||
error!("File I/O error: {:?}", e);
|
||||
}
|
||||
}
|
||||
Ready(Some(Err(e.into())))
|
||||
}
|
||||
Pending => Pending,
|
||||
break if let Some(fut) = self.fut2.as_mut() {
|
||||
todo!()
|
||||
}
|
||||
else if let Some(fut) = self.open_check_local_file.as_mut() {
|
||||
match fut.poll_unpin(cx) {
|
||||
Ready(Ok(file)) => {
|
||||
todo!("IMPLEMENT READ FROM LOCAL CACHE");
|
||||
Pending
|
||||
}
|
||||
Ready(Err(e)) => {
|
||||
match e.kind() {
|
||||
std::io::ErrorKind::NotFound => {
|
||||
warn!("TODO LOCAL CACHE FILE NOT FOUND");
|
||||
self.try_setup_fetch_prebinned_higher_res();
|
||||
continue 'outer;
|
||||
}
|
||||
_ => {
|
||||
error!("File I/O error: {:?}", e);
|
||||
Ready(Some(Err(e.into())))
|
||||
}
|
||||
}
|
||||
}
|
||||
Pending => Pending,
|
||||
}
|
||||
}
|
||||
else {
|
||||
use std::os::unix::fs::OpenOptionsExt;
|
||||
let mut opts = std::fs::OpenOptions::new();
|
||||
opts.read(true);
|
||||
let fut = async {
|
||||
tokio::fs::OpenOptions::from(opts)
|
||||
.open("/DOESNOTEXIST").await
|
||||
};
|
||||
self.open_check_local_file = Some(Box::pin(fut));
|
||||
continue 'outer;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -276,7 +276,7 @@ pub struct PreBinnedPatchRange {
|
||||
|
||||
impl PreBinnedPatchRange {
|
||||
|
||||
pub fn covering_range(range: NanoRange, min_bin_count: u64) -> Option<Self> {
|
||||
pub fn covering_range(range: NanoRange, min_bin_count: u64, finer: u64) -> Option<Self> {
|
||||
use timeunits::*;
|
||||
assert!(min_bin_count >= 1);
|
||||
assert!(min_bin_count <= 2000);
|
||||
@@ -292,6 +292,7 @@ impl PreBinnedPatchRange {
|
||||
DAY,
|
||||
DAY * 4,
|
||||
];
|
||||
let mut found_count = 0;
|
||||
let mut i1 = thresholds.len();
|
||||
loop {
|
||||
if i1 <= 0 {
|
||||
@@ -302,25 +303,32 @@ impl PreBinnedPatchRange {
|
||||
let t = thresholds[i1];
|
||||
//info!("look at threshold {} bs {}", t, bs);
|
||||
if t <= bs {
|
||||
let bs = t;
|
||||
let ts1 = range.beg / bs * bs;
|
||||
let ts2 = (range.end + bs - 1) / bs * bs;
|
||||
let count = range.delta() / bs;
|
||||
let patch_t_len = if i1 >= thresholds.len() - 1 {
|
||||
bs * 8
|
||||
found_count += 1;
|
||||
if found_count > finer {
|
||||
let bs = t;
|
||||
let ts1 = range.beg / bs * bs;
|
||||
let ts2 = (range.end + bs - 1) / bs * bs;
|
||||
let count = range.delta() / bs;
|
||||
let patch_t_len = if i1 >= thresholds.len() - 1 {
|
||||
bs * 8
|
||||
}
|
||||
else {
|
||||
thresholds[i1 + 1] * 8
|
||||
};
|
||||
let offset = ts1 / bs;
|
||||
break Some(Self {
|
||||
grid_spec: PreBinnedPatchGridSpec {
|
||||
bin_t_len: bs,
|
||||
patch_t_len,
|
||||
},
|
||||
count,
|
||||
offset,
|
||||
});
|
||||
}
|
||||
else {
|
||||
thresholds[i1 + 1] * 8
|
||||
};
|
||||
let offset = ts1 / bs;
|
||||
break Some(Self {
|
||||
grid_spec: PreBinnedPatchGridSpec {
|
||||
bin_t_len: bs,
|
||||
patch_t_len,
|
||||
},
|
||||
count,
|
||||
offset,
|
||||
});
|
||||
}
|
||||
}
|
||||
else {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user