From 12fa9c20477dc6e3b849e68b24a03ef0a980e2c5 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Wed, 14 Apr 2021 18:46:25 +0200 Subject: [PATCH] WIP --- disk/Cargo.toml | 1 + disk/src/cache.rs | 53 +++++++++++++++++++++++++++++++++--------- netpod/src/lib.rs | 59 +++++++++++++++++++++++++++++------------------ 3 files changed, 79 insertions(+), 34 deletions(-) diff --git a/disk/Cargo.toml b/disk/Cargo.toml index ad6c97b..49e047f 100644 --- a/disk/Cargo.toml +++ b/disk/Cargo.toml @@ -18,6 +18,7 @@ futures-core = "0.3.14" futures-util = "0.3.14" async-stream = "0.3.0" tracing = "0.1.25" +libc = "0.2.93" hex = "0.4.3" tiny-keccak = { version = "2.0", features = ["sha3"] } err = { path = "../err" } diff --git a/disk/src/cache.rs b/disk/src/cache.rs index fcb8752..bc75749 100644 --- a/disk/src/cache.rs +++ b/disk/src/cache.rs @@ -1,19 +1,20 @@ #[allow(unused_imports)] use tracing::{error, warn, info, debug, trace}; use err::Error; -use std::future::ready; +use std::future::{ready, Future}; use std::pin::Pin; use std::task::{Context, Poll}; use futures_core::Stream; -use futures_util::{StreamExt, FutureExt, pin_mut}; +use futures_util::{StreamExt, FutureExt, pin_mut, TryFutureExt}; use bytes::{Bytes, BytesMut, BufMut}; use chrono::{DateTime, Utc}; -use netpod::{Node, Cluster, AggKind, NanoRange, ToNanos, PreBinnedPatchGridSpec, PreBinnedPatchIterator, PreBinnedPatchCoord, Channel, NodeConfig}; +use netpod::{Node, Cluster, AggKind, NanoRange, ToNanos, PreBinnedPatchRange, PreBinnedPatchIterator, PreBinnedPatchCoord, Channel, NodeConfig}; use crate::agg::MinMaxAvgScalarBinBatch; use http::uri::Scheme; use tiny_keccak::Hasher; use serde::{Serialize, Deserialize}; use std::sync::Arc; +use tokio::fs::OpenOptions; #[derive(Clone, Debug, Serialize, Deserialize)] pub struct Query { @@ -53,7 +54,7 @@ pub fn binned_bytes_for_http(node_config: Arc, query: &Query) -> Res // TODO // Translate the Query TimeRange + AggKind into an iterator over the pre-binned patches. - let grid = PreBinnedPatchGridSpec::over_range(query.range.clone(), query.count); + let grid = PreBinnedPatchRange::covering_range(query.range.clone(), query.count); match grid { Some(spec) => { info!("GOT PreBinnedPatchGridSpec: {:?}", spec); @@ -194,6 +195,7 @@ pub struct PreBinnedValueStream { channel: Channel, agg_kind: AggKind, node_config: Arc, + open_check_local_file: Option> + Send>>>, } impl PreBinnedValueStream { @@ -206,6 +208,7 @@ impl PreBinnedValueStream { channel, agg_kind, node_config, + open_check_local_file: None, } } @@ -217,13 +220,41 @@ impl Stream for PreBinnedValueStream { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; - // TODO provide the data from a local cached file, the on-the-fly binning of higher-res - // pre-binned patches which we can get again via http, or if there is no higher resolution - // then from raw events, or a combination of all those especially if there is not yet - // a pre-binned patch, and we have to stitch from higher-res-pre-bin plus extend with - // on-the-fly binning of fresh data. - error!("TODO provide the pre-binned data"); - Ready(None) + 'outer: loop { + break match self.open_check_local_file.as_mut() { + None => { + use std::os::unix::fs::OpenOptionsExt; + let mut opts = std::fs::OpenOptions::new(); + opts.read(true); + let fut = async { + tokio::fs::OpenOptions::from(opts) + .open("/DOESNOTEXIST").await + }; + self.open_check_local_file = Some(Box::pin(fut)); + continue 'outer; + } + Some(fut) => { + match fut.poll_unpin(cx) { + Ready(Ok(file)) => { + todo!("IMPLEMENT READ FROM LOCAL CACHE"); + Pending + } + Ready(Err(e)) => { + match e.kind() { + std::io::ErrorKind::NotFound => { + warn!("TODO LOCAL CACHE FILE NOT FOUND"); + } + _ => { + error!("File I/O error: {:?}", e); + } + } + Ready(Some(Err(e.into()))) + } + Pending => Pending, + } + } + } + } } } diff --git a/netpod/src/lib.rs b/netpod/src/lib.rs index 8602914..a74b399 100644 --- a/netpod/src/lib.rs +++ b/netpod/src/lib.rs @@ -259,21 +259,31 @@ impl BinSpecDimT { #[derive(Clone, Debug)] pub struct PreBinnedPatchGridSpec { - pub range: NanoRange, - pub bs: u64, - pub count: u64, + pub bin_t_len: u64, + pub patch_t_len: u64, } impl PreBinnedPatchGridSpec { +} - pub fn over_range(range: NanoRange, count: u64) -> Option { + +#[derive(Clone, Debug)] +pub struct PreBinnedPatchRange { + pub grid_spec: PreBinnedPatchGridSpec, + pub offset: u64, + pub count: u64, +} + +impl PreBinnedPatchRange { + + pub fn covering_range(range: NanoRange, min_bin_count: u64) -> Option { use timeunits::*; - assert!(count >= 1); - assert!(count <= 2000); + assert!(min_bin_count >= 1); + assert!(min_bin_count <= 2000); let dt = range.delta(); assert!(dt <= DAY * 14); - let bs = dt / count; - info!("BASIC bs {}", bs); + let bs = dt / min_bin_count; + //info!("BASIC bs {}", bs); let thresholds = [ SEC * 10, MIN * 10, @@ -295,18 +305,21 @@ impl PreBinnedPatchGridSpec { let bs = t; let ts1 = range.beg / bs * bs; let ts2 = (range.end + bs - 1) / bs * bs; - if (ts2 - ts1) % bs != 0 { - panic!(); - } - let range = NanoRange { - beg: ts1, - end: ts2, - }; let count = range.delta() / bs; + let patch_t_len = if i1 >= thresholds.len() - 1 { + bs * 8 + } + else { + thresholds[i1 + 1] * 8 + }; + let offset = ts1 / bs; break Some(Self { - range, - bs, + grid_spec: PreBinnedPatchGridSpec { + bin_t_len: bs, + patch_t_len, + }, count, + offset, }); } } @@ -330,16 +343,16 @@ impl PreBinnedPatchCoord { } pub struct PreBinnedPatchIterator { - spec: PreBinnedPatchGridSpec, + range: PreBinnedPatchRange, agg_kind: AggKind, ix: u64, } impl PreBinnedPatchIterator { - pub fn from_range(spec: PreBinnedPatchGridSpec) -> Self { + pub fn from_range(range: PreBinnedPatchRange) -> Self { Self { - spec, + range, agg_kind: AggKind::DimXBins1, ix: 0, } @@ -351,14 +364,14 @@ impl Iterator for PreBinnedPatchIterator { type Item = PreBinnedPatchCoord; fn next(&mut self) -> Option { - if self.ix >= self.spec.count { + if self.ix >= self.range.count { None } else { let ret = Self::Item { range: NanoRange { - beg: self.spec.range.beg + self.ix * self.spec.bs, - end: self.spec.range.beg + (self.ix + 1) * self.spec.bs, + beg: (self.range.offset + self.ix) * self.range.grid_spec.patch_t_len, + end: (self.range.offset + self.ix + 1) * self.range.grid_spec.patch_t_len, }, }; self.ix += 1;