Dominik Werder
2021-05-17 15:04:44 +02:00
parent ac5b10216b
commit 1123eed116
8 changed files with 148 additions and 76 deletions


@@ -60,6 +60,7 @@ async fn agg_x_dim_0_inner() {
range.clone(),
query.channel_config.clone(),
node.clone(),
0,
query.buffer_size as usize,
event_chunker_conf,
)
@@ -71,7 +72,7 @@ async fn agg_x_dim_0_inner() {
}
k
})
.into_binned_t(BinnedRange::covering_range(range, bin_count).unwrap())
.into_binned_t(BinnedRange::covering_range(range, bin_count).unwrap().unwrap())
.map(|k| {
if false {
trace!("after T binning {:?}", k.as_ref().unwrap());
@@ -123,6 +124,7 @@ async fn agg_x_dim_1_inner() {
range.clone(),
query.channel_config.clone(),
node.clone(),
0,
query.buffer_size as usize,
event_chunker_conf,
)
@@ -141,7 +143,7 @@ async fn agg_x_dim_1_inner() {
//info!("after X binning {:?}", k.as_ref().unwrap());
k
})
.into_binned_t(BinnedRange::covering_range(range, bin_count).unwrap())
.into_binned_t(BinnedRange::covering_range(range, bin_count).unwrap().unwrap())
.map(|k| {
info!("after T binning {:?}", k.as_ref().unwrap());
k
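
Note on the double unwrap above: BinnedRange::covering_range now returns
Result<Option<BinnedRange>, Error> instead of Option<BinnedRange>, so the
tests unwrap twice, once for the Result and once for the Option. A minimal
sketch of how a non-test caller might consume the new signature (the match
arms are illustrative, not code from this commit):

    match BinnedRange::covering_range(range.clone(), bin_count) {
        // A grid covers the range with at least bin_count bins.
        Ok(Some(binned)) => { /* feed `binned` into into_binned_t */ }
        // No bin length in the options table is small enough.
        Ok(None) => { /* fall back or report to the caller */ }
        // Invalid input, e.g. min_bin_count out of bounds or the range too long.
        Err(e) => { /* propagate */ }
    }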


@@ -27,14 +27,15 @@ pub async fn binned_stream(node_config: &NodeConfigCached, query: &BinnedQuery)
}
let range = query.range();
let channel_config = read_local_config(&query.channel(), &node_config.node).await?;
let entry = extract_matching_config_entry(range, &channel_config);
let entry = extract_matching_config_entry(range, &channel_config)?;
info!("binned_bytes_for_http found config entry {:?}", entry);
let range = BinnedRange::covering_range(range.clone(), query.bin_count()).ok_or(Error::with_msg(format!(
let range = BinnedRange::covering_range(range.clone(), query.bin_count())?.ok_or(Error::with_msg(format!(
"binned_bytes_for_http BinnedRange::covering_range returned None"
)))?;
let perf_opts = PerfOpts { inmem_bufcap: 512 };
let _shape = entry.to_shape()?;
match PreBinnedPatchRange::covering_range(query.range().clone(), query.bin_count()) {
Some(pre_range) => {
Ok(Some(pre_range)) => {
info!("binned_bytes_for_http found pre_range: {:?}", pre_range);
if range.grid_spec.bin_t_len() < pre_range.grid_spec.bin_t_len() {
let msg = format!(
@@ -55,7 +56,7 @@ pub async fn binned_stream(node_config: &NodeConfigCached, query: &BinnedQuery)
let ret = BinnedStream::new(Box::pin(s1))?;
Ok(ret)
}
None => {
Ok(None) => {
info!(
"binned_bytes_for_http no covering range for prebinned, merge from remotes instead {:?}",
range
@@ -71,6 +72,7 @@ pub async fn binned_stream(node_config: &NodeConfigCached, query: &BinnedQuery)
let ret = BinnedStream::new(Box::pin(s1))?;
Ok(ret)
}
Err(e) => Err(e),
}
}
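
The same Result-of-Option pattern drives the dispatch above; spelled out
(a paraphrase of the hunk, not new code):

    // Ok(Some(pre_range)) -> serve from the prebinned patch cache, after
    //                        checking the prebinned bins are not coarser
    //                        than the requested bins
    // Ok(None)            -> no prebinned grid covers the range; merge raw
    //                        events from the remote nodes instead
    // Err(e)              -> invalid query; propagate to the HTTP layer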


@@ -10,6 +10,7 @@ use err::Error;
use futures_core::Stream;
use futures_util::{pin_mut, StreamExt};
use hyper::Response;
use netpod::timeunits::SEC;
use netpod::{
AggKind, ByteSize, Channel, Cluster, NanoRange, NodeConfigCached, PerfOpts, PreBinnedPatchCoord, ToNanos,
};
@@ -145,6 +146,10 @@ impl PreBinnedQuery {
.get("bin_t_len")
.ok_or(Error::with_msg("missing bin_t_len"))?
.parse()?;
let patch_t_len = params
.get("patch_t_len")
.ok_or(Error::with_msg("missing patch_t_len"))?
.parse()?;
let disk_stats_every = params
.get("disk_stats_every_kb")
.ok_or(Error::with_msg("missing disk_stats_every_kb"))?;
@@ -152,7 +157,7 @@ impl PreBinnedQuery {
.parse()
.map_err(|e| Error::with_msg(format!("can not parse disk_stats_every_kb {:?}", e)))?;
let ret = PreBinnedQuery {
patch: PreBinnedPatchCoord::new(bin_t_len, patch_ix),
patch: PreBinnedPatchCoord::new(bin_t_len, patch_t_len, patch_ix),
agg_kind: AggKind::DimXBins1,
channel: channel_from_params(&params)?,
cache_usage: cache_usage_from_params(&params)?,
@@ -415,6 +420,7 @@ pub fn node_ix_for_patch(patch_coord: &PreBinnedPatchCoord, channel: &Channel, c
hash.update(&patch_coord.patch_beg().to_le_bytes());
hash.update(&patch_coord.patch_end().to_le_bytes());
hash.update(&patch_coord.bin_t_len().to_le_bytes());
hash.update(&patch_coord.patch_t_len().to_le_bytes());
let mut out = [0; 32];
hash.finalize(&mut out);
let a = [out[0], out[1], out[2], out[3]];
@@ -466,9 +472,12 @@ impl CacheFileDesc {
.join(&hc[3..6])
.join(&self.channel.name)
.join(format!("{:?}", self.agg_kind))
.join(format!("{:019}", self.patch.spec().bin_t_len()))
.join(&hash[0..2])
.join(format!("{:019}", self.patch.ix()))
.join(format!(
"{:010}-{:010}",
self.patch.spec().bin_t_len() / SEC,
self.patch.spec().patch_t_len() / SEC
))
.join(format!("{}-{:012}", &hash[0..6], self.patch.ix()))
}
}
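
Taken together, the new path segments encode both lengths in seconds plus a
short hash prefix. A hypothetical expansion for bin_t_len = HOUR * 2 and
patch_t_len = DAY * 4 (hash and patch index invented for illustration; the
values below are already divided by SEC as in the code above):

    // directory "0000007200-0000345600": bin length, then patch length
    let dir = format!("{:010}-{:010}", 7200u64, 345600u64);
    // file "a1b2c3-000000000007": six hex chars of the hash, then the patch index
    let file = format!("{}-{:012}", "a1b2c3", 7u64);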

disk/src/cache/pbv.rs

@@ -104,7 +104,10 @@ impl PreBinnedValueStream {
}
// TODO do I need to set up more transformations or binning to deliver the requested data?
let count = self.query.patch.patch_t_len() / self.query.patch.bin_t_len();
let range = BinnedRange::covering_range(evq.range.clone(), count).unwrap();
let range = BinnedRange::covering_range(evq.range.clone(), count)
.unwrap()
.ok_or(Error::with_msg("covering_range returns None"))
.unwrap();
let perf_opts = PerfOpts { inmem_bufcap: 512 };
let s1 = MergedFromRemotes::new(evq, perf_opts, self.node_config.node_config.cluster.clone());
let s1 = s1.into_binned_t(range);
@@ -172,16 +175,18 @@ impl PreBinnedValueStream {
self.fut2 = Some(Box::pin(s));
}
fn try_setup_fetch_prebinned_higher_res(&mut self) {
fn try_setup_fetch_prebinned_higher_res(&mut self) -> Result<(), Error> {
let range = self.query.patch.patch_range();
match PreBinnedPatchRange::covering_range(range, self.query.patch.bin_count() + 1) {
Some(range) => {
Ok(Some(range)) => {
self.setup_from_higher_res_prebinned(range);
}
None => {
Ok(None) => {
self.setup_merged_from_remotes();
}
Err(e) => return Err(e),
}
Ok(())
}
}
@@ -322,18 +327,27 @@ impl Stream for PreBinnedValueStream {
continue 'outer;
}
Err(e) => match e.kind() {
std::io::ErrorKind::NotFound => {
self.try_setup_fetch_prebinned_higher_res();
if self.fut2.is_none() {
let e = Err(Error::with_msg(format!(
"try_setup_fetch_prebinned_higher_res failed"
)));
self.errored = true;
Ready(Some(e))
} else {
continue 'outer;
std::io::ErrorKind::NotFound => match self.try_setup_fetch_prebinned_higher_res() {
Ok(_) => {
if self.fut2.is_none() {
let e = Err(Error::with_msg(format!(
"try_setup_fetch_prebinned_higher_res failed"
)));
self.errored = true;
Ready(Some(e))
} else {
continue 'outer;
}
}
}
Err(e) => {
let e = Error::with_msg(format!(
"try_setup_fetch_prebinned_higher_res error: {:?}",
e
));
self.errored = true;
Ready(Some(Err(e)))
}
},
_ => {
error!("File I/O error: {:?}", e);
self.errored = true;
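
The NotFound arm is the substantive change in this hunk: setup failures
from try_setup_fetch_prebinned_higher_res now surface as stream errors
instead of being swallowed. A hedged summary of the resulting lookup
cascade (step names as in this file, ordering paraphrased):

    // 1. Poll the cached patch file; on success, stream its contents.
    // 2. On io NotFound, call try_setup_fetch_prebinned_higher_res():
    //      Ok(()) with fut2 set   -> continue with the finer prebinned grid
    //      Ok(()) with fut2 unset -> yield the "failed" error item
    //      Err(e)                 -> yield the error item, set self.errored
    // 3. Inside the setup, PreBinnedPatchRange::covering_range decides:
    //      Ok(Some(finer)) -> setup_from_higher_res_prebinned(finer)
    //      Ok(None)        -> setup_merged_from_remotes()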


@@ -1,6 +1,6 @@
use err::Error;
use netpod::timeunits::MS;
use netpod::{Channel, NanoRange, Nanos, Node, ScalarType};
use netpod::{Channel, NanoRange, Nanos, Node, ScalarType, Shape};
use nom::number::complete::{be_i16, be_i32, be_i64, be_i8, be_u8};
use nom::Needed;
#[allow(unused_imports)]
@@ -68,6 +68,22 @@ pub struct ConfigEntry {
value_converter: Option<String>,
}
impl ConfigEntry {
pub fn to_shape(&self) -> Result<Shape, Error> {
let ret = match &self.shape {
Some(lens) => {
if lens.len() == 1 {
Shape::Wave(lens[0])
} else {
return Err(Error::with_msg(format!("Channel config unsupported shape {:?}", self)))?;
}
}
None => Shape::Scalar,
};
Ok(ret)
}
}
#[derive(Debug, Serialize, Deserialize)]
pub struct Config {
pub format_version: i16,
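
A usage sketch of the new helper (shape values invented; ConfigEntry has
more fields than the hunk shows). A later hunk in this commit replaces a
hand-rolled copy of this match in the raw connection handler with
entry.to_shape():

    // entry.shape == None             -> Ok(Shape::Scalar)
    // entry.shape == Some(vec![1024]) -> Ok(Shape::Wave(1024))
    // entry.shape == Some(vec![8, 8]) -> Err: unsupported shape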


@@ -6,6 +6,7 @@ use err::Error;
use futures_core::Stream;
use futures_util::StreamExt;
use netpod::log::*;
use netpod::timeunits::SEC;
use netpod::{ChannelConfig, NanoRange, Node};
use std::pin::Pin;
use std::sync::atomic::AtomicU64;
@@ -24,6 +25,7 @@ pub struct EventBlobsComplete {
completed: bool,
max_ts: Arc<AtomicU64>,
files_count: u32,
node_ix: usize,
}
impl EventBlobsComplete {
@@ -31,6 +33,7 @@ impl EventBlobsComplete {
range: NanoRange,
channel_config: ChannelConfig,
node: Node,
node_ix: usize,
buffer_size: usize,
event_chunker_conf: EventChunkerConf,
) -> Self {
@@ -46,6 +49,7 @@ impl EventBlobsComplete {
completed: false,
max_ts: Arc::new(AtomicU64::new(0)),
files_count: 0,
node_ix,
}
}
}
@@ -82,7 +86,6 @@ impl Stream for EventBlobsComplete {
let item = LogItem::quick(Level::INFO, format!("handle file {:?}", path));
match file.file {
Some(file) => {
info!("got file {:?}", path);
let inp = Box::pin(file_content_stream(file, self.buffer_size as usize));
let chunker = EventChunker::from_event_boundary(
inp,
@@ -94,9 +97,7 @@ impl Stream for EventBlobsComplete {
);
self.evs = Some(chunker);
}
None => {
info!("skip {:?}", path);
}
None => {}
}
Ready(Some(Ok(EventChunkerItem::Log(item))))
}
@@ -109,7 +110,13 @@ impl Stream for EventBlobsComplete {
self.data_completed = true;
let item = LogItem::quick(
Level::INFO,
format!("EventBlobsComplete used {} datafiles", self.files_count),
format!(
"EventBlobsComplete used {} datafiles beg {} end {} node_ix {}",
self.files_count,
self.range.beg / SEC,
self.range.end / SEC,
self.node_ix
),
);
Ready(Some(Ok(EventChunkerItem::Log(item))))
}


@@ -10,8 +10,7 @@ use crate::raw::{EventQueryJsonStringFrame, EventsQuery};
use err::Error;
use futures_util::StreamExt;
use netpod::log::*;
use netpod::timeunits::SEC;
use netpod::{ByteSize, NodeConfigCached, PerfOpts, Shape};
use netpod::{ByteSize, NodeConfigCached, PerfOpts};
use std::net::SocketAddr;
use tokio::io::AsyncWriteExt;
use tokio::net::tcp::OwnedWriteHalf;
@@ -133,20 +132,9 @@ async fn raw_conn_handler_inner_try(
Ok(k) => k,
Err(e) => return Err((e, netout))?,
};
//info!("found config entry {:?}", entry);
info!("raw_conn_handler_inner_try beg {}", range.beg / SEC);
let shape = match &entry.shape {
Some(lens) => {
if lens.len() == 1 {
Shape::Wave(lens[0])
} else {
return Err((
Error::with_msg(format!("Channel config unsupported shape {:?}", entry)),
netout,
))?;
}
}
None => Shape::Scalar,
let shape = match entry.to_shape() {
Ok(k) => k,
Err(e) => return Err((e, netout))?,
};
let channel_config = netpod::ChannelConfig {
channel: evq.channel.clone(),
@@ -165,6 +153,7 @@ async fn raw_conn_handler_inner_try(
range.clone(),
channel_config.clone(),
node_config.node.clone(),
node_config.ix,
buffer_size,
event_chunker_conf,
)


@@ -250,7 +250,9 @@ pub mod timeunits {
const BIN_T_LEN_OPTIONS: [u64; 4] = [SEC, MIN * 10, HOUR * 2, DAY];
const PATCH_T_LEN_OPTIONS: [u64; 4] = [MIN * 10, HOUR * 2, DAY * 4, DAY * 32];
const PATCH_T_LEN_OPTIONS_SCALAR: [u64; 4] = [MIN * 60, HOUR * 2, DAY * 4, DAY * 32];
const PATCH_T_LEN_OPTIONS_WAVE: [u64; 4] = [MIN * 20, HOUR * 2, DAY * 4, DAY * 32];
const BIN_THRESHOLDS: [u64; 31] = [
2,
@@ -289,14 +291,15 @@ const BIN_THRESHOLDS: [u64; 31] = [
#[derive(Clone, Serialize, Deserialize)]
pub struct PreBinnedPatchGridSpec {
bin_t_len: u64,
patch_t_len: u64,
}
impl PreBinnedPatchGridSpec {
pub fn new(bin_t_len: u64) -> Self {
pub fn new(bin_t_len: u64, patch_t_len: u64) -> Self {
if !Self::is_valid_bin_t_len(bin_t_len) {
panic!("PreBinnedPatchGridSpec invalid bin_t_len {}", bin_t_len);
}
Self { bin_t_len }
Self { bin_t_len, patch_t_len }
}
pub fn bin_t_len(&self) -> u64 {
@@ -313,12 +316,7 @@ impl PreBinnedPatchGridSpec {
}
pub fn patch_t_len(&self) -> u64 {
for (i1, &j) in BIN_T_LEN_OPTIONS.iter().enumerate() {
if self.bin_t_len == j {
return PATCH_T_LEN_OPTIONS[i1];
}
}
panic!()
self.patch_t_len
}
}
@@ -340,36 +338,64 @@ pub struct PreBinnedPatchRange {
pub count: u64,
}
fn get_patch_t_len(bin_t_len: u64) -> u64 {
// TODO mechanism to select different patch lengths for different channels.
let shape = Shape::Scalar;
match shape {
Shape::Scalar => {
for (i1, &j) in BIN_T_LEN_OPTIONS.iter().enumerate() {
if bin_t_len == j {
return PATCH_T_LEN_OPTIONS_SCALAR[i1];
}
}
}
Shape::Wave(_) => {
for (i1, &j) in BIN_T_LEN_OPTIONS.iter().enumerate() {
if bin_t_len == j {
return PATCH_T_LEN_OPTIONS_WAVE[i1];
}
}
}
}
panic!()
}
impl PreBinnedPatchRange {
/// Cover at least the given range with at least as many as the requested number of bins.
pub fn covering_range(range: NanoRange, min_bin_count: u64) -> Option<Self> {
assert!(min_bin_count >= 1);
assert!(min_bin_count <= 2000);
pub fn covering_range(range: NanoRange, min_bin_count: u64) -> Result<Option<Self>, Error> {
if min_bin_count < 1 {
Err(Error::with_msg("min_bin_count < 1"))?;
}
if min_bin_count > 4000 {
Err(Error::with_msg("min_bin_count > 4000"))?;
}
let dt = range.delta();
assert!(dt <= DAY * 14);
if dt > DAY * 14 {
Err(Error::with_msg("dt > DAY * 14"))?;
}
let bs = dt / min_bin_count;
//info!("BASIC bs {}", bs);
let mut i1 = BIN_T_LEN_OPTIONS.len();
loop {
if i1 <= 0 {
break None;
break Ok(None);
} else {
i1 -= 1;
let t = BIN_T_LEN_OPTIONS[i1];
//info!("look at threshold {} bs {}", t, bs);
if t <= bs {
let bin_t_len = t;
let grid_spec = PreBinnedPatchGridSpec { bin_t_len };
let pl = grid_spec.patch_t_len();
let patch_t_len = get_patch_t_len(bin_t_len);
let grid_spec = PreBinnedPatchGridSpec { bin_t_len, patch_t_len };
let pl = patch_t_len;
let ts1 = range.beg / pl * pl;
let ts2 = (range.end + pl - 1) / pl * pl;
let count = (ts2 - ts1) / pl;
let offset = ts1 / pl;
break Some(Self {
let ret = Self {
grid_spec,
count,
offset,
});
};
break Ok(Some(ret));
}
}
}
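
Two pieces of arithmetic in this hunk are worth unpacking. get_patch_t_len
pairs each BIN_T_LEN_OPTIONS entry with the scalar patch table for now (the
Shape::Wave arm is unreachable until the TODO is resolved), and the
Ok(Some(..)) branch rounds the requested range outward to whole patches. A
worked sketch with invented numbers:

    // Lookup: bin_t_len = HOUR * 2 is BIN_T_LEN_OPTIONS[2], so
    // get_patch_t_len returns PATCH_T_LEN_OPTIONS_SCALAR[2] = DAY * 4,
    // i.e. (DAY * 4) / (HOUR * 2) = 48 bins per patch.

    // Alignment, with small illustrative values:
    let (pl, beg, end) = (600u64, 1_000u64, 2_500u64);
    let ts1 = beg / pl * pl;            // 600: beg rounded down to a patch edge
    let ts2 = (end + pl - 1) / pl * pl; // 3000: end rounded up to a patch edge
    let count = (ts2 - ts1) / pl;       // 4 patches cover the request
    let offset = ts1 / pl;              // the first patch has global index 1
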
@@ -427,9 +453,9 @@ impl PreBinnedPatchCoord {
)
}
pub fn new(bin_t_len: u64, patch_ix: u64) -> Self {
pub fn new(bin_t_len: u64, patch_t_len: u64, patch_ix: u64) -> Self {
Self {
spec: PreBinnedPatchGridSpec::new(bin_t_len),
spec: PreBinnedPatchGridSpec::new(bin_t_len, patch_t_len),
ix: patch_ix,
}
}
@@ -514,16 +540,22 @@ pub struct BinnedRange {
}
impl BinnedRange {
pub fn covering_range(range: NanoRange, min_bin_count: u64) -> Option<Self> {
assert!(min_bin_count >= 1);
assert!(min_bin_count <= 2000);
pub fn covering_range(range: NanoRange, min_bin_count: u64) -> Result<Option<Self>, Error> {
if min_bin_count < 1 {
Err(Error::with_msg("min_bin_count < 1"))?;
}
if min_bin_count > 4000 {
Err(Error::with_msg("min_bin_count > 4000"))?;
}
let dt = range.delta();
assert!(dt <= DAY * 14);
if dt > DAY * 14 {
Err(Error::with_msg("dt > DAY * 14"))?;
}
let bs = dt / min_bin_count;
let mut i1 = BIN_THRESHOLDS.len();
loop {
if i1 <= 0 {
break None;
break Ok(None);
} else {
i1 -= 1;
let t = BIN_THRESHOLDS[i1];
@@ -534,11 +566,12 @@ impl BinnedRange {
let ts2 = (range.end + pl - 1) / pl * pl;
let count = (ts2 - ts1) / pl;
let offset = ts1 / pl;
break Some(Self {
let ret = Self {
grid_spec,
count,
offset,
});
};
break Ok(Some(ret));
}
}
}
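
BinnedRange::covering_range mirrors the patch variant: it walks
BIN_THRESHOLDS (mostly elided in this diff) from the largest entry downward
and takes the first bin length t with t <= dt / min_bin_count, which
guarantees at least min_bin_count bins. The input checks replace the old
asserts and raise the cap from 2000 to 4000 bins. A small worked example
with assumed numbers:

    // dt = 1 h, min_bin_count = 100  =>  bs = 36 s.
    // The scan stops at the largest threshold <= 36 s, so the result has
    // at least 100 bins; if no threshold is small enough, the loop ends
    // with Ok(None), and out-of-range inputs now return Err instead of
    // panicking.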