From 55d38b1fee7c99c55e9f34f6722a7b6781b31b86 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Thu, 31 Aug 2023 08:26:44 +0200 Subject: [PATCH] Remove duplicate --- crates/disk/src/raw/conn.rs | 77 ++++++------------------------------- crates/httpret/src/api1.rs | 4 +- 2 files changed, 14 insertions(+), 67 deletions(-) diff --git a/crates/disk/src/raw/conn.rs b/crates/disk/src/raw/conn.rs index ed9311a..0cae711 100644 --- a/crates/disk/src/raw/conn.rs +++ b/crates/disk/src/raw/conn.rs @@ -90,7 +90,7 @@ pub async fn make_event_pipe( Ok(pipe) } -pub fn make_local_event_blobs_stream( +pub fn make_event_blobs_stream( range: NanoRange, fetch_info: SfChFetchInfo, expand: bool, @@ -99,38 +99,7 @@ pub fn make_local_event_blobs_stream( reqctx: ReqCtxArc, node_config: &NodeConfigCached, ) -> Result { - info!("make_local_event_blobs_stream {fetch_info:?} disk_io_tune {disk_io_tune:?}"); - // TODO should not need this for correctness. - // Should limit based on return size and latency. - let out_max_len = if node_config.node_config.cluster.is_central_storage { - 128 - } else { - 128 - }; - let event_blobs = EventChunkerMultifile::new( - range, - fetch_info.clone(), - node_config.node.clone(), - node_config.ix, - disk_io_tune, - event_chunker_conf, - expand, - out_max_len, - reqctx, - ); - Ok(event_blobs) -} - -pub fn make_remote_event_blobs_stream( - range: NanoRange, - fetch_info: SfChFetchInfo, - expand: bool, - event_chunker_conf: EventChunkerConf, - disk_io_tune: DiskIoTune, - reqctx: ReqCtxArc, - node_config: &NodeConfigCached, -) -> Result>, Error> { - debug!("make_remote_event_blobs_stream"); + debug!("make_local_event_blobs_stream {fetch_info:?} disk_io_tune {disk_io_tune:?}"); // TODO should not need this for correctness. // Should limit based on return size and latency. let out_max_len = if node_config.node_config.cluster.is_central_storage { @@ -167,38 +136,16 @@ pub async fn make_event_blobs_pipe_real( let expand = subq.transform().need_one_before_range(); let range = subq.range(); let event_chunker_conf = EventChunkerConf::new(ByteSize::from_kb(1024)); - // TODO should depend on host config - let do_local = node_config.node_config.cluster.is_central_storage; - let pipe = if do_local { - let event_blobs = make_local_event_blobs_stream( - range.try_into()?, - fetch_info.clone(), - expand, - event_chunker_conf, - subq.disk_io_tune(), - reqctx, - node_config, - )?; - Box::pin(event_blobs) as _ - } else { - let event_blobs = make_remote_event_blobs_stream( - range.try_into()?, - fetch_info.clone(), - expand, - event_chunker_conf, - subq.disk_io_tune(), - reqctx, - node_config, - )?; - /* - type ItemType = Sitemty; - let s = event_blobs.map(|item: ItemType| Box::new(item) as Box); - //let s = tracing_futures::Instrumented::instrument(s, tracing::info_span!("make_event_blobs_pipe")); - let pipe: Pin> + Send>>; - pipe = Box::pin(s); - pipe*/ - Box::pin(event_blobs) as _ - }; + let event_blobs = make_event_blobs_stream( + range.try_into()?, + fetch_info.clone(), + expand, + event_chunker_conf, + subq.disk_io_tune(), + reqctx, + node_config, + )?; + let pipe = Box::pin(event_blobs) as _; Ok(pipe) } diff --git a/crates/httpret/src/api1.rs b/crates/httpret/src/api1.rs index 3bc8da2..e0adf92 100644 --- a/crates/httpret/src/api1.rs +++ b/crates/httpret/src/api1.rs @@ -7,7 +7,7 @@ use bytes::BufMut; use bytes::BytesMut; use disk::eventchunker::EventChunkerConf; use disk::merge::mergedblobsfromremotes::MergedBlobsFromRemotes; -use disk::raw::conn::make_local_event_blobs_stream; +use disk::raw::conn::make_event_blobs_stream; use futures_util::Stream; use futures_util::StreamExt; use http::Method; @@ -757,7 +757,7 @@ impl DataApiPython3DataStream { debug!("set up central storage stream"); // TODO pull up this config let event_chunker_conf = EventChunkerConf::new(ByteSize::from_kb(1024)); - let s = make_local_event_blobs_stream( + let s = make_event_blobs_stream( self.range.clone(), fetch_info.clone(), one_before,