This commit is contained in:
Dominik Werder
2023-06-27 17:02:37 +02:00
parent f4c1539c37
commit 8938e55f86
19 changed files with 597 additions and 385 deletions

View File

@@ -1,5 +1,7 @@
use crate::SfDbChConf;
use err::Error;
use err::thiserror;
#[allow(unused)]
use netpod::log::*;
use netpod::range::evrange::NanoRange;
use netpod::NodeConfigCached;
use netpod::SfDbChannel;
@@ -7,13 +9,43 @@ use parse::channelconfig::extract_matching_config_entry;
use parse::channelconfig::read_local_config;
use parse::channelconfig::ChannelConfigs;
use parse::channelconfig::ConfigEntry;
use parse::channelconfig::ConfigParseError;
use std::fmt;
#[derive(Debug, thiserror::Error)]
pub enum ConfigError {
ParseError(ConfigParseError),
NotFound,
Error,
}
impl fmt::Display for ConfigError {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
write!(fmt, "ConfigError::{self:?}")
}
}
impl From<ConfigParseError> for ConfigError {
fn from(value: ConfigParseError) -> Self {
match value {
ConfigParseError::FileNotFound => ConfigError::NotFound,
x => ConfigError::ParseError(x),
}
}
}
pub async fn config_entry_best_match(
range: &NanoRange,
channel: SfDbChannel,
node_config: &NodeConfigCached,
) -> Result<Option<ConfigEntry>, Error> {
let channel_config = read_local_config(channel.clone(), node_config.clone()).await?;
) -> Result<Option<ConfigEntry>, ConfigError> {
let channel_config = match read_local_config(channel.clone(), node_config.clone()).await {
Ok(x) => x,
Err(e) => match e {
ConfigParseError::FileNotFound => return Ok(None),
e => return Err(e.into()),
},
};
let entry_res = match extract_matching_config_entry(range, &channel_config) {
Ok(k) => k,
Err(e) => return Err(e)?,
@@ -24,7 +56,10 @@ pub async fn config_entry_best_match(
}
}
pub async fn channel_configs(channel: SfDbChannel, node_config: &NodeConfigCached) -> Result<ChannelConfigs, Error> {
pub async fn channel_configs(
channel: SfDbChannel,
node_config: &NodeConfigCached,
) -> Result<ChannelConfigs, ConfigParseError> {
read_local_config(channel.clone(), node_config.clone()).await
}
@@ -32,14 +67,15 @@ pub async fn channel_config_best_match(
range: NanoRange,
channel: SfDbChannel,
node_config: &NodeConfigCached,
) -> Result<Option<SfDbChConf>, Error> {
) -> Result<Option<SfDbChConf>, ConfigError> {
let best = config_entry_best_match(&range, channel.clone(), node_config).await?;
match best {
None => Ok(None),
Some(entry) => {
let shape = match entry.to_shape() {
Ok(k) => k,
Err(e) => return Err(e)?,
// TODO pass error to caller
Err(_e) => return Err(ConfigError::Error)?,
};
let channel_config = SfDbChConf {
channel: channel.clone(),

View File

@@ -103,6 +103,7 @@ impl Stream for EventChunkerMultifile {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let span1 = span!(Level::INFO, "EvChMul", node_ix = self.node_ix);
let _spg = span1.enter();
info!("EventChunkerMultifile poll_next");
use Poll::*;
'outer: loop {
break if let Some(item) = self.log_queue.pop_front() {

View File

@@ -1,6 +1,7 @@
use bitshuffle::bitshuffle_decompress;
use bytes::Buf;
use bytes::BytesMut;
use err::thiserror;
use err::Error;
use futures_util::Stream;
use futures_util::StreamExt;
@@ -20,6 +21,7 @@ use netpod::ScalarType;
use netpod::SfChFetchInfo;
use netpod::Shape;
use parse::channelconfig::CompressionMethod;
use std::io::Cursor;
use std::path::PathBuf;
use std::pin::Pin;
use std::task::Context;
@@ -29,11 +31,46 @@ use streams::dtflags::*;
use streams::filechunkread::FileChunkRead;
use streams::needminbuffer::NeedMinBuffer;
#[derive(Debug, thiserror::Error)]
pub enum DataParseError {
#[error("DataFrameLengthMismatch")]
DataFrameLengthMismatch,
#[error("FileHeaderTooShort")]
FileHeaderTooShort,
#[error("BadVersionTag")]
BadVersionTag,
#[error("HeaderTooLarge")]
HeaderTooLarge,
#[error("Utf8Error")]
Utf8Error,
#[error("EventTooShort")]
EventTooShort,
#[error("EventTooLong")]
EventTooLong,
#[error("TooManyBeforeRange")]
TooManyBeforeRange,
#[error("EventWithOptional")]
EventWithOptional,
#[error("BadTypeIndex")]
BadTypeIndex,
#[error("WaveShapeWithoutEventArray")]
WaveShapeWithoutEventArray,
#[error("ShapedWithoutDims")]
ShapedWithoutDims,
#[error("TooManyDims")]
TooManyDims,
#[error("UnknownCompression")]
UnknownCompression,
#[error("BadCompresionBlockSize")]
BadCompresionBlockSize,
}
pub struct EventChunker {
inp: NeedMinBuffer,
state: DataFileState,
need_min: u32,
fetch_info: SfChFetchInfo,
need_min_max: u32,
errored: bool,
completed: bool,
range: NanoRange,
@@ -44,16 +81,17 @@ pub struct EventChunker {
final_stats_sent: bool,
parsed_bytes: u64,
dbg_path: PathBuf,
max_ts: u64,
last_ts: u64,
expand: bool,
do_decompress: bool,
decomp_dt_histo: HistoLog2,
item_len_emit_histo: HistoLog2,
seen_before_range_count: usize,
seen_after_range_count: usize,
unordered_warn_count: usize,
repeated_ts_warn_count: usize,
unordered_count: usize,
repeated_ts_count: usize,
config_mismatch_discard: usize,
discard_count: usize,
}
impl Drop for EventChunker {
@@ -90,7 +128,68 @@ impl EventChunkerConf {
}
}
fn is_config_match(is_array: &bool, ele_count: &u64, fetch_info: &SfChFetchInfo) -> bool {
match fetch_info.shape() {
Shape::Scalar => {
if *is_array {
false
} else {
true
}
}
Shape::Wave(dim1count) => {
if (*dim1count as u64) != *ele_count {
false
} else {
true
}
}
Shape::Image(n1, n2) => {
let nt = (*n1 as u64) * (*n2 as u64);
if nt != *ele_count {
false
} else {
true
}
}
}
}
#[derive(Debug, thiserror::Error)]
pub enum DecompError {
#[error("Error")]
Error,
}
fn decompress(databuf: &[u8], type_size: u32, ele_count: u64) -> Result<Vec<u8>, DecompError> {
if databuf.len() < 13 {
return Err(DecompError::Error);
}
let ts1 = Instant::now();
let decomp_bytes = type_size as u64 * ele_count;
let mut decomp = vec![0; decomp_bytes as usize];
let ele_size = type_size;
// TODO limit the buf slice range
match bitshuffle_decompress(&databuf[12..], &mut decomp, ele_count as usize, ele_size as usize, 0) {
Ok(c1) => {
if 12 + c1 != databuf.len() {}
let ts2 = Instant::now();
let dt = ts2.duration_since(ts1);
// TODO analyze the histo
//self.decomp_dt_histo.ingest(dt.as_secs() as u32 + dt.subsec_micros());
Ok(decomp)
}
Err(e) => {
return Err(DecompError::Error);
}
}
}
impl EventChunker {
pub fn self_name() -> &'static str {
std::any::type_name::<Self>()
}
// TODO `expand` flag usage
pub fn from_start(
inp: Pin<Box<dyn Stream<Item = Result<FileChunkRead, Error>> + Send>>,
@@ -101,13 +200,24 @@ impl EventChunker {
expand: bool,
do_decompress: bool,
) -> Self {
info!("EventChunker::{} do_decompress {}", "from_start", do_decompress);
info!(
"{}::{} do_decompress {}",
Self::self_name(),
"from_start",
do_decompress
);
let need_min_max = match fetch_info.shape() {
Shape::Scalar => 1024 * 8,
Shape::Wave(_) => 1024 * 32,
Shape::Image(_, _) => 1024 * 1024 * 40,
};
let mut inp = NeedMinBuffer::new(inp);
inp.set_need_min(6);
Self {
inp,
state: DataFileState::FileHeader,
need_min: 6,
need_min_max,
fetch_info,
errored: false,
completed: false,
@@ -119,16 +229,17 @@ impl EventChunker {
final_stats_sent: false,
parsed_bytes: 0,
dbg_path,
max_ts: 0,
last_ts: 0,
expand,
do_decompress,
decomp_dt_histo: HistoLog2::new(8),
item_len_emit_histo: HistoLog2::new(0),
seen_before_range_count: 0,
seen_after_range_count: 0,
unordered_warn_count: 0,
repeated_ts_warn_count: 0,
unordered_count: 0,
repeated_ts_count: 0,
config_mismatch_discard: 0,
discard_count: 0,
}
}
@@ -143,8 +254,10 @@ impl EventChunker {
do_decompress: bool,
) -> Self {
info!(
"EventChunker::{} do_decompress {}",
"from_event_boundary", do_decompress
"{}::{} do_decompress {}",
Self::self_name(),
"from_event_boundary",
do_decompress
);
let mut ret = Self::from_start(inp, fetch_info, range, stats_conf, dbg_path, expand, do_decompress);
ret.state = DataFileState::Event;
@@ -154,13 +267,17 @@ impl EventChunker {
}
fn parse_buf(&mut self, buf: &mut BytesMut) -> Result<ParseResult, Error> {
span!(Level::INFO, "EventChunker::parse_buf").in_scope(|| self.parse_buf_inner(buf))
span!(Level::INFO, "EventChunker::parse_buf")
.in_scope(|| self.parse_buf_inner(buf))
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))
}
fn parse_buf_inner(&mut self, buf: &mut BytesMut) -> Result<ParseResult, Error> {
fn parse_buf_inner(&mut self, buf: &mut BytesMut) -> Result<ParseResult, DataParseError> {
use byteorder::ReadBytesExt;
use byteorder::BE;
info!("parse_buf_inner buf len {}", buf.len());
let mut ret = EventFull::empty();
let mut parsed_bytes = 0;
use byteorder::{ReadBytesExt, BE};
loop {
if (buf.len() as u32) < self.need_min {
break;
@@ -168,16 +285,16 @@ impl EventChunker {
match self.state {
DataFileState::FileHeader => {
if buf.len() < 6 {
Err(Error::with_msg("need min 6 for FileHeader"))?;
return Err(DataParseError::FileHeaderTooShort);
}
let mut sl = std::io::Cursor::new(buf.as_ref());
let mut sl = Cursor::new(buf.as_ref());
let fver = sl.read_i16::<BE>().unwrap();
if fver != 0 {
Err(Error::with_msg("unexpected data file version"))?;
return Err(DataParseError::BadVersionTag);
}
let len = sl.read_i32::<BE>().unwrap();
if len <= 0 || len >= 128 {
Err(Error::with_msg("large channel header len"))?;
if len <= 0 || len >= 512 {
return Err(DataParseError::HeaderTooLarge);
}
let totlen = len as usize + 2;
if buf.len() < totlen {
@@ -187,9 +304,10 @@ impl EventChunker {
sl.advance(len as usize - 8);
let len2 = sl.read_i32::<BE>().unwrap();
if len != len2 {
Err(Error::with_msg("channel header len mismatch"))?;
return Err(DataParseError::DataFrameLengthMismatch);
}
String::from_utf8(buf.as_ref()[6..(len as usize + 6 - 8)].to_vec())?;
let _ = String::from_utf8(buf.as_ref()[6..(len as usize + 6 - 8)].to_vec())
.map_err(|_| DataParseError::Utf8Error);
self.state = DataFileState::Event;
self.need_min = 4;
buf.advance(totlen);
@@ -198,58 +316,62 @@ impl EventChunker {
}
DataFileState::Event => {
let p0 = 0;
let mut sl = std::io::Cursor::new(buf.as_ref());
let mut sl = Cursor::new(buf.as_ref());
let len = sl.read_i32::<BE>().unwrap();
if len < 20 || len > 1024 * 1024 * 20 {
Err(Error::with_msg("unexpected large event chunk"))?;
if len < 20 {
return Err(DataParseError::EventTooShort);
}
match self.fetch_info.shape() {
Shape::Scalar if len > 512 => return Err(DataParseError::EventTooLong),
Shape::Wave(_) if len > 8 * 1024 * 256 => return Err(DataParseError::EventTooLong),
Shape::Image(_, _) if len > 1024 * 1024 * 40 => return Err(DataParseError::EventTooLong),
_ => {}
}
let len = len as u32;
if (buf.len() as u32) < len {
self.need_min = len as u32;
break;
} else {
let mut sl = std::io::Cursor::new(buf.as_ref());
let len1b = sl.read_i32::<BE>().unwrap();
assert!(len == len1b as u32);
let mut discard = false;
let _ttl = sl.read_i64::<BE>().unwrap();
let ts = sl.read_i64::<BE>().unwrap() as u64;
let pulse = sl.read_i64::<BE>().unwrap() as u64;
if ts == self.max_ts {
if self.repeated_ts_warn_count < 20 {
if ts == self.last_ts {
self.repeated_ts_count += 1;
if self.repeated_ts_count < 20 {
let msg = format!(
"EventChunker repeated event ts ix {} ts {}.{} max_ts {}.{} config {:?} path {:?}",
self.repeated_ts_warn_count,
"EventChunker repeated event ts ix {} ts {}.{} last_ts {}.{} config {:?} path {:?}",
self.repeated_ts_count,
ts / SEC,
ts % SEC,
self.max_ts / SEC,
self.max_ts % SEC,
self.last_ts / SEC,
self.last_ts % SEC,
self.fetch_info.shape(),
self.dbg_path
);
warn!("{}", msg);
self.repeated_ts_warn_count += 1;
}
}
if ts < self.max_ts {
if self.unordered_warn_count < 20 {
if ts < self.last_ts {
discard = true;
self.unordered_count += 1;
if self.unordered_count < 20 {
let msg = format!(
"EventChunker unordered event ix {} ts {}.{} max_ts {}.{} config {:?} path {:?}",
self.unordered_warn_count,
"EventChunker unordered event ix {} ts {}.{} last_ts {}.{} config {:?} path {:?}",
self.unordered_count,
ts / SEC,
ts % SEC,
self.max_ts / SEC,
self.max_ts % SEC,
self.last_ts / SEC,
self.last_ts % SEC,
self.fetch_info.shape(),
self.dbg_path
);
warn!("{}", msg);
self.unordered_warn_count += 1;
let e = Error::with_public_msg_no_trace(msg);
return Err(e);
}
}
self.max_ts = ts;
self.last_ts = ts;
if ts >= self.range.end {
discard = true;
self.seen_after_range_count += 1;
if !self.expand || self.seen_after_range_count >= 2 {
self.seen_beyond_range = true;
@@ -258,10 +380,12 @@ impl EventChunker {
}
}
if ts < self.range.beg {
discard = true;
self.seen_before_range_count += 1;
if self.seen_before_range_count > 1 {
if self.seen_before_range_count < 20 {
let msg = format!(
"seen before range: event ts {}.{} range beg {}.{} range end {}.{} pulse {} config {:?} path {:?}",
"seen before range: {} event ts {}.{} range beg {}.{} range end {}.{} pulse {} config {:?} path {:?}",
self.seen_before_range_count,
ts / SEC,
ts % SEC,
self.range.beg / SEC,
@@ -273,8 +397,23 @@ impl EventChunker {
self.dbg_path
);
warn!("{}", msg);
let e = Error::with_public_msg(msg);
Err(e)?;
}
if self.seen_before_range_count > 100 {
let msg = format!(
"too many seen before range: {} event ts {}.{} range beg {}.{} range end {}.{} pulse {} config {:?} path {:?}",
self.seen_before_range_count,
ts / SEC,
ts % SEC,
self.range.beg / SEC,
self.range.beg % SEC,
self.range.end / SEC,
self.range.end % SEC,
pulse,
self.fetch_info.shape(),
self.dbg_path
);
error!("{}", msg);
return Err(DataParseError::TooManyBeforeRange);
}
}
let _ioc_ts = sl.read_i64::<BE>().unwrap();
@@ -282,33 +421,34 @@ impl EventChunker {
let severity = sl.read_i8().unwrap();
let optional = sl.read_i32::<BE>().unwrap();
if status != 0 {
Err(Error::with_msg(format!("status != 0: {}", status)))?;
// return Err(DataParseError::UnexpectedStatus);
// TODO count
}
if severity != 0 {
Err(Error::with_msg(format!("severity != 0: {}", severity)))?;
// return Err(DataParseError::TooManyBeforeRange);
// TODO count
}
if optional != -1 {
Err(Error::with_msg(format!("optional != -1: {}", optional)))?;
return Err(DataParseError::EventWithOptional);
}
let type_flags = sl.read_u8().unwrap();
let type_index = sl.read_u8().unwrap();
if type_index > 13 {
Err(Error::with_msg(format!("type_index: {}", type_index)))?;
return Err(DataParseError::BadTypeIndex);
}
let scalar_type = ScalarType::from_dtype_index(type_index)?;
let scalar_type =
ScalarType::from_dtype_index(type_index).map_err(|_| DataParseError::BadTypeIndex)?;
let is_compressed = type_flags & COMPRESSION != 0;
let is_array = type_flags & ARRAY != 0;
let is_big_endian = type_flags & BIG_ENDIAN != 0;
let is_shaped = type_flags & SHAPE != 0;
if let Shape::Wave(_) = self.fetch_info.shape() {
if !is_array {
Err(Error::with_msg(format!("dim1 but not array {:?}", self.fetch_info)))?;
return Err(DataParseError::WaveShapeWithoutEventArray);
}
}
let compression_method = if is_compressed { sl.read_u8().unwrap() } else { 0 };
let shape_dim = if is_shaped { sl.read_u8().unwrap() } else { 0 };
assert!(compression_method <= 0);
assert!(!is_shaped || (shape_dim >= 1 && shape_dim <= 2));
let mut shape_lens = [0, 0, 0, 0];
for i1 in 0..shape_dim {
shape_lens[i1 as usize] = sl.read_u32::<BE>().unwrap();
@@ -319,8 +459,14 @@ impl EventChunker {
Shape::Wave(shape_lens[0])
} else if shape_dim == 2 {
Shape::Image(shape_lens[0], shape_lens[1])
} else if shape_dim == 0 {
discard = true;
// return Err(DataParseError::ShapedWithoutDims);
Shape::Scalar
} else {
err::todoval()
discard = true;
// return Err(DataParseError::TooManyDims);
Shape::Scalar
}
} else {
Shape::Scalar
@@ -330,14 +476,16 @@ impl EventChunker {
if compression_method == 0 {
Some(CompressionMethod::BitshuffleLZ4)
} else {
err::todoval()
return Err(DataParseError::UnknownCompression);
}
} else {
None
};
let p1 = sl.position();
let k1 = len as u64 - (p1 - p0) - 4;
if is_compressed {
let n1 = p1 - p0;
let n2 = len as u64 - n1 - 4;
let databuf = buf[p1 as usize..(p1 as usize + n2 as usize)].as_ref();
if false && is_compressed {
//debug!("event ts {} is_compressed {}", ts, is_compressed);
let value_bytes = sl.read_u64::<BE>().unwrap();
let block_size = sl.read_u32::<BE>().unwrap();
@@ -353,128 +501,22 @@ impl EventChunker {
assert!(value_bytes < 1024 * 1024 * 20);
}
}
assert!(block_size <= 1024 * 32);
if block_size > 1024 * 32 {
return Err(DataParseError::BadCompresionBlockSize);
}
let type_size = scalar_type.bytes() as u32;
let ele_count = value_bytes / type_size as u64;
let ele_size = type_size;
let config_matches = match self.fetch_info.shape() {
Shape::Scalar => {
if is_array {
if false {
error!(
"channel config mismatch {:?} {:?} {:?} {:?}",
self.fetch_info, is_array, ele_count, self.dbg_path,
);
}
if false {
return Err(Error::with_msg(format!(
"ChannelConfig expects {:?} but we find event is_array",
self.fetch_info,
)));
}
false
} else {
true
}
}
Shape::Wave(dim1count) => {
if *dim1count != ele_count as u32 {
if false {
error!(
"channel config mismatch {:?} {:?} {:?} {:?}",
self.fetch_info, is_array, ele_count, self.dbg_path,
);
}
if false {
return Err(Error::with_msg(format!(
"ChannelConfig expects {:?} but event has ele_count {}",
self.fetch_info, ele_count,
)));
}
false
} else {
true
}
}
Shape::Image(n1, n2) => {
let nt = (*n1 as usize) * (*n2 as usize);
if nt != ele_count as usize {
if false {
error!(
"channel config mismatch {:?} {:?} {:?} {:?}",
self.fetch_info, is_array, ele_count, self.dbg_path,
);
}
if false {
return Err(Error::with_msg(format!(
"ChannelConfig expects {:?} but event has ele_count {}",
self.fetch_info, ele_count,
)));
}
false
} else {
true
}
}
};
if config_matches {
let data = buf.as_ref()[(p1 as usize)..(p1 as usize + k1 as usize)].as_ref();
let decomp = {
if self.do_decompress {
assert!(data.len() > 12);
let ts1 = Instant::now();
let decomp_bytes = (type_size * ele_count as u32) as usize;
let mut decomp = vec![0; decomp_bytes];
// TODO limit the buf slice range
match bitshuffle_decompress(
&data[12..],
&mut decomp,
ele_count as usize,
ele_size as usize,
0,
) {
Ok(c1) => {
assert!(c1 as u64 + 12 == k1);
let ts2 = Instant::now();
let dt = ts2.duration_since(ts1);
// TODO analyze the histo
self.decomp_dt_histo.ingest(dt.as_secs() as u32 + dt.subsec_micros());
Some(decomp)
}
Err(e) => {
return Err(Error::with_msg(format!("decompression failed {:?}", e)))?;
}
}
} else {
None
}
};
ret.add_event(
ts,
pulse,
Some(data.to_vec()),
decomp,
ScalarType::from_dtype_index(type_index)?,
is_big_endian,
shape_this,
comp_this,
);
} else {
self.config_mismatch_discard += 1;
}
let _ele_count = value_bytes / type_size as u64;
let _ele_size = type_size;
}
if discard {
self.discard_count += 1;
} else {
if len < p1 as u32 + 4 {
let msg = format!("uncomp len: {} p1: {}", len, p1);
Err(Error::with_msg(msg))?;
}
let vlen = len - p1 as u32 - 4;
let data = &buf[p1 as usize..(p1 as u32 + vlen) as usize];
ret.add_event(
ts,
pulse,
Some(data.to_vec()),
Some(data.to_vec()),
ScalarType::from_dtype_index(type_index)?,
Some(databuf.to_vec()),
None,
scalar_type,
is_big_endian,
shape_this,
comp_this,
@@ -552,31 +594,13 @@ impl Stream for EventChunker {
// TODO gather stats about this:
self.inp.put_back(fcr);
}
match self.fetch_info.shape() {
Shape::Scalar => {
if self.need_min > 1024 * 8 {
let msg =
format!("spurious EventChunker asks for need_min {}", self.need_min);
self.errored = true;
return Ready(Some(Err(Error::with_msg(msg))));
}
}
Shape::Wave(_) => {
if self.need_min > 1024 * 32 {
let msg =
format!("spurious EventChunker asks for need_min {}", self.need_min);
self.errored = true;
return Ready(Some(Err(Error::with_msg(msg))));
}
}
Shape::Image(_, _) => {
if self.need_min > 1024 * 1024 * 20 {
let msg =
format!("spurious EventChunker asks for need_min {}", self.need_min);
self.errored = true;
return Ready(Some(Err(Error::with_msg(msg))));
}
}
if self.need_min > self.need_min_max {
let msg = format!(
"spurious EventChunker asks for need_min {} max {}",
self.need_min, self.need_min_max
);
self.errored = true;
return Ready(Some(Err(Error::with_msg(msg))));
}
let x = self.need_min;
self.inp.set_need_min(x);

View File

@@ -7,6 +7,7 @@ use items_2::eventfull::EventFull;
use items_2::merger::Merger;
use netpod::log::*;
use netpod::Cluster;
use netpod::SfChFetchInfo;
use query::api4::events::EventsSubQuery;
use std::future::Future;
use std::pin::Pin;
@@ -27,7 +28,7 @@ pub struct MergedBlobsFromRemotes {
impl MergedBlobsFromRemotes {
pub fn new(subq: EventsSubQuery, cluster: Cluster) -> Self {
debug!("MergedBlobsFromRemotes evq {:?}", subq);
debug!("MergedBlobsFromRemotes subq {:?}", subq);
let mut tcp_establish_futs = Vec::new();
for node in &cluster.nodes {
let f = x_processed_event_blobs_stream_from_node(subq.clone(), node.clone());

View File

@@ -91,7 +91,7 @@ pub async fn make_event_pipe(
pub fn make_local_event_blobs_stream(
range: NanoRange,
fetch_info: &SfChFetchInfo,
fetch_info: SfChFetchInfo,
expand: bool,
do_decompress: bool,
event_chunker_conf: EventChunkerConf,
@@ -127,7 +127,7 @@ pub fn make_local_event_blobs_stream(
pub fn make_remote_event_blobs_stream(
range: NanoRange,
fetch_info: &SfChFetchInfo,
fetch_info: SfChFetchInfo,
expand: bool,
do_decompress: bool,
event_chunker_conf: EventChunkerConf,
@@ -175,7 +175,7 @@ pub async fn make_event_blobs_pipe_real(
let pipe = if do_local {
let event_blobs = make_local_event_blobs_stream(
range.try_into()?,
fetch_info,
fetch_info.clone(),
expand,
false,
event_chunker_conf,
@@ -186,7 +186,7 @@ pub async fn make_event_blobs_pipe_real(
} else {
let event_blobs = make_remote_event_blobs_stream(
range.try_into()?,
fetch_info,
fetch_info.clone(),
expand,
true,
event_chunker_conf,