From 790edee19234fc21986f0636be178a571382f026 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Thu, 31 Aug 2023 08:33:39 +0200 Subject: [PATCH] Simplify --- crates/disk/src/raw/conn.rs | 16 +++++----------- crates/httpret/src/api1.rs | 14 +------------- crates/nodenet/src/conn.rs | 2 +- 3 files changed, 7 insertions(+), 25 deletions(-) diff --git a/crates/disk/src/raw/conn.rs b/crates/disk/src/raw/conn.rs index 0cae711..f03ebdb 100644 --- a/crates/disk/src/raw/conn.rs +++ b/crates/disk/src/raw/conn.rs @@ -121,18 +121,12 @@ pub fn make_event_blobs_stream( Ok(event_blobs) } -pub async fn make_event_blobs_pipe_real( +pub fn make_event_blobs_pipe_real( subq: &EventsSubQuery, fetch_info: &SfChFetchInfo, reqctx: ReqCtxArc, node_config: &NodeConfigCached, ) -> Result> + Send>>, Error> { - if false { - match dbconn::channel_exists(subq.name(), &node_config).await { - Ok(_) => (), - Err(e) => return Err(e)?, - } - } let expand = subq.transform().need_one_before_range(); let range = subq.range(); let event_chunker_conf = EventChunkerConf::new(ByteSize::from_kb(1024)); @@ -149,7 +143,7 @@ pub async fn make_event_blobs_pipe_real( Ok(pipe) } -pub async fn make_event_blobs_pipe_test( +pub fn make_event_blobs_pipe_test( subq: &EventsSubQuery, node_config: &NodeConfigCached, ) -> Result> + Send>>, Error> { @@ -192,7 +186,7 @@ pub async fn make_event_blobs_pipe_test( } } -pub async fn make_event_blobs_pipe( +pub fn make_event_blobs_pipe( subq: &EventsSubQuery, fetch_info: &SfChFetchInfo, reqctx: ReqCtxArc, @@ -200,8 +194,8 @@ pub async fn make_event_blobs_pipe( ) -> Result> + Send>>, Error> { debug!("make_event_blobs_pipe {subq:?}"); if subq.backend() == TEST_BACKEND { - make_event_blobs_pipe_test(subq, node_config).await + make_event_blobs_pipe_test(subq, node_config) } else { - make_event_blobs_pipe_real(subq, fetch_info, reqctx, node_config).await + make_event_blobs_pipe_real(subq, fetch_info, reqctx, node_config) } } diff --git a/crates/httpret/src/api1.rs b/crates/httpret/src/api1.rs index e0adf92..a59c368 100644 --- a/crates/httpret/src/api1.rs +++ b/crates/httpret/src/api1.rs @@ -747,7 +747,6 @@ impl DataApiPython3DataStream { TransformQuery::for_event_blobs(), ); let subq = EventsSubQuery::from_parts(select, self.settings.clone(), self.reqctx.reqid().into()); - let one_before = subq.transform().need_one_before_range(); debug!("query for event blobs retrieval subq {subq:?}"); // TODO important TODO debug!("TODO fix magic inmem_bufcap"); @@ -755,18 +754,7 @@ impl DataApiPython3DataStream { // TODO is this a good to place decide this? let stream = if self.node_config.node_config.cluster.is_central_storage { debug!("set up central storage stream"); - // TODO pull up this config - let event_chunker_conf = EventChunkerConf::new(ByteSize::from_kb(1024)); - let s = make_event_blobs_stream( - self.range.clone(), - fetch_info.clone(), - one_before, - event_chunker_conf, - self.disk_io_tune.clone(), - self.reqctx.clone(), - &self.node_config, - )?; - Box::pin(s) as Pin> + Send>> + disk::raw::conn::make_event_blobs_pipe(&subq, &fetch_info, self.reqctx.clone(), &self.node_config)? } else { debug!("set up merged remote stream {}", fetch_info.name()); let s = MergedBlobsFromRemotes::new(subq, self.node_config.node_config.cluster.clone()); diff --git a/crates/nodenet/src/conn.rs b/crates/nodenet/src/conn.rs index 5e4a823..7b2af94 100644 --- a/crates/nodenet/src/conn.rs +++ b/crates/nodenet/src/conn.rs @@ -174,7 +174,7 @@ pub async fn create_response_bytes_stream( if evq.is_event_blobs() { // TODO support event blobs as transform let fetch_info = evq.ch_conf().to_sf_databuffer()?; - let stream = disk::raw::conn::make_event_blobs_pipe(&evq, &fetch_info, reqctx, ncc).await?; + let stream = disk::raw::conn::make_event_blobs_pipe(&evq, &fetch_info, reqctx, ncc)?; // let stream = stream.map(|x| Box::new(x) as _); let stream = stream.map(|x| x.make_frame().map(|x| x.freeze())); let ret = Box::pin(stream);