Channel archiver engine index file open
This commit is contained in:
@@ -15,6 +15,7 @@ pub fn make_test_node(id: u32) -> Node {
|
||||
cache_base_path: format!("../tmpdata/node{:02}", id).into(),
|
||||
ksprefix: "ks".into(),
|
||||
backend: "testbackend".into(),
|
||||
splits: None,
|
||||
archiver_appliance: None,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,6 +2,7 @@ use crate::dataopen::{open_expanded_files, open_files, OpenedFileSet};
|
||||
use crate::eventchunker::{EventChunker, EventChunkerConf, EventFull};
|
||||
use crate::file_content_stream;
|
||||
use crate::merge::MergedStream;
|
||||
use crate::rangefilter::RangeFilter;
|
||||
use err::Error;
|
||||
use futures_core::Stream;
|
||||
use futures_util::StreamExt;
|
||||
@@ -108,7 +109,7 @@ impl Stream for EventChunkerMultifile {
|
||||
Ready(Some(k)) => match k {
|
||||
Ok(ofs) => {
|
||||
self.files_count += ofs.files.len() as u32;
|
||||
if false && ofs.files.len() == 1 {
|
||||
if ofs.files.len() == 1 {
|
||||
let mut ofs = ofs;
|
||||
let file = ofs.files.pop().unwrap();
|
||||
let path = file.path;
|
||||
@@ -131,7 +132,9 @@ impl Stream for EventChunkerMultifile {
|
||||
self.expand,
|
||||
self.do_decompress,
|
||||
);
|
||||
self.evs = Some(Box::pin(chunker));
|
||||
let filtered =
|
||||
RangeFilter::new(chunker, self.range.clone(), self.expand);
|
||||
self.evs = Some(Box::pin(filtered));
|
||||
}
|
||||
None => {}
|
||||
}
|
||||
@@ -166,7 +169,8 @@ impl Stream for EventChunkerMultifile {
|
||||
}
|
||||
}
|
||||
let merged = MergedStream::new(chunkers);
|
||||
self.evs = Some(Box::pin(merged));
|
||||
let filtered = RangeFilter::new(merged, self.range.clone(), self.expand);
|
||||
self.evs = Some(Box::pin(filtered));
|
||||
Ready(Some(Ok(StreamItem::Log(item))))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -124,6 +124,7 @@ pub async fn gen_test_data() -> Result<(), Error> {
|
||||
cache_base_path: data_base_path.join(format!("node{:02}", i1)),
|
||||
ksprefix: ksprefix.clone(),
|
||||
backend: "testbackend".into(),
|
||||
splits: None,
|
||||
archiver_appliance: None,
|
||||
};
|
||||
ensemble.nodes.push(node);
|
||||
|
||||
@@ -46,7 +46,17 @@ pub async fn datapaths_for_timebin(
|
||||
}
|
||||
let vv = dn.chars().fold(0, |a, x| if x.is_digit(10) { a + 1 } else { a });
|
||||
if vv == 10 {
|
||||
splits.push(dn.parse::<u64>()?);
|
||||
let split: u64 = dn.parse()?;
|
||||
match &node.splits {
|
||||
Some(sps) => {
|
||||
if sps.contains(&split) {
|
||||
splits.push(split);
|
||||
}
|
||||
}
|
||||
None => {
|
||||
splits.push(split);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
let mut ret = vec![];
|
||||
|
||||
@@ -1,10 +1,10 @@
|
||||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
|
||||
use futures_core::Stream;
|
||||
use futures_util::StreamExt;
|
||||
use items::{Appendable, Clearable, PushableIndex, RangeCompletableItem, Sitemty, StreamItem, WithTimestamps};
|
||||
use netpod::log::*;
|
||||
use netpod::NanoRange;
|
||||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
|
||||
pub struct RangeFilter<S, ITY> {
|
||||
inp: S,
|
||||
@@ -95,6 +95,10 @@ where
|
||||
}
|
||||
Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(ret)))))
|
||||
}
|
||||
Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)) => {
|
||||
warn!("\n\nRangeFilter got RangeComplete\n");
|
||||
Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))))
|
||||
}
|
||||
k => Ready(Some(k)),
|
||||
},
|
||||
Ready(None) => {
|
||||
|
||||
Reference in New Issue
Block a user