Derive default decompress flag from central storage setting
This commit is contained in:
@@ -887,12 +887,7 @@ impl Api1EventsBinaryHandler {
|
||||
debug!("request body_data string: {}", String::from_utf8_lossy(&body_data));
|
||||
}
|
||||
let qu = match serde_json::from_slice::<Api1Query>(&body_data) {
|
||||
Ok(mut qu) => {
|
||||
if node_config.node_config.cluster.is_central_storage {
|
||||
qu.set_decompress(false);
|
||||
}
|
||||
qu
|
||||
}
|
||||
Ok(x) => x,
|
||||
Err(e) => {
|
||||
error!("got body_data: {:?}", String::from_utf8_lossy(&body_data[..]));
|
||||
error!("can not parse: {e}");
|
||||
@@ -982,7 +977,8 @@ impl Api1EventsBinaryHandler {
|
||||
// TODO carry those settings from the query again
|
||||
settings,
|
||||
DiskIoTune::default(),
|
||||
qu.decompress(),
|
||||
qu.decompress()
|
||||
.unwrap_or_else(|| ncc.node_config.cluster.decompress_default()),
|
||||
qu.events_max().unwrap_or(u64::MAX),
|
||||
reqctx.clone(),
|
||||
ncc.clone(),
|
||||
|
||||
@@ -19,6 +19,7 @@ use serde::Deserialize;
|
||||
use serde::Deserializer;
|
||||
use serde::Serialize;
|
||||
use serde::Serializer;
|
||||
use std::borrow::Cow;
|
||||
use std::collections::VecDeque;
|
||||
use std::time::Instant;
|
||||
|
||||
@@ -284,18 +285,19 @@ impl EventFull {
|
||||
i: usize,
|
||||
_scalar_type: &ScalarType,
|
||||
shape: &Shape,
|
||||
) -> Result<Vec<u8>, DecompError> {
|
||||
) -> Result<Cow<[u8]>, DecompError> {
|
||||
if let Some(comp) = &self.comps[i] {
|
||||
match comp {
|
||||
CompressionMethod::BitshuffleLZ4 => {
|
||||
let type_size = self.scalar_types[i].bytes() as u32;
|
||||
let ele_count = self.shapes[i].ele_count();
|
||||
decompress(&self.blobs[i], type_size, ele_count, shape.ele_count())
|
||||
let data = decompress(&self.blobs[i], type_size, ele_count, shape.ele_count())?;
|
||||
Ok(Cow::Owned(data))
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// TODO use a Cow type.
|
||||
Ok(self.blobs[i].clone())
|
||||
let data = &self.blobs[i];
|
||||
Ok(Cow::Borrowed(data.as_slice()))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -523,6 +523,16 @@ pub struct Cluster {
|
||||
pub cache_scylla: Option<ScyllaConfig>,
|
||||
}
|
||||
|
||||
impl Cluster {
|
||||
pub fn decompress_default(&self) -> bool {
|
||||
if self.is_central_storage {
|
||||
false
|
||||
} else {
|
||||
true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
pub struct NodeConfig {
|
||||
pub name: String,
|
||||
|
||||
@@ -231,8 +231,8 @@ pub struct Api1Query {
|
||||
// All following parameters are private and not to be used
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
file_io_buffer_size: Option<FileIoBufferSize>,
|
||||
#[serde(default = "bool_true", skip_serializing_if = "bool_is_true")]
|
||||
decompress: bool,
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
decompress: Option<bool>,
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
events_max: Option<u64>,
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
@@ -249,7 +249,7 @@ impl Api1Query {
|
||||
range,
|
||||
channels,
|
||||
timeout: None,
|
||||
decompress: true,
|
||||
decompress: None,
|
||||
events_max: None,
|
||||
file_io_buffer_size: None,
|
||||
io_queue_len: None,
|
||||
@@ -291,7 +291,7 @@ impl Api1Query {
|
||||
&self.log_level
|
||||
}
|
||||
|
||||
pub fn decompress(&self) -> bool {
|
||||
pub fn decompress(&self) -> Option<bool> {
|
||||
self.decompress
|
||||
}
|
||||
|
||||
@@ -299,7 +299,7 @@ impl Api1Query {
|
||||
self.events_max
|
||||
}
|
||||
|
||||
pub fn set_decompress(&mut self, v: bool) {
|
||||
pub fn set_decompress(&mut self, v: Option<bool>) {
|
||||
self.decompress = v;
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user