diff --git a/crates/httpret/src/api1.rs b/crates/httpret/src/api1.rs
index 0a38618..eb17877 100644
--- a/crates/httpret/src/api1.rs
+++ b/crates/httpret/src/api1.rs
@@ -887,12 +887,7 @@ impl Api1EventsBinaryHandler {
             debug!("request body_data string: {}", String::from_utf8_lossy(&body_data));
         }
         let qu = match serde_json::from_slice::<Api1Query>(&body_data) {
-            Ok(mut qu) => {
-                if node_config.node_config.cluster.is_central_storage {
-                    qu.set_decompress(false);
-                }
-                qu
-            }
+            Ok(x) => x,
             Err(e) => {
                 error!("got body_data: {:?}", String::from_utf8_lossy(&body_data[..]));
                 error!("can not parse: {e}");
@@ -982,7 +977,8 @@ impl Api1EventsBinaryHandler {
             // TODO carry those settings from the query again
             settings,
             DiskIoTune::default(),
-            qu.decompress(),
+            qu.decompress()
+                .unwrap_or_else(|| ncc.node_config.cluster.decompress_default()),
             qu.events_max().unwrap_or(u64::MAX),
             reqctx.clone(),
             ncc.clone(),
diff --git a/crates/items_2/src/eventfull.rs b/crates/items_2/src/eventfull.rs
index 9341c84..68f39af 100644
--- a/crates/items_2/src/eventfull.rs
+++ b/crates/items_2/src/eventfull.rs
@@ -19,6 +19,7 @@ use serde::Deserialize;
 use serde::Deserializer;
 use serde::Serialize;
 use serde::Serializer;
+use std::borrow::Cow;
 use std::collections::VecDeque;
 use std::time::Instant;
 
@@ -284,18 +285,19 @@ impl EventFull {
         i: usize,
         _scalar_type: &ScalarType,
         shape: &Shape,
-    ) -> Result<Vec<u8>, DecompError> {
+    ) -> Result<Cow<[u8]>, DecompError> {
         if let Some(comp) = &self.comps[i] {
             match comp {
                 CompressionMethod::BitshuffleLZ4 => {
                     let type_size = self.scalar_types[i].bytes() as u32;
                     let ele_count = self.shapes[i].ele_count();
-                    decompress(&self.blobs[i], type_size, ele_count, shape.ele_count())
+                    let data = decompress(&self.blobs[i], type_size, ele_count, shape.ele_count())?;
+                    Ok(Cow::Owned(data))
                 }
             }
         } else {
-            // TODO use a Cow type.
-            Ok(self.blobs[i].clone())
+            let data = &self.blobs[i];
+            Ok(Cow::Borrowed(data.as_slice()))
         }
     }
 }
diff --git a/crates/netpod/src/netpod.rs b/crates/netpod/src/netpod.rs
index eb9672c..abef033 100644
--- a/crates/netpod/src/netpod.rs
+++ b/crates/netpod/src/netpod.rs
@@ -523,6 +523,16 @@ pub struct Cluster {
     pub cache_scylla: Option<ScyllaConfig>,
 }
 
+impl Cluster {
+    pub fn decompress_default(&self) -> bool {
+        if self.is_central_storage {
+            false
+        } else {
+            true
+        }
+    }
+}
+
 #[derive(Clone, Debug, Serialize, Deserialize)]
 pub struct NodeConfig {
     pub name: String,
diff --git a/crates/netpod/src/query/api1.rs b/crates/netpod/src/query/api1.rs
index 19e4bec..7ade34c 100644
--- a/crates/netpod/src/query/api1.rs
+++ b/crates/netpod/src/query/api1.rs
@@ -231,8 +231,8 @@ pub struct Api1Query {
     // All following parameters are private and not to be used
     #[serde(default, skip_serializing_if = "Option::is_none")]
     file_io_buffer_size: Option<usize>,
-    #[serde(default = "bool_true", skip_serializing_if = "bool_is_true")]
-    decompress: bool,
+    #[serde(default, skip_serializing_if = "Option::is_none")]
+    decompress: Option<bool>,
     #[serde(default, skip_serializing_if = "Option::is_none")]
     events_max: Option<u64>,
     #[serde(default, skip_serializing_if = "Option::is_none")]
@@ -249,7 +249,7 @@ impl Api1Query {
             range,
             channels,
             timeout: None,
-            decompress: true,
+            decompress: None,
             events_max: None,
             file_io_buffer_size: None,
             io_queue_len: None,
@@ -291,7 +291,7 @@ impl Api1Query {
         &self.log_level
     }
 
-    pub fn decompress(&self) -> bool {
+    pub fn decompress(&self) -> Option<bool> {
         self.decompress
     }
 
@@ -299,7 +299,7 @@ impl Api1Query {
         self.events_max
     }
 
-    pub fn set_decompress(&mut self, v: bool) {
+    pub fn set_decompress(&mut self, v: Option<bool>) {
         self.decompress = v;
     }
 }
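
Note on the query change: `decompress` is now a tri-state `Option<bool>`, so a request that omits the field no longer gets a value forced at parse time; the effective setting is resolved against the per-cluster default only at the point of use. That makes the removed `Ok(mut qu)` branch, which mutated the freshly parsed query via `set_decompress(false)`, unnecessary. A minimal, self-contained sketch of the behavior follows; `Query` and `Cluster` here are illustrative stand-ins, not the actual netpod types.

use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct Query {
    // Absent in the JSON body => None, mirroring the new Api1Query field.
    #[serde(default)]
    decompress: Option<bool>,
}

struct Cluster {
    is_central_storage: bool,
}

impl Cluster {
    // Mirrors Cluster::decompress_default from the diff: central-storage
    // nodes pass blobs through compressed by default.
    fn decompress_default(&self) -> bool {
        !self.is_central_storage
    }
}

fn main() {
    let cluster = Cluster { is_central_storage: true };

    // Field omitted: fall back to the cluster default (false here).
    let q: Query = serde_json::from_str("{}").unwrap();
    assert_eq!(q.decompress.unwrap_or_else(|| cluster.decompress_default()), false);

    // Field present: an explicit client choice wins over the default.
    let q: Query = serde_json::from_str(r#"{"decompress": true}"#).unwrap();
    assert_eq!(q.decompress.unwrap_or_else(|| cluster.decompress_default()), true);
}

Resolving the default at the call site keeps the parsed query an unmodified record of what the client sent, which is also why serialization can switch to `skip_serializing_if = "Option::is_none"` and drop the old `bool_true`/`bool_is_true` helpers.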
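
Note on the EventFull change: returning `Cow<[u8]>` resolves the old `// TODO use a Cow type.` comment by removing the clone on the uncompressed path, which now borrows the stored blob, while the BitshuffleLZ4 path returns owned, freshly decompressed bytes. A minimal sketch of the pattern, using only std and a hypothetical `Blob` type (not the items_2 API):

use std::borrow::Cow;

struct Blob {
    data: Vec<u8>,
    compressed: bool,
}

impl Blob {
    fn payload(&self) -> Cow<'_, [u8]> {
        if self.compressed {
            // Stand-in for the real bitshuffle/LZ4 decompression: any path
            // that must allocate returns Cow::Owned.
            let decompressed: Vec<u8> = self.data.iter().map(|b| b ^ 0xff).collect();
            Cow::Owned(decompressed)
        } else {
            // Nothing to decode: hand out a borrow instead of cloning.
            Cow::Borrowed(self.data.as_slice())
        }
    }
}

fn main() {
    let blob = Blob { data: vec![1, 2, 3], compressed: false };
    match blob.payload() {
        Cow::Borrowed(_) => println!("borrowed, no allocation"),
        Cow::Owned(_) => println!("owned, allocated"),
    }
}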