diff --git a/daqbuffer/src/bin/daqbuffer.rs b/daqbuffer/src/bin/daqbuffer.rs
index 20944b3..0b4c0ba 100644
--- a/daqbuffer/src/bin/daqbuffer.rs
+++ b/daqbuffer/src/bin/daqbuffer.rs
@@ -1,5 +1,5 @@
 use chrono::{DateTime, Duration, Utc};
-use disk::cache::CacheUsage;
+use disk::binned::query::CacheUsage;
 use err::Error;
 use netpod::log::*;
 use netpod::{NodeConfig, NodeConfigCached};
diff --git a/daqbuffer/src/client.rs b/daqbuffer/src/client.rs
index 247b006..8a87cbc 100644
--- a/daqbuffer/src/client.rs
+++ b/daqbuffer/src/client.rs
@@ -1,8 +1,8 @@
 use chrono::{DateTime, Utc};
 use disk::agg::scalarbinbatch::MinMaxAvgScalarBinBatch;
 use disk::agg::streams::StreamItem;
+use disk::binned::query::{BinnedQuery, CacheUsage};
 use disk::binned::RangeCompletableItem;
-use disk::cache::{BinnedQuery, CacheUsage};
 use disk::frame::inmem::InMemoryFrameAsyncReadStream;
 use disk::frame::makeframe::FrameType;
 use disk::streamlog::Streamlog;
diff --git a/daqbuffer/src/test.rs b/daqbuffer/src/test.rs
index a4bebc1..20ddac4 100644
--- a/daqbuffer/src/test.rs
+++ b/daqbuffer/src/test.rs
@@ -3,8 +3,8 @@ use bytes::BytesMut;
 use chrono::{DateTime, Utc};
 use disk::agg::scalarbinbatch::MinMaxAvgScalarBinBatch;
 use disk::agg::streams::{Bins, StatsItem, StreamItem};
+use disk::binned::query::{BinnedQuery, CacheUsage};
 use disk::binned::RangeCompletableItem;
-use disk::cache::{BinnedQuery, CacheUsage};
 use disk::frame::inmem::InMemoryFrameAsyncReadStream;
 use disk::streamlog::Streamlog;
 use err::Error;
diff --git a/daqbuffer/src/test/json.rs b/daqbuffer/src/test/json.rs
index 35d0837..58f1baf 100644
--- a/daqbuffer/src/test/json.rs
+++ b/daqbuffer/src/test/json.rs
@@ -1,6 +1,6 @@
 use crate::test::require_test_hosts_running;
 use chrono::{DateTime, Utc};
-use disk::cache::BinnedQuery;
+use disk::binned::query::BinnedQuery;
 use err::Error;
 use http::StatusCode;
 use hyper::Body;
diff --git a/disk/src/binned.rs b/disk/src/binned.rs
index 5cf8503..3f7ad4c 100644
--- a/disk/src/binned.rs
+++ b/disk/src/binned.rs
@@ -5,9 +5,10 @@ use crate::agg::eventbatch::MinMaxAvgScalarEventBatch;
 use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch;
 use crate::agg::streams::{Appendable, Collectable, Collected, StreamItem, ToJsonResult};
 use crate::agg::{Fits, FitsInside};
+use crate::binned::query::BinnedQuery;
 use crate::binned::scalar::binned_stream;
 use crate::binnedstream::{BinnedScalarStreamFromPreBinnedPatches, BoxedStream};
-use crate::cache::{BinnedQuery, MergedFromRemotes};
+use crate::cache::MergedFromRemotes;
 use crate::decode::{Endianness, EventValues};
 use crate::frame::makeframe::{FrameType, SubFrId};
 use crate::raw::EventsQuery;
@@ -33,6 +34,8 @@ use std::time::Duration;
 use tokio::fs::File;
 use tokio::io::{AsyncRead, ReadBuf};
 
+pub mod prebinned;
+pub mod query;
 pub mod scalar;
 
 pub struct BinnedStreamRes {
diff --git a/disk/src/binned/prebinned.rs b/disk/src/binned/prebinned.rs
new file mode 100644
index 0000000..17a7aa8
--- /dev/null
+++ b/disk/src/binned/prebinned.rs
@@ -0,0 +1,37 @@
+use crate::agg::streams::StreamItem;
+use crate::binned::query::PreBinnedQuery;
+use crate::binned::{RangeCompletableItem, StreamKind};
+use crate::cache::node_ix_for_patch;
+use crate::cache::pbv::PreBinnedValueByteStream;
+use crate::frame::makeframe::FrameType;
+use err::Error;
+use netpod::NodeConfigCached;
+
+pub fn pre_binned_bytes_for_http<SK>(
+    node_config: &NodeConfigCached,
+    query: &PreBinnedQuery,
+    stream_kind: SK,
+) -> Result<PreBinnedValueByteStream<SK>, Error>
+where
+    SK: StreamKind,
+    Result<StreamItem<RangeCompletableItem<SK::TBinnedBins>>, err::Error>: FrameType,
+{
+    if query.channel().backend != node_config.node.backend {
+        let err = Error::with_msg(format!(
+            "backend mismatch node: {} requested: {}",
+            node_config.node.backend,
+            query.channel().backend
+        ));
+        return Err(err);
+    }
+    let patch_node_ix = node_ix_for_patch(query.patch(), query.channel(), &node_config.node_config.cluster);
+    if node_config.ix as u32 != patch_node_ix {
+        let err = Error::with_msg(format!(
+            "pre_binned_bytes_for_http node mismatch node_config.ix {} patch_node_ix {}",
+            node_config.ix, patch_node_ix
+        ));
+        return Err(err);
+    }
+    let ret = crate::cache::pbv::pre_binned_value_byte_stream_new(query, node_config, stream_kind);
+    Ok(ret)
+}
diff --git a/disk/src/binned/query.rs b/disk/src/binned/query.rs
new file mode 100644
index 0000000..911cb3a
--- /dev/null
+++ b/disk/src/binned/query.rs
@@ -0,0 +1,298 @@
+use chrono::{DateTime, TimeZone, Utc};
+use err::Error;
+use netpod::log::*;
+use netpod::{AggKind, ByteSize, Channel, HostPort, NanoRange, PreBinnedPatchCoord, ToNanos};
+use std::collections::BTreeMap;
+
+#[derive(Clone, Debug)]
+pub struct PreBinnedQuery {
+    patch: PreBinnedPatchCoord,
+    agg_kind: AggKind,
+    channel: Channel,
+    cache_usage: CacheUsage,
+    disk_stats_every: ByteSize,
+    report_error: bool,
+}
+
+impl PreBinnedQuery {
+    pub fn new(
+        patch: PreBinnedPatchCoord,
+        channel: Channel,
+        agg_kind: AggKind,
+        cache_usage: CacheUsage,
+        disk_stats_every: ByteSize,
+        report_error: bool,
+    ) -> Self {
+        Self {
+            patch,
+            agg_kind,
+            channel,
+            cache_usage,
+            disk_stats_every,
+            report_error,
+        }
+    }
+
+    pub fn from_request(req: &http::request::Parts) -> Result<Self, Error> {
+        let params = netpod::query_params(req.uri.query());
+        let patch_ix = params
+            .get("patchIx")
+            .ok_or(Error::with_msg("missing patchIx"))?
+            .parse()?;
+        let bin_t_len = params
+            .get("binTlen")
+            .ok_or(Error::with_msg("missing binTlen"))?
+            .parse()?;
+        let patch_t_len = params
+            .get("patchTlen")
+            .ok_or(Error::with_msg("missing patchTlen"))?
+            .parse()?;
+        let disk_stats_every = params
+            .get("diskStatsEveryKb")
+            .ok_or(Error::with_msg("missing diskStatsEveryKb"))?;
+        let disk_stats_every = disk_stats_every
+            .parse()
+            .map_err(|e| Error::with_msg(format!("can not parse diskStatsEveryKb {:?}", e)))?;
+        let ret = PreBinnedQuery {
+            patch: PreBinnedPatchCoord::new(bin_t_len, patch_t_len, patch_ix),
+            agg_kind: params
+                .get("aggKind")
+                .map_or(&format!("{}", AggKind::DimXBins1), |k| k)
+                .parse()
+                .map_err(|e| Error::with_msg(format!("can not parse aggKind {:?}", e)))?,
+            channel: channel_from_params(&params)?,
+            cache_usage: CacheUsage::from_params(&params)?,
+            disk_stats_every: ByteSize::kb(disk_stats_every),
+            report_error: params
+                .get("reportError")
+                .map_or("false", |k| k)
+                .parse()
+                .map_err(|e| Error::with_msg(format!("can not parse reportError {:?}", e)))?,
+        };
+        Ok(ret)
+    }
+
+    pub fn make_query_string(&self) -> String {
+        format!(
+            "{}&channelBackend={}&channelName={}&aggKind={}&cacheUsage={}&diskStatsEveryKb={}&reportError={}",
+            self.patch.to_url_params_strings(),
+            self.channel.backend,
+            self.channel.name,
+            self.agg_kind,
+            self.cache_usage,
+            self.disk_stats_every.bytes() / 1024,
+            self.report_error(),
+        )
+    }
+
+    pub fn patch(&self) -> &PreBinnedPatchCoord {
+        &self.patch
+    }
+
+    pub fn report_error(&self) -> bool {
+        self.report_error
+    }
+
+    pub fn channel(&self) -> &Channel {
+        &self.channel
+    }
+
+    pub fn agg_kind(&self) -> &AggKind {
+        &self.agg_kind
+    }
+
+    pub fn disk_stats_every(&self) -> ByteSize {
+        self.disk_stats_every.clone()
+    }
+
+    pub fn cache_usage(&self) -> CacheUsage {
+        self.cache_usage.clone()
+    }
+}
+
+#[derive(Clone, Debug)]
+pub enum CacheUsage {
+    Use,
+    Ignore,
+    Recreate,
+}
+
+impl CacheUsage {
+    pub fn query_param_value(&self) -> String {
+        match self {
+            CacheUsage::Use => "use",
+            CacheUsage::Ignore => "ignore",
+            CacheUsage::Recreate => "recreate",
+        }
+        .into()
+    }
+
+    pub fn from_params(params: &BTreeMap<String, String>) -> Result<CacheUsage, Error> {
+        let ret = params.get("cacheUsage").map_or(Ok::<_, Error>(CacheUsage::Use), |k| {
+            if k == "use" {
+                Ok(CacheUsage::Use)
+            } else if k == "ignore" {
+                Ok(CacheUsage::Ignore)
+            } else if k == "recreate" {
+                Ok(CacheUsage::Recreate)
+            } else {
+                Err(Error::with_msg(format!("unexpected cacheUsage {:?}", k)))?
+            }
+        })?;
+        Ok(ret)
+    }
+
+    pub fn from_string(s: &str) -> Result<CacheUsage, Error> {
+        let ret = if s == "ignore" {
+            CacheUsage::Ignore
+        } else if s == "recreate" {
+            CacheUsage::Recreate
+        } else if s == "use" {
+            CacheUsage::Use
+        } else {
+            return Err(Error::with_msg(format!("can not interpret cache usage string: {}", s)));
+        };
+        Ok(ret)
+    }
+}
+
+impl std::fmt::Display for CacheUsage {
+    fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
+        write!(fmt, "{}", self.query_param_value())
+    }
+}
+
+#[derive(Clone, Debug)]
+pub struct BinnedQuery {
+    channel: Channel,
+    range: NanoRange,
+    bin_count: u32,
+    agg_kind: AggKind,
+    cache_usage: CacheUsage,
+    disk_stats_every: ByteSize,
+    report_error: bool,
+}
+
+impl BinnedQuery {
+    pub fn new(channel: Channel, range: NanoRange, bin_count: u32, agg_kind: AggKind) -> BinnedQuery {
+        BinnedQuery {
+            channel,
+            range,
+            bin_count,
+            agg_kind,
+            cache_usage: CacheUsage::Use,
+            disk_stats_every: ByteSize(1024 * 1024 * 4),
+            report_error: false,
+        }
+    }
+
+    pub fn from_request(req: &http::request::Parts) -> Result<Self, Error> {
+        let params = netpod::query_params(req.uri.query());
+        let beg_date = params.get("begDate").ok_or(Error::with_msg("missing begDate"))?;
+        let end_date = params.get("endDate").ok_or(Error::with_msg("missing endDate"))?;
+        let disk_stats_every = params.get("diskStatsEveryKb").map_or("2000", |k| k);
+        let disk_stats_every = disk_stats_every
+            .parse()
+            .map_err(|e| Error::with_msg(format!("can not parse diskStatsEveryKb {:?}", e)))?;
+        let ret = BinnedQuery {
+            range: NanoRange {
+                beg: beg_date.parse::<DateTime<Utc>>()?.to_nanos(),
+                end: end_date.parse::<DateTime<Utc>>()?.to_nanos(),
+            },
+            bin_count: params
+                .get("binCount")
+                .ok_or(Error::with_msg("missing binCount"))?
+                .parse()
+                .map_err(|e| Error::with_msg(format!("can not parse binCount {:?}", e)))?,
+            agg_kind: params
+                .get("aggKind")
+                .map_or("DimXBins1", |k| k)
+                .parse()
+                .map_err(|e| Error::with_msg(format!("can not parse aggKind {:?}", e)))?,
+            channel: channel_from_params(&params)?,
+            cache_usage: CacheUsage::from_params(&params)?,
+            disk_stats_every: ByteSize::kb(disk_stats_every),
+            report_error: params
+                .get("reportError")
+                .map_or("false", |k| k)
+                .parse()
+                .map_err(|e| Error::with_msg(format!("can not parse reportError {:?}", e)))?,
+        };
+        info!("BinnedQuery::from_request {:?}", ret);
+        Ok(ret)
+    }
+
+    pub fn range(&self) -> &NanoRange {
+        &self.range
+    }
+
+    pub fn channel(&self) -> &Channel {
+        &self.channel
+    }
+
+    pub fn bin_count(&self) -> u32 {
+        self.bin_count
+    }
+
+    pub fn agg_kind(&self) -> &AggKind {
+        &self.agg_kind
+    }
+
+    pub fn cache_usage(&self) -> &CacheUsage {
+        &self.cache_usage
+    }
+
+    pub fn disk_stats_every(&self) -> &ByteSize {
+        &self.disk_stats_every
+    }
+
+    pub fn report_error(&self) -> bool {
+        self.report_error
+    }
+
+    pub fn set_cache_usage(&mut self, k: CacheUsage) {
+        self.cache_usage = k;
+    }
+
+    pub fn set_disk_stats_every(&mut self, k: ByteSize) {
+        self.disk_stats_every = k;
+    }
+
+    // TODO the BinnedQuery itself should maybe already carry the full HostPort?
+    // On the other hand, want to keep the flexibility for the fail over possibility..
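+    // Example of the URL produced by url() below, with hypothetical host, port and
+    // channel values (illustrative only, not taken from this repository):
+    //   http://sf-daqbuf-21:8371/api/4/binned?cacheUsage=use&channelBackend=sf-databuffer
+    //     &channelName=S10BC01-DBAM070:EOM1_T1&binCount=10&begDate=2021-05-01T00:00:00.000Z
+    //     &endDate=2021-05-01T01:00:00.000Z&diskStatsEveryKb=2000
+    // Note that aggKind and reportError are currently not encoded by url().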
+    pub fn url(&self, host: &HostPort) -> String {
+        let date_fmt = "%Y-%m-%dT%H:%M:%S.%3fZ";
+        format!(
+            "http://{}:{}/api/4/binned?cacheUsage={}&channelBackend={}&channelName={}&binCount={}&begDate={}&endDate={}&diskStatsEveryKb={}",
+            host.host,
+            host.port,
+            self.cache_usage,
+            self.channel.backend,
+            self.channel.name,
+            self.bin_count,
+            Utc.timestamp_nanos(self.range.beg as i64).format(date_fmt),
+            Utc.timestamp_nanos(self.range.end as i64).format(date_fmt),
+            self.disk_stats_every.bytes() / 1024,
+        )
+    }
+}
+
+fn channel_from_params(params: &BTreeMap<String, String>) -> Result<Channel, Error> {
+    let ret = Channel {
+        backend: params
+            .get("channelBackend")
+            .ok_or(Error::with_msg("missing channelBackend"))?
+            .into(),
+        name: params
+            .get("channelName")
+            .ok_or(Error::with_msg("missing channelName"))?
+            .into(),
+    };
+    Ok(ret)
+}
diff --git a/disk/src/binned/scalar.rs b/disk/src/binned/scalar.rs
index 7cbef79..91546e0 100644
--- a/disk/src/binned/scalar.rs
+++ b/disk/src/binned/scalar.rs
@@ -1,6 +1,6 @@
+use crate::binned::query::BinnedQuery;
 use crate::binned::{BinnedStreamRes, StreamKind};
 use crate::binnedstream::BoxedStream;
-use crate::cache::BinnedQuery;
 use crate::raw::EventsQuery;
 use err::Error;
 use netpod::log::*;
diff --git a/disk/src/binnedstream.rs b/disk/src/binnedstream.rs
index 48d8108..30a77db 100644
--- a/disk/src/binnedstream.rs
+++ b/disk/src/binnedstream.rs
@@ -1,7 +1,7 @@
 use crate::agg::streams::StreamItem;
+use crate::binned::query::{CacheUsage, PreBinnedQuery};
 use crate::binned::{RangeCompletableItem, StreamKind};
 use crate::cache::pbvfs::PreBinnedScalarValueFetchedStream;
-use crate::cache::{CacheUsage, PreBinnedQuery};
 use crate::frame::makeframe::FrameType;
 use err::Error;
 use futures_core::Stream;
diff --git a/disk/src/cache.rs b/disk/src/cache.rs
index 1877342..5d5a37f 100644
--- a/disk/src/cache.rs
+++ b/disk/src/cache.rs
@@ -1,23 +1,18 @@
 use crate::agg::streams::StreamItem;
 use crate::binned::{RangeCompletableItem, StreamKind};
-use crate::cache::pbv::PreBinnedValueByteStream;
-use crate::frame::makeframe::FrameType;
 use crate::merge::MergedStream;
 use crate::raw::EventsQuery;
 use bytes::Bytes;
-use chrono::{DateTime, TimeZone, Utc};
+use chrono::Utc;
 use err::Error;
 use futures_core::Stream;
 use futures_util::{pin_mut, StreamExt};
 use hyper::{Body, Response};
 use netpod::log::*;
 use netpod::timeunits::SEC;
-use netpod::{
-    AggKind, ByteSize, Channel, Cluster, HostPort, NanoRange, NodeConfigCached, PerfOpts, PreBinnedPatchCoord, ToNanos,
-};
+use netpod::{AggKind, Channel, Cluster, NodeConfigCached, PerfOpts, PreBinnedPatchCoord};
 use serde::{Deserialize, Serialize};
-use std::collections::{BTreeMap, VecDeque};
-use std::fmt::{Display, Formatter};
+use std::collections::VecDeque;
 use std::future::Future;
 use std::io;
 use std::path::PathBuf;
@@ -29,308 +24,6 @@ use tokio::io::{AsyncRead, ReadBuf};
 pub mod pbv;
 pub mod pbvfs;
 
-#[derive(Clone, Debug)]
-pub enum CacheUsage {
-    Use,
-    Ignore,
-    Recreate,
-}
-
-impl CacheUsage {
-    pub fn query_param_value(&self) -> String {
-        match self {
-            CacheUsage::Use => "use",
-            CacheUsage::Ignore => "ignore",
-            CacheUsage::Recreate => "recreate",
-        }
-        .into()
-    }
-
-    pub fn from_params(params: &BTreeMap<String, String>) -> Result<CacheUsage, Error> {
-        let ret = params.get("cacheUsage").map_or(Ok::<_, Error>(CacheUsage::Use), |k| {
-            if k == "use" {
-                Ok(CacheUsage::Use)
-            } else if k == "ignore" {
-                Ok(CacheUsage::Ignore)
-            } else if k == "recreate" {
-                Ok(CacheUsage::Recreate)
-            } else {
-                Err(Error::with_msg(format!("unexpected cacheUsage {:?}", k)))?
-            }
-        })?;
-        Ok(ret)
-    }
-
-    pub fn from_string(s: &str) -> Result<CacheUsage, Error> {
-        let ret = if s == "ignore" {
-            CacheUsage::Ignore
-        } else if s == "recreate" {
-            CacheUsage::Recreate
-        } else if s == "use" {
-            CacheUsage::Use
-        } else {
-            return Err(Error::with_msg(format!("can not interpret cache usage string: {}", s)));
-        };
-        Ok(ret)
-    }
-}
-
-impl Display for CacheUsage {
-    fn fmt(&self, fmt: &mut Formatter) -> std::fmt::Result {
-        write!(fmt, "{}", self.query_param_value())
-    }
-}
-
-#[derive(Clone, Debug)]
-pub struct BinnedQuery {
-    channel: Channel,
-    range: NanoRange,
-    bin_count: u32,
-    agg_kind: AggKind,
-    cache_usage: CacheUsage,
-    disk_stats_every: ByteSize,
-    report_error: bool,
-}
-
-impl BinnedQuery {
-    pub fn new(channel: Channel, range: NanoRange, bin_count: u32, agg_kind: AggKind) -> BinnedQuery {
-        BinnedQuery {
-            channel,
-            range,
-            bin_count,
-            agg_kind,
-            cache_usage: CacheUsage::Use,
-            disk_stats_every: ByteSize(1024 * 1024 * 4),
-            report_error: false,
-        }
-    }
-
-    pub fn from_request(req: &http::request::Parts) -> Result<Self, Error> {
-        let params = netpod::query_params(req.uri.query());
-        let beg_date = params.get("begDate").ok_or(Error::with_msg("missing begDate"))?;
-        let end_date = params.get("endDate").ok_or(Error::with_msg("missing endDate"))?;
-        let disk_stats_every = params.get("diskStatsEveryKb").map_or("2000", |k| k);
-        let disk_stats_every = disk_stats_every
-            .parse()
-            .map_err(|e| Error::with_msg(format!("can not parse diskStatsEveryKb {:?}", e)))?;
-        let ret = BinnedQuery {
-            range: NanoRange {
-                beg: beg_date.parse::<DateTime<Utc>>()?.to_nanos(),
-                end: end_date.parse::<DateTime<Utc>>()?.to_nanos(),
-            },
-            bin_count: params
-                .get("binCount")
-                .ok_or(Error::with_msg("missing binCount"))?
-                .parse()
-                .map_err(|e| Error::with_msg(format!("can not parse binCount {:?}", e)))?,
-            agg_kind: params
-                .get("aggKind")
-                .map_or("DimXBins1", |k| k)
-                .parse()
-                .map_err(|e| Error::with_msg(format!("can not parse aggKind {:?}", e)))?,
-            channel: channel_from_params(&params)?,
-            cache_usage: CacheUsage::from_params(&params)?,
-            disk_stats_every: ByteSize::kb(disk_stats_every),
-            report_error: params
-                .get("reportError")
-                .map_or("false", |k| k)
-                .parse()
-                .map_err(|e| Error::with_msg(format!("can not parse reportError {:?}", e)))?,
-        };
-        info!("BinnedQuery::from_request {:?}", ret);
-        Ok(ret)
-    }
-
-    pub fn range(&self) -> &NanoRange {
-        &self.range
-    }
-
-    pub fn channel(&self) -> &Channel {
-        &self.channel
-    }
-
-    pub fn bin_count(&self) -> u32 {
-        self.bin_count
-    }
-
-    pub fn agg_kind(&self) -> &AggKind {
-        &self.agg_kind
-    }
-
-    pub fn cache_usage(&self) -> &CacheUsage {
-        &self.cache_usage
-    }
-
-    pub fn disk_stats_every(&self) -> &ByteSize {
-        &self.disk_stats_every
-    }
-
-    pub fn report_error(&self) -> bool {
-        self.report_error
-    }
-
-    pub fn set_cache_usage(&mut self, k: CacheUsage) {
-        self.cache_usage = k;
-    }
-
-    pub fn set_disk_stats_every(&mut self, k: ByteSize) {
-        self.disk_stats_every = k;
-    }
-
-    // TODO the BinnedQuery itself should maybe already carry the full HostPort?
-    // On the other hand, want to keep the flexibility for the fail over possibility..
-    pub fn url(&self, host: &HostPort) -> String {
-        let date_fmt = "%Y-%m-%dT%H:%M:%S.%3fZ";
-        format!(
-            "http://{}:{}/api/4/binned?cacheUsage={}&channelBackend={}&channelName={}&binCount={}&begDate={}&endDate={}&diskStatsEveryKb={}",
-            host.host,
-            host.port,
-            self.cache_usage,
-            self.channel.backend,
-            self.channel.name,
-            self.bin_count,
-            Utc.timestamp_nanos(self.range.beg as i64).format(date_fmt),
-            Utc.timestamp_nanos(self.range.end as i64).format(date_fmt),
-            self.disk_stats_every.bytes() / 1024,
-        )
-    }
-}
-
-#[derive(Clone, Debug)]
-pub struct PreBinnedQuery {
-    patch: PreBinnedPatchCoord,
-    agg_kind: AggKind,
-    channel: Channel,
-    cache_usage: CacheUsage,
-    disk_stats_every: ByteSize,
-    report_error: bool,
-}
-
-impl PreBinnedQuery {
-    pub fn new(
-        patch: PreBinnedPatchCoord,
-        channel: Channel,
-        agg_kind: AggKind,
-        cache_usage: CacheUsage,
-        disk_stats_every: ByteSize,
-        report_error: bool,
-    ) -> Self {
-        Self {
-            patch,
-            agg_kind,
-            channel,
-            cache_usage,
-            disk_stats_every,
-            report_error,
-        }
-    }
-
-    pub fn from_request(req: &http::request::Parts) -> Result<Self, Error> {
-        let params = netpod::query_params(req.uri.query());
-        let patch_ix = params
-            .get("patchIx")
-            .ok_or(Error::with_msg("missing patchIx"))?
-            .parse()?;
-        let bin_t_len = params
-            .get("binTlen")
-            .ok_or(Error::with_msg("missing binTlen"))?
-            .parse()?;
-        let patch_t_len = params
-            .get("patchTlen")
-            .ok_or(Error::with_msg("missing patchTlen"))?
-            .parse()?;
-        let disk_stats_every = params
-            .get("diskStatsEveryKb")
-            .ok_or(Error::with_msg("missing diskStatsEveryKb"))?;
-        let disk_stats_every = disk_stats_every
-            .parse()
-            .map_err(|e| Error::with_msg(format!("can not parse diskStatsEveryKb {:?}", e)))?;
-        let ret = PreBinnedQuery {
-            patch: PreBinnedPatchCoord::new(bin_t_len, patch_t_len, patch_ix),
-            agg_kind: params
-                .get("aggKind")
-                .map_or(&format!("{}", AggKind::DimXBins1), |k| k)
-                .parse()
-                .map_err(|e| Error::with_msg(format!("can not parse aggKind {:?}", e)))?,
-            channel: channel_from_params(&params)?,
-            cache_usage: CacheUsage::from_params(&params)?,
-            disk_stats_every: ByteSize::kb(disk_stats_every),
-            report_error: params
-                .get("reportError")
-                .map_or("false", |k| k)
-                .parse()
-                .map_err(|e| Error::with_msg(format!("can not parse reportError {:?}", e)))?,
-        };
-        Ok(ret)
-    }
-
-    pub fn make_query_string(&self) -> String {
-        format!(
-            "{}&channelBackend={}&channelName={}&aggKind={}&cacheUsage={}&diskStatsEveryKb={}&reportError={}",
-            self.patch.to_url_params_strings(),
-            self.channel.backend,
-            self.channel.name,
-            self.agg_kind,
-            self.cache_usage,
-            self.disk_stats_every.bytes() / 1024,
-            self.report_error(),
-        )
-    }
-
-    pub fn patch(&self) -> &PreBinnedPatchCoord {
-        &self.patch
-    }
-
-    pub fn report_error(&self) -> bool {
-        self.report_error
-    }
-}
-
-fn channel_from_params(params: &BTreeMap<String, String>) -> Result<Channel, Error> {
-    let ret = Channel {
-        backend: params
-            .get("channelBackend")
-            .ok_or(Error::with_msg("missing channelBackend"))?
-            .into(),
-        name: params
-            .get("channelName")
-            .ok_or(Error::with_msg("missing channelName"))?
-            .into(),
-    };
-    Ok(ret)
-}
-
-// NOTE This answers a request for a single valid pre-binned patch.
-// A user must first make sure that the grid spec is valid, and that this node is responsible for it.
-// Otherwise it is an error.
-pub fn pre_binned_bytes_for_http<SK>(
-    node_config: &NodeConfigCached,
-    query: &PreBinnedQuery,
-    stream_kind: SK,
-) -> Result<PreBinnedValueByteStream<SK>, Error>
-where
-    SK: StreamKind,
-    Result<StreamItem<RangeCompletableItem<SK::TBinnedBins>>, err::Error>: FrameType,
-{
-    if query.channel.backend != node_config.node.backend {
-        let err = Error::with_msg(format!(
-            "backend mismatch node: {} requested: {}",
-            node_config.node.backend, query.channel.backend
-        ));
-        return Err(err);
-    }
-    let patch_node_ix = node_ix_for_patch(&query.patch, &query.channel, &node_config.node_config.cluster);
-    if node_config.ix as u32 != patch_node_ix {
-        Err(Error::with_msg(format!(
-            "pre_binned_bytes_for_http node mismatch node_config.ix {} patch_node_ix {}",
-            node_config.ix, patch_node_ix
-        )))
-    } else {
-        let ret = crate::cache::pbv::pre_binned_value_byte_stream_new(query, node_config, stream_kind);
-        Ok(ret)
-    }
-}
-
 pub struct HttpBodyAsAsyncRead {
     inp: Response<Body>,
     left: Bytes,
diff --git a/disk/src/cache/pbv.rs b/disk/src/cache/pbv.rs
index 07f3327..920e10c 100644
--- a/disk/src/cache/pbv.rs
+++ b/disk/src/cache/pbv.rs
@@ -1,7 +1,8 @@
 use crate::agg::streams::{Appendable, StreamItem};
+use crate::binned::query::PreBinnedQuery;
 use crate::binned::{RangeCompletableItem, StreamKind, WithLen};
 use crate::cache::pbvfs::PreBinnedScalarValueFetchedStream;
-use crate::cache::{CacheFileDesc, MergedFromRemotes, PreBinnedQuery, WrittenPbCache};
+use crate::cache::{CacheFileDesc, MergedFromRemotes, WrittenPbCache};
 use crate::frame::makeframe::{make_frame, FrameType};
 use crate::raw::EventsQuery;
 use crate::streamlog::Streamlog;
@@ -131,20 +132,20 @@ where
     // TODO handle errors also here via return type.
     fn setup_merged_from_remotes(&mut self) {
         let evq = EventsQuery {
-            channel: self.query.channel.clone(),
-            range: self.query.patch.patch_range(),
-            agg_kind: self.query.agg_kind.clone(),
+            channel: self.query.channel().clone(),
+            range: self.query.patch().patch_range(),
+            agg_kind: self.query.agg_kind().clone(),
         };
-        if self.query.patch.patch_t_len() % self.query.patch.bin_t_len() != 0 {
+        if self.query.patch().patch_t_len() % self.query.patch().bin_t_len() != 0 {
             error!(
                 "Patch length inconsistency {} {}",
-                self.query.patch.patch_t_len(),
-                self.query.patch.bin_t_len()
+                self.query.patch().patch_t_len(),
+                self.query.patch().bin_t_len()
             );
             return;
         }
         // TODO do I need to set up more transformations or binning to deliver the requested data?
-        let count = self.query.patch.patch_t_len() / self.query.patch.bin_t_len();
+        let count = self.query.patch().patch_t_len() / self.query.patch().bin_t_len();
         let range = BinnedRange::covering_range(evq.range.clone(), count as u32)
             .unwrap()
             .ok_or(Error::with_msg("covering_range returns None"))
@@ -161,7 +162,7 @@ where
     }
 
     fn setup_from_higher_res_prebinned(&mut self, range: PreBinnedPatchRange) {
-        let g = self.query.patch.bin_t_len();
+        let g = self.query.patch().bin_t_len();
         let h = range.grid_spec.bin_t_len();
         trace!(
             "try_setup_fetch_prebinned_higher_res found g {} h {} ratio {} mod {} {:?}",
@@ -188,18 +189,18 @@
         let s = futures_util::stream::iter(patch_it)
             .map({
                 let q2 = self.query.clone();
-                let disk_stats_every = self.query.disk_stats_every.clone();
+                let disk_stats_every = self.query.disk_stats_every().clone();
                 let stream_kind = self.stream_kind.clone();
                 let report_error = self.query.report_error();
                 move |patch| {
-                    let query = PreBinnedQuery {
+                    let query = PreBinnedQuery::new(
                         patch,
-                        channel: q2.channel.clone(),
-                        agg_kind: q2.agg_kind.clone(),
-                        cache_usage: q2.cache_usage.clone(),
-                        disk_stats_every: disk_stats_every.clone(),
+                        q2.channel().clone(),
+                        q2.agg_kind().clone(),
+                        q2.cache_usage().clone(),
+                        disk_stats_every.clone(),
                         report_error,
-                    };
+                    );
                     PreBinnedScalarValueFetchedStream::new(&query, &node_config, &stream_kind)
                 }
            })
@@ -215,8 +216,8 @@
     }
 
     fn try_setup_fetch_prebinned_higher_res(&mut self) -> Result<(), Error> {
-        let range = self.query.patch.patch_range();
-        match PreBinnedPatchRange::covering_range(range, self.query.patch.bin_count() + 1) {
+        let range = self.query.patch().patch_range();
+        match PreBinnedPatchRange::covering_range(range, self.query.patch().bin_count() + 1) {
             Ok(Some(range)) => {
                 self.setup_from_higher_res_prebinned(range);
             }
@@ -300,11 +301,12 @@
                     self.cache_written = true;
                     continue 'outer;
                 } else {
-                    match self.query.cache_usage {
-                        super::CacheUsage::Use | super::CacheUsage::Recreate => {
+                    use crate::binned::query::CacheUsage;
+                    match self.query.cache_usage() {
+                        CacheUsage::Use | CacheUsage::Recreate => {
                             let msg = format!(
                                 "write cache file query: {:?} bin count: {}",
-                                self.query.patch,
+                                self.query.patch(),
                                 self.values.len(),
                             );
                             self.streamlog.append(Level::INFO, msg);
                             let values = std::mem::replace(
                                 &mut self.values,
                             );
                             let fut = super::write_pb_cache_min_max_avg_scalar(
                                 values,
-                                self.query.patch.clone(),
-                                self.query.agg_kind.clone(),
-                                self.query.channel.clone(),
+                                self.query.patch().clone(),
+                                self.query.agg_kind().clone(),
+                                self.query.channel().clone(),
                                 self.node_config.clone(),
                             );
                             self.write_fut = Some(Box::pin(fut));
@@ -402,12 +404,12 @@
                 }
             } else {
                 let cfd = CacheFileDesc {
-                    channel: self.query.channel.clone(),
-                    patch: self.query.patch.clone(),
-                    agg_kind: self.query.agg_kind.clone(),
+                    channel: self.query.channel().clone(),
+                    patch: self.query.patch().clone(),
+                    agg_kind: self.query.agg_kind().clone(),
                 };
-                use super::CacheUsage;
-                let path = match self.query.cache_usage {
+                use crate::binned::query::CacheUsage;
+                let path = match self.query.cache_usage() {
                     CacheUsage::Use => cfd.path(&self.node_config),
                     _ => PathBuf::from("DOESNOTEXIST"),
                 };
diff --git a/disk/src/cache/pbvfs.rs b/disk/src/cache/pbvfs.rs
index 54df2cd..6cedd0c 100644
--- a/disk/src/cache/pbvfs.rs
+++ b/disk/src/cache/pbvfs.rs
@@ -1,6 +1,7 @@
 use crate::agg::streams::StreamItem;
+use crate::binned::query::PreBinnedQuery;
 use crate::binned::{RangeCompletableItem, StreamKind};
-use crate::cache::{node_ix_for_patch, HttpBodyAsAsyncRead, PreBinnedQuery};
+use crate::cache::{node_ix_for_patch, HttpBodyAsAsyncRead};
 use crate::frame::inmem::InMemoryFrameAsyncReadStream;
 use crate::frame::makeframe::{decode_frame, FrameType};
 use err::Error;
@@ -29,7 +30,7 @@ where
     SK: StreamKind,
 {
     pub fn new(query: &PreBinnedQuery, node_config: &NodeConfigCached, stream_kind: &SK) -> Result<Self, Error> {
-        let nodeix = node_ix_for_patch(&query.patch, &query.channel, &node_config.node_config.cluster);
+        let nodeix = node_ix_for_patch(&query.patch(), &query.channel(), &node_config.node_config.cluster);
         let node = &node_config.node_config.cluster.nodes[nodeix as usize];
         let uri: hyper::Uri = format!(
             "http://{}:{}/api/4/prebinned?{}",
diff --git a/disk/src/lib.rs b/disk/src/lib.rs
index edf6f7e..a562a44 100644
--- a/disk/src/lib.rs
+++ b/disk/src/lib.rs
@@ -478,44 +478,6 @@ pub fn raw_concat_channel_read_stream_timebin(
     }
 }
 
-/**
-Read all events from all timebins for the given channel and split.
-*/
-#[allow(dead_code)]
-pub struct RawConcatChannelReader {
-    ksprefix: String,
-    keyspace: u32,
-    channel: netpod::Channel,
-    split: u32,
-    tbsize: u32,
-    buffer_size: u32,
-    tb: u32,
-    //file_reader: Option,
-
-    // TODO
-    // Not enough to store a simple future here.
-    // That will only resolve to a single output.
-    // • How can I transition between Stream and async world?
-    // • I guess I must not poll a completed Future which comes from some async fn again after it completed.
-    // • relevant crates: async-stream, tokio-stream
-    fopen: Option<Box<dyn Future<Output = Option<Result<File, Error>>> + Send>>,
-}
-
-impl RawConcatChannelReader {
-    pub fn read(self) -> Result<netpod::BodyStream, Error> {
-        let res = netpod::BodyStream { inner: Box::new(self) };
-        Ok(res)
-    }
-}
-
-impl futures_core::Stream for RawConcatChannelReader {
-    type Item = Result<Bytes, Error>;
-
-    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
-        todo!()
-    }
-}
-
 type Sitemty<T> = Result<StreamItem<RangeCompletableItem<T>>, Error>;
 
 pub mod dtflags {
diff --git a/httpret/src/lib.rs b/httpret/src/lib.rs
index f241bde..562f06b 100644
--- a/httpret/src/lib.rs
+++ b/httpret/src/lib.rs
@@ -1,7 +1,8 @@
 use crate::gather::gather_get_json;
 use bytes::Bytes;
+use disk::binned::prebinned::pre_binned_bytes_for_http;
+use disk::binned::query::{BinnedQuery, PreBinnedQuery};
 use disk::binned::BinnedStreamKindScalar;
-use disk::cache::{BinnedQuery, PreBinnedQuery};
 use disk::raw::conn::events_service;
 use err::Error;
 use future::Future;
@@ -319,7 +320,7 @@
 async fn binned(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
     let (head, _body) = req.into_parts();
-    let query = disk::cache::BinnedQuery::from_request(&head)?;
+    let query = BinnedQuery::from_request(&head)?;
     match head.headers.get("accept") {
         Some(v) if v == "application/octet-stream" => binned_binary(query, node_config).await,
         Some(v) if v == "application/json" => binned_json(query, node_config).await,
@@ -365,13 +366,10 @@ async fn prebinned(req: Request<Body>, node_config: &NodeConfigCached) -> Result
     let query = PreBinnedQuery::from_request(&head)?;
     let desc = format!("pre-b-{}", query.patch().bin_t_len() / 1000000000);
     let span1 = span!(Level::INFO, "httpret::prebinned", desc = &desc.as_str());
-
-    // TODO factor from the inner scopes the fetch of the channel entry so that I can
-    // provide the stream_kind here:
+    // TODO remove StreamKind
     let stream_kind = BinnedStreamKindScalar::new();
-
     span1.in_scope(|| {
-        let ret = match disk::cache::pre_binned_bytes_for_http(node_config, &query, stream_kind) {
+        let ret = match pre_binned_bytes_for_http(node_config, &query, stream_kind) {
             Ok(s) => response(StatusCode::OK).body(BodyStream::wrapped(
                 s,
                 format!(