diff --git a/disk/src/aggtest.rs b/disk/src/aggtest.rs
index 5b8c9bc..90de027 100644
--- a/disk/src/aggtest.rs
+++ b/disk/src/aggtest.rs
@@ -60,6 +60,7 @@ async fn agg_x_dim_0_inner() {
         range.clone(),
         query.channel_config.clone(),
         node.clone(),
+        0,
         query.buffer_size as usize,
         event_chunker_conf,
     )
@@ -71,7 +72,7 @@ async fn agg_x_dim_0_inner() {
             }
             k
         })
-        .into_binned_t(BinnedRange::covering_range(range, bin_count).unwrap())
+        .into_binned_t(BinnedRange::covering_range(range, bin_count).unwrap().unwrap())
        .map(|k| {
             if false {
                 trace!("after T binning {:?}", k.as_ref().unwrap());
@@ -123,6 +124,7 @@ async fn agg_x_dim_1_inner() {
         range.clone(),
         query.channel_config.clone(),
         node.clone(),
+        0,
         query.buffer_size as usize,
         event_chunker_conf,
     )
@@ -141,7 +143,7 @@ async fn agg_x_dim_1_inner() {
             //info!("after X binning {:?}", k.as_ref().unwrap());
             k
         })
-        .into_binned_t(BinnedRange::covering_range(range, bin_count).unwrap())
+        .into_binned_t(BinnedRange::covering_range(range, bin_count).unwrap().unwrap())
        .map(|k| {
             info!("after T binning {:?}", k.as_ref().unwrap());
             k
diff --git a/disk/src/binned.rs b/disk/src/binned.rs
index d937d2f..8e8f015 100644
--- a/disk/src/binned.rs
+++ b/disk/src/binned.rs
@@ -27,14 +27,15 @@ pub async fn binned_stream(node_config: &NodeConfigCached, query: &BinnedQuery)
     }
     let range = query.range();
     let channel_config = read_local_config(&query.channel(), &node_config.node).await?;
-    let entry = extract_matching_config_entry(range, &channel_config);
+    let entry = extract_matching_config_entry(range, &channel_config)?;
     info!("binned_bytes_for_http found config entry {:?}", entry);
-    let range = BinnedRange::covering_range(range.clone(), query.bin_count()).ok_or(Error::with_msg(format!(
+    let range = BinnedRange::covering_range(range.clone(), query.bin_count())?.ok_or(Error::with_msg(format!(
         "binned_bytes_for_http BinnedRange::covering_range returned None"
     )))?;
     let perf_opts = PerfOpts { inmem_bufcap: 512 };
+    let _shape = entry.to_shape()?;
     match PreBinnedPatchRange::covering_range(query.range().clone(), query.bin_count()) {
-        Some(pre_range) => {
+        Ok(Some(pre_range)) => {
             info!("binned_bytes_for_http found pre_range: {:?}", pre_range);
             if range.grid_spec.bin_t_len() < pre_range.grid_spec.bin_t_len() {
                 let msg = format!(
@@ -55,7 +56,7 @@ pub async fn binned_stream(node_config: &NodeConfigCached, query: &BinnedQuery)
             let ret = BinnedStream::new(Box::pin(s1))?;
             Ok(ret)
         }
-        None => {
+        Ok(None) => {
             info!(
                 "binned_bytes_for_http no covering range for prebinned, merge from remotes instead {:?}",
                 range
@@ -71,6 +72,7 @@ pub async fn binned_stream(node_config: &NodeConfigCached, query: &BinnedQuery)
             let ret = BinnedStream::new(Box::pin(s1))?;
             Ok(ret)
         }
+        Err(e) => Err(e),
     }
 }
 
diff --git a/disk/src/cache.rs b/disk/src/cache.rs
index 2abe2ed..e0a8202 100644
--- a/disk/src/cache.rs
+++ b/disk/src/cache.rs
@@ -10,6 +10,7 @@ use err::Error;
 use futures_core::Stream;
 use futures_util::{pin_mut, StreamExt};
 use hyper::Response;
+use netpod::timeunits::SEC;
 use netpod::{
     AggKind, ByteSize, Channel, Cluster, NanoRange, NodeConfigCached, PerfOpts, PreBinnedPatchCoord, ToNanos,
 };
@@ -145,6 +146,10 @@ impl PreBinnedQuery {
             .get("bin_t_len")
             .ok_or(Error::with_msg("missing bin_t_len"))?
             .parse()?;
+        let patch_t_len = params
+            .get("patch_t_len")
+            .ok_or(Error::with_msg("missing patch_t_len"))?
+            .parse()?;
         let disk_stats_every = params
             .get("disk_stats_every_kb")
             .ok_or(Error::with_msg("missing disk_stats_every_kb"))?;
@@ -152,7 +157,7 @@ impl PreBinnedQuery {
             .parse()
             .map_err(|e| Error::with_msg(format!("can not parse disk_stats_every_kb {:?}", e)))?;
         let ret = PreBinnedQuery {
-            patch: PreBinnedPatchCoord::new(bin_t_len, patch_ix),
+            patch: PreBinnedPatchCoord::new(bin_t_len, patch_t_len, patch_ix),
             agg_kind: AggKind::DimXBins1,
             channel: channel_from_params(&params)?,
             cache_usage: cache_usage_from_params(&params)?,
@@ -415,6 +420,7 @@ pub fn node_ix_for_patch(patch_coord: &PreBinnedPatchCoord, channel: &Channel, c
     hash.update(&patch_coord.patch_beg().to_le_bytes());
     hash.update(&patch_coord.patch_end().to_le_bytes());
     hash.update(&patch_coord.bin_t_len().to_le_bytes());
+    hash.update(&patch_coord.patch_t_len().to_le_bytes());
     let mut out = [0; 32];
     hash.finalize(&mut out);
     let a = [out[0], out[1], out[2], out[3]];
@@ -466,9 +472,12 @@ impl CacheFileDesc {
             .join(&hc[3..6])
             .join(&self.channel.name)
             .join(format!("{:?}", self.agg_kind))
-            .join(format!("{:019}", self.patch.spec().bin_t_len()))
-            .join(&hash[0..2])
-            .join(format!("{:019}", self.patch.ix()))
+            .join(format!(
+                "{:010}-{:010}",
+                self.patch.spec().bin_t_len() / SEC,
+                self.patch.spec().patch_t_len() / SEC
+            ))
+            .join(format!("{}-{:012}", &hash[0..6], self.patch.ix()))
     }
 }
 
diff --git a/disk/src/cache/pbv.rs b/disk/src/cache/pbv.rs
index 8e5c68b..7e152e8 100644
--- a/disk/src/cache/pbv.rs
+++ b/disk/src/cache/pbv.rs
@@ -104,7 +104,10 @@ impl PreBinnedValueStream {
         }
         // TODO do I need to set up more transformations or binning to deliver the requested data?
         let count = self.query.patch.patch_t_len() / self.query.patch.bin_t_len();
-        let range = BinnedRange::covering_range(evq.range.clone(), count).unwrap();
+        let range = BinnedRange::covering_range(evq.range.clone(), count)
+            .unwrap()
+            .ok_or(Error::with_msg("covering_range returns None"))
+            .unwrap();
         let perf_opts = PerfOpts { inmem_bufcap: 512 };
         let s1 = MergedFromRemotes::new(evq, perf_opts, self.node_config.node_config.cluster.clone());
         let s1 = s1.into_binned_t(range);
@@ -172,16 +175,18 @@ impl PreBinnedValueStream {
         self.fut2 = Some(Box::pin(s));
     }
 
-    fn try_setup_fetch_prebinned_higher_res(&mut self) {
+    fn try_setup_fetch_prebinned_higher_res(&mut self) -> Result<(), Error> {
         let range = self.query.patch.patch_range();
         match PreBinnedPatchRange::covering_range(range, self.query.patch.bin_count() + 1) {
-            Some(range) => {
+            Ok(Some(range)) => {
                 self.setup_from_higher_res_prebinned(range);
             }
-            None => {
+            Ok(None) => {
                 self.setup_merged_from_remotes();
             }
+            Err(e) => return Err(e),
         }
+        Ok(())
     }
 }
 
@@ -322,18 +327,27 @@ impl Stream for PreBinnedValueStream {
                             continue 'outer;
                         }
                         Err(e) => match e.kind() {
-                            std::io::ErrorKind::NotFound => {
-                                self.try_setup_fetch_prebinned_higher_res();
-                                if self.fut2.is_none() {
-                                    let e = Err(Error::with_msg(format!(
-                                        "try_setup_fetch_prebinned_higher_res failed"
-                                    )));
-                                    self.errored = true;
-                                    Ready(Some(e))
-                                } else {
-                                    continue 'outer;
+                            std::io::ErrorKind::NotFound => match self.try_setup_fetch_prebinned_higher_res() {
+                                Ok(_) => {
+                                    if self.fut2.is_none() {
+                                        let e = Err(Error::with_msg(format!(
+                                            "try_setup_fetch_prebinned_higher_res failed"
+                                        )));
+                                        self.errored = true;
+                                        Ready(Some(e))
+                                    } else {
+                                        continue 'outer;
+                                    }
                                 }
-                            }
+                                Err(e) => {
+                                    let e = Error::with_msg(format!(
+                                        "try_setup_fetch_prebinned_higher_res error: {:?}",
+                                        e
+                                    ));
+                                    self.errored = true;
+                                    Ready(Some(Err(e)))
+                                }
+                            },
                             _ => {
                                 error!("File I/O error: {:?}", e);
                                 self.errored = true;
diff --git a/disk/src/channelconfig.rs b/disk/src/channelconfig.rs
index 65cbf24..55e298b 100644
--- a/disk/src/channelconfig.rs
+++ b/disk/src/channelconfig.rs
@@ -1,6 +1,6 @@
 use err::Error;
 use netpod::timeunits::MS;
-use netpod::{Channel, NanoRange, Nanos, Node, ScalarType};
+use netpod::{Channel, NanoRange, Nanos, Node, ScalarType, Shape};
 use nom::number::complete::{be_i16, be_i32, be_i64, be_i8, be_u8};
 use nom::Needed;
 #[allow(unused_imports)]
@@ -68,6 +68,22 @@ pub struct ConfigEntry {
     value_converter: Option,
 }
 
+impl ConfigEntry {
+    pub fn to_shape(&self) -> Result<Shape, Error> {
+        let ret = match &self.shape {
+            Some(lens) => {
+                if lens.len() == 1 {
+                    Shape::Wave(lens[0])
+                } else {
+                    return Err(Error::with_msg(format!("Channel config unsupported shape {:?}", self)))?;
+                }
+            }
+            None => Shape::Scalar,
+        };
+        Ok(ret)
+    }
+}
+
 #[derive(Debug, Serialize, Deserialize)]
 pub struct Config {
     pub format_version: i16,
diff --git a/disk/src/eventblobs.rs b/disk/src/eventblobs.rs
index 6e29fee..2007578 100644
--- a/disk/src/eventblobs.rs
+++ b/disk/src/eventblobs.rs
@@ -6,6 +6,7 @@ use err::Error;
 use futures_core::Stream;
 use futures_util::StreamExt;
 use netpod::log::*;
+use netpod::timeunits::SEC;
 use netpod::{ChannelConfig, NanoRange, Node};
 use std::pin::Pin;
 use std::sync::atomic::AtomicU64;
@@ -24,6 +25,7 @@ pub struct EventBlobsComplete {
     completed: bool,
     max_ts: Arc<AtomicU64>,
     files_count: u32,
+    node_ix: usize,
 }
 
 impl EventBlobsComplete {
@@ -31,6 +33,7 @@ impl EventBlobsComplete {
         range: NanoRange,
         channel_config: ChannelConfig,
         node: Node,
+        node_ix: usize,
         buffer_size: usize,
         event_chunker_conf: EventChunkerConf,
     ) -> Self {
@@ -46,6 +49,7 @@ impl EventBlobsComplete {
             completed: false,
             max_ts: Arc::new(AtomicU64::new(0)),
             files_count: 0,
+            node_ix,
         }
     }
 }
@@ -82,7 +86,6 @@ impl Stream for EventBlobsComplete {
                             let item = LogItem::quick(Level::INFO, format!("handle file {:?}", path));
                             match file.file {
                                 Some(file) => {
-                                    info!("got file {:?}", path);
                                     let inp = Box::pin(file_content_stream(file, self.buffer_size as usize));
                                     let chunker = EventChunker::from_event_boundary(
                                         inp,
@@ -94,9 +97,7 @@ impl Stream for EventBlobsComplete {
                                     );
                                     self.evs = Some(chunker);
                                 }
-                                None => {
-                                    info!("skip {:?}", path);
-                                }
+                                None => {}
                             }
                             Ready(Some(Ok(EventChunkerItem::Log(item))))
                         }
@@ -109,7 +110,13 @@ impl Stream for EventBlobsComplete {
                 self.data_completed = true;
                 let item = LogItem::quick(
                     Level::INFO,
-                    format!("EventBlobsComplete used {} datafiles", self.files_count),
+                    format!(
+                        "EventBlobsComplete used {} datafiles beg {} end {} node_ix {}",
+                        self.files_count,
+                        self.range.beg / SEC,
+                        self.range.end / SEC,
+                        self.node_ix
+                    ),
                 );
                 Ready(Some(Ok(EventChunkerItem::Log(item))))
             }
diff --git a/disk/src/raw/conn.rs b/disk/src/raw/conn.rs
index d212139..0117627 100644
--- a/disk/src/raw/conn.rs
+++ b/disk/src/raw/conn.rs
@@ -10,8 +10,7 @@ use crate::raw::{EventQueryJsonStringFrame, EventsQuery};
 use err::Error;
 use futures_util::StreamExt;
 use netpod::log::*;
-use netpod::timeunits::SEC;
-use netpod::{ByteSize, NodeConfigCached, PerfOpts, Shape};
+use netpod::{ByteSize, NodeConfigCached, PerfOpts};
 use std::net::SocketAddr;
 use tokio::io::AsyncWriteExt;
 use tokio::net::tcp::OwnedWriteHalf;
@@ -133,20 +132,9 @@ async fn raw_conn_handler_inner_try(
         Ok(k) => k,
         Err(e) => return Err((e, netout))?,
     };
-    //info!("found config entry {:?}", entry);
-    info!("raw_conn_handler_inner_try beg {}", range.beg / SEC);
-    let shape = match &entry.shape {
-        Some(lens) => {
-            if lens.len() == 1 {
-                Shape::Wave(lens[0])
-            } else {
-                return Err((
-                    Error::with_msg(format!("Channel config unsupported shape {:?}", entry)),
-                    netout,
-                ))?;
-            }
-        }
-        None => Shape::Scalar,
+    let shape = match entry.to_shape() {
+        Ok(k) => k,
+        Err(e) => return Err((e, netout))?,
     };
     let channel_config = netpod::ChannelConfig {
         channel: evq.channel.clone(),
@@ -165,6 +153,7 @@ async fn raw_conn_handler_inner_try(
         range.clone(),
         channel_config.clone(),
         node_config.node.clone(),
+        node_config.ix,
         buffer_size,
         event_chunker_conf,
     )
diff --git a/netpod/src/lib.rs b/netpod/src/lib.rs
index a04400a..146c914 100644
--- a/netpod/src/lib.rs
+++ b/netpod/src/lib.rs
@@ -250,7 +250,9 @@ pub mod timeunits {
 
 const BIN_T_LEN_OPTIONS: [u64; 4] = [SEC, MIN * 10, HOUR * 2, DAY];
 
-const PATCH_T_LEN_OPTIONS: [u64; 4] = [MIN * 10, HOUR * 2, DAY * 4, DAY * 32];
+const PATCH_T_LEN_OPTIONS_SCALAR: [u64; 4] = [MIN * 60, HOUR * 2, DAY * 4, DAY * 32];
+
+const PATCH_T_LEN_OPTIONS_WAVE: [u64; 4] = [MIN * 20, HOUR * 2, DAY * 4, DAY * 32];
 
 const BIN_THRESHOLDS: [u64; 31] = [
     2,
@@ -289,14 +291,15 @@ const BIN_THRESHOLDS: [u64; 31] = [
 #[derive(Clone, Serialize, Deserialize)]
 pub struct PreBinnedPatchGridSpec {
     bin_t_len: u64,
+    patch_t_len: u64,
 }
 
 impl PreBinnedPatchGridSpec {
-    pub fn new(bin_t_len: u64) -> Self {
+    pub fn new(bin_t_len: u64, patch_t_len: u64) -> Self {
         if !Self::is_valid_bin_t_len(bin_t_len) {
             panic!("PreBinnedPatchGridSpec invalid bin_t_len {}", bin_t_len);
         }
-        Self { bin_t_len }
+        Self { bin_t_len, patch_t_len }
     }
 
     pub fn bin_t_len(&self) -> u64 {
@@ -313,12 +316,7 @@ impl PreBinnedPatchGridSpec {
     }
 
     pub fn patch_t_len(&self) -> u64 {
-        for (i1, &j) in BIN_T_LEN_OPTIONS.iter().enumerate() {
-            if self.bin_t_len == j {
-                return PATCH_T_LEN_OPTIONS[i1];
-            }
-        }
-        panic!()
+        self.patch_t_len
     }
 }
 
@@ -340,36 +338,64 @@ pub struct PreBinnedPatchRange {
     pub count: u64,
 }
 
+fn get_patch_t_len(bin_t_len: u64) -> u64 {
+    // TODO mechanism to select different patch lengths for different channels.
+    let shape = Shape::Scalar;
+    match shape {
+        Shape::Scalar => {
+            for (i1, &j) in BIN_T_LEN_OPTIONS.iter().enumerate() {
+                if bin_t_len == j {
+                    return PATCH_T_LEN_OPTIONS_SCALAR[i1];
+                }
+            }
+        }
+        Shape::Wave(_) => {
+            for (i1, &j) in BIN_T_LEN_OPTIONS.iter().enumerate() {
+                if bin_t_len == j {
+                    return PATCH_T_LEN_OPTIONS_WAVE[i1];
+                }
+            }
+        }
+    }
+    panic!()
+}
+
 impl PreBinnedPatchRange {
     /// Cover at least the given range with at least as many as the requested number of bins.
-    pub fn covering_range(range: NanoRange, min_bin_count: u64) -> Option<Self> {
-        assert!(min_bin_count >= 1);
-        assert!(min_bin_count <= 2000);
+    pub fn covering_range(range: NanoRange, min_bin_count: u64) -> Result<Option<Self>, Error> {
+        if min_bin_count < 1 {
+            Err(Error::with_msg("min_bin_count < 1"))?;
+        }
+        if min_bin_count > 4000 {
+            Err(Error::with_msg("min_bin_count > 4000"))?;
+        }
         let dt = range.delta();
-        assert!(dt <= DAY * 14);
+        if dt > DAY * 14 {
+            Err(Error::with_msg("dt > DAY * 14"))?;
+        }
         let bs = dt / min_bin_count;
-        //info!("BASIC bs {}", bs);
         let mut i1 = BIN_T_LEN_OPTIONS.len();
        loop {
             if i1 <= 0 {
-                break None;
+                break Ok(None);
             } else {
                 i1 -= 1;
                 let t = BIN_T_LEN_OPTIONS[i1];
-                //info!("look at threshold {} bs {}", t, bs);
                 if t <= bs {
                     let bin_t_len = t;
-                    let grid_spec = PreBinnedPatchGridSpec { bin_t_len };
-                    let pl = grid_spec.patch_t_len();
+                    let patch_t_len = get_patch_t_len(bin_t_len);
+                    let grid_spec = PreBinnedPatchGridSpec { bin_t_len, patch_t_len };
+                    let pl = patch_t_len;
                     let ts1 = range.beg / pl * pl;
                     let ts2 = (range.end + pl - 1) / pl * pl;
                     let count = (ts2 - ts1) / pl;
                     let offset = ts1 / pl;
-                    break Some(Self {
+                    let ret = Self {
                         grid_spec,
                         count,
                         offset,
-                    });
+                    };
+                    break Ok(Some(ret));
                 }
             }
         }
@@ -427,9 +453,9 @@ impl PreBinnedPatchCoord {
         )
     }
 
-    pub fn new(bin_t_len: u64, patch_ix: u64) -> Self {
+    pub fn new(bin_t_len: u64, patch_t_len: u64, patch_ix: u64) -> Self {
         Self {
-            spec: PreBinnedPatchGridSpec::new(bin_t_len),
+            spec: PreBinnedPatchGridSpec::new(bin_t_len, patch_t_len),
             ix: patch_ix,
         }
     }
@@ -514,16 +540,22 @@ pub struct BinnedRange {
 }
 
 impl BinnedRange {
-    pub fn covering_range(range: NanoRange, min_bin_count: u64) -> Option<Self> {
-        assert!(min_bin_count >= 1);
-        assert!(min_bin_count <= 2000);
+    pub fn covering_range(range: NanoRange, min_bin_count: u64) -> Result<Option<Self>, Error> {
+        if min_bin_count < 1 {
+            Err(Error::with_msg("min_bin_count < 1"))?;
+        }
+        if min_bin_count > 4000 {
+            Err(Error::with_msg("min_bin_count > 4000"))?;
+        }
         let dt = range.delta();
-        assert!(dt <= DAY * 14);
+        if dt > DAY * 14 {
+            Err(Error::with_msg("dt > DAY * 14"))?;
+        }
         let bs = dt / min_bin_count;
         let mut i1 = BIN_THRESHOLDS.len();
         loop {
             if i1 <= 0 {
-                break None;
+                break Ok(None);
             } else {
                 i1 -= 1;
                 let t = BIN_THRESHOLDS[i1];
@@ -534,11 +566,12 @@ impl BinnedRange {
                 let ts2 = (range.end + pl - 1) / pl * pl;
                 let count = (ts2 - ts1) / pl;
                 let offset = ts1 / pl;
-                break Some(Self {
+                let ret = Self {
                     grid_spec,
                     count,
                     offset,
-                });
+                };
+                break Ok(Some(ret));
             }
         }
     }
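
Note on the changed calling convention, not part of the patch itself: `covering_range` now returns `Result<Option<_>, Error>` instead of `Option<_>`, so callers must handle both the error and the "no covering grid" cases. A minimal sketch of how a caller can consume the new signature, using only items visible in this diff (`BinnedRange`, `NanoRange`, `Error::with_msg`); the helper name `bins_for` is hypothetical:

    use err::Error;
    use netpod::{BinnedRange, NanoRange};

    // Resolve a covering binned grid for `range`, or fail with a descriptive error.
    fn bins_for(range: NanoRange, min_bin_count: u64) -> Result<BinnedRange, Error> {
        // Propagate real failures (bad bin count, oversized range) with `?`,
        // and turn the Ok(None) "no covering grid found" case into an explicit error.
        BinnedRange::covering_range(range, min_bin_count)?
            .ok_or_else(|| Error::with_msg("no covering BinnedRange found"))
    }

This mirrors the pattern used in binned.rs above, where the `Ok(Some)`, `Ok(None)`, and `Err` cases of `PreBinnedPatchRange::covering_range` are matched explicitly.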