Remove duplicate
This commit is contained in:
@@ -90,7 +90,7 @@ pub async fn make_event_pipe(
|
||||
Ok(pipe)
|
||||
}
|
||||
|
||||
pub fn make_local_event_blobs_stream(
|
||||
pub fn make_event_blobs_stream(
|
||||
range: NanoRange,
|
||||
fetch_info: SfChFetchInfo,
|
||||
expand: bool,
|
||||
@@ -99,38 +99,7 @@ pub fn make_local_event_blobs_stream(
|
||||
reqctx: ReqCtxArc,
|
||||
node_config: &NodeConfigCached,
|
||||
) -> Result<EventChunkerMultifile, Error> {
|
||||
info!("make_local_event_blobs_stream {fetch_info:?} disk_io_tune {disk_io_tune:?}");
|
||||
// TODO should not need this for correctness.
|
||||
// Should limit based on return size and latency.
|
||||
let out_max_len = if node_config.node_config.cluster.is_central_storage {
|
||||
128
|
||||
} else {
|
||||
128
|
||||
};
|
||||
let event_blobs = EventChunkerMultifile::new(
|
||||
range,
|
||||
fetch_info.clone(),
|
||||
node_config.node.clone(),
|
||||
node_config.ix,
|
||||
disk_io_tune,
|
||||
event_chunker_conf,
|
||||
expand,
|
||||
out_max_len,
|
||||
reqctx,
|
||||
);
|
||||
Ok(event_blobs)
|
||||
}
|
||||
|
||||
pub fn make_remote_event_blobs_stream(
|
||||
range: NanoRange,
|
||||
fetch_info: SfChFetchInfo,
|
||||
expand: bool,
|
||||
event_chunker_conf: EventChunkerConf,
|
||||
disk_io_tune: DiskIoTune,
|
||||
reqctx: ReqCtxArc,
|
||||
node_config: &NodeConfigCached,
|
||||
) -> Result<impl Stream<Item = Sitemty<EventFull>>, Error> {
|
||||
debug!("make_remote_event_blobs_stream");
|
||||
debug!("make_local_event_blobs_stream {fetch_info:?} disk_io_tune {disk_io_tune:?}");
|
||||
// TODO should not need this for correctness.
|
||||
// Should limit based on return size and latency.
|
||||
let out_max_len = if node_config.node_config.cluster.is_central_storage {
|
||||
@@ -167,38 +136,16 @@ pub async fn make_event_blobs_pipe_real(
|
||||
let expand = subq.transform().need_one_before_range();
|
||||
let range = subq.range();
|
||||
let event_chunker_conf = EventChunkerConf::new(ByteSize::from_kb(1024));
|
||||
// TODO should depend on host config
|
||||
let do_local = node_config.node_config.cluster.is_central_storage;
|
||||
let pipe = if do_local {
|
||||
let event_blobs = make_local_event_blobs_stream(
|
||||
range.try_into()?,
|
||||
fetch_info.clone(),
|
||||
expand,
|
||||
event_chunker_conf,
|
||||
subq.disk_io_tune(),
|
||||
reqctx,
|
||||
node_config,
|
||||
)?;
|
||||
Box::pin(event_blobs) as _
|
||||
} else {
|
||||
let event_blobs = make_remote_event_blobs_stream(
|
||||
range.try_into()?,
|
||||
fetch_info.clone(),
|
||||
expand,
|
||||
event_chunker_conf,
|
||||
subq.disk_io_tune(),
|
||||
reqctx,
|
||||
node_config,
|
||||
)?;
|
||||
/*
|
||||
type ItemType = Sitemty<EventFull>;
|
||||
let s = event_blobs.map(|item: ItemType| Box::new(item) as Box<dyn Framable + Send>);
|
||||
//let s = tracing_futures::Instrumented::instrument(s, tracing::info_span!("make_event_blobs_pipe"));
|
||||
let pipe: Pin<Box<dyn Stream<Item = Box<dyn Framable + Send>> + Send>>;
|
||||
pipe = Box::pin(s);
|
||||
pipe*/
|
||||
Box::pin(event_blobs) as _
|
||||
};
|
||||
let event_blobs = make_event_blobs_stream(
|
||||
range.try_into()?,
|
||||
fetch_info.clone(),
|
||||
expand,
|
||||
event_chunker_conf,
|
||||
subq.disk_io_tune(),
|
||||
reqctx,
|
||||
node_config,
|
||||
)?;
|
||||
let pipe = Box::pin(event_blobs) as _;
|
||||
Ok(pipe)
|
||||
}
|
||||
|
||||
|
||||
@@ -7,7 +7,7 @@ use bytes::BufMut;
|
||||
use bytes::BytesMut;
|
||||
use disk::eventchunker::EventChunkerConf;
|
||||
use disk::merge::mergedblobsfromremotes::MergedBlobsFromRemotes;
|
||||
use disk::raw::conn::make_local_event_blobs_stream;
|
||||
use disk::raw::conn::make_event_blobs_stream;
|
||||
use futures_util::Stream;
|
||||
use futures_util::StreamExt;
|
||||
use http::Method;
|
||||
@@ -757,7 +757,7 @@ impl DataApiPython3DataStream {
|
||||
debug!("set up central storage stream");
|
||||
// TODO pull up this config
|
||||
let event_chunker_conf = EventChunkerConf::new(ByteSize::from_kb(1024));
|
||||
let s = make_local_event_blobs_stream(
|
||||
let s = make_event_blobs_stream(
|
||||
self.range.clone(),
|
||||
fetch_info.clone(),
|
||||
one_before,
|
||||
|
||||
Reference in New Issue
Block a user