Simplify
This commit is contained in:
@@ -121,18 +121,12 @@ pub fn make_event_blobs_stream(
|
|||||||
Ok(event_blobs)
|
Ok(event_blobs)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn make_event_blobs_pipe_real(
|
pub fn make_event_blobs_pipe_real(
|
||||||
subq: &EventsSubQuery,
|
subq: &EventsSubQuery,
|
||||||
fetch_info: &SfChFetchInfo,
|
fetch_info: &SfChFetchInfo,
|
||||||
reqctx: ReqCtxArc,
|
reqctx: ReqCtxArc,
|
||||||
node_config: &NodeConfigCached,
|
node_config: &NodeConfigCached,
|
||||||
) -> Result<Pin<Box<dyn Stream<Item = Sitemty<EventFull>> + Send>>, Error> {
|
) -> Result<Pin<Box<dyn Stream<Item = Sitemty<EventFull>> + Send>>, Error> {
|
||||||
if false {
|
|
||||||
match dbconn::channel_exists(subq.name(), &node_config).await {
|
|
||||||
Ok(_) => (),
|
|
||||||
Err(e) => return Err(e)?,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
let expand = subq.transform().need_one_before_range();
|
let expand = subq.transform().need_one_before_range();
|
||||||
let range = subq.range();
|
let range = subq.range();
|
||||||
let event_chunker_conf = EventChunkerConf::new(ByteSize::from_kb(1024));
|
let event_chunker_conf = EventChunkerConf::new(ByteSize::from_kb(1024));
|
||||||
@@ -149,7 +143,7 @@ pub async fn make_event_blobs_pipe_real(
|
|||||||
Ok(pipe)
|
Ok(pipe)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn make_event_blobs_pipe_test(
|
pub fn make_event_blobs_pipe_test(
|
||||||
subq: &EventsSubQuery,
|
subq: &EventsSubQuery,
|
||||||
node_config: &NodeConfigCached,
|
node_config: &NodeConfigCached,
|
||||||
) -> Result<Pin<Box<dyn Stream<Item = Sitemty<EventFull>> + Send>>, Error> {
|
) -> Result<Pin<Box<dyn Stream<Item = Sitemty<EventFull>> + Send>>, Error> {
|
||||||
@@ -192,7 +186,7 @@ pub async fn make_event_blobs_pipe_test(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn make_event_blobs_pipe(
|
pub fn make_event_blobs_pipe(
|
||||||
subq: &EventsSubQuery,
|
subq: &EventsSubQuery,
|
||||||
fetch_info: &SfChFetchInfo,
|
fetch_info: &SfChFetchInfo,
|
||||||
reqctx: ReqCtxArc,
|
reqctx: ReqCtxArc,
|
||||||
@@ -200,8 +194,8 @@ pub async fn make_event_blobs_pipe(
|
|||||||
) -> Result<Pin<Box<dyn Stream<Item = Sitemty<EventFull>> + Send>>, Error> {
|
) -> Result<Pin<Box<dyn Stream<Item = Sitemty<EventFull>> + Send>>, Error> {
|
||||||
debug!("make_event_blobs_pipe {subq:?}");
|
debug!("make_event_blobs_pipe {subq:?}");
|
||||||
if subq.backend() == TEST_BACKEND {
|
if subq.backend() == TEST_BACKEND {
|
||||||
make_event_blobs_pipe_test(subq, node_config).await
|
make_event_blobs_pipe_test(subq, node_config)
|
||||||
} else {
|
} else {
|
||||||
make_event_blobs_pipe_real(subq, fetch_info, reqctx, node_config).await
|
make_event_blobs_pipe_real(subq, fetch_info, reqctx, node_config)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -747,7 +747,6 @@ impl DataApiPython3DataStream {
|
|||||||
TransformQuery::for_event_blobs(),
|
TransformQuery::for_event_blobs(),
|
||||||
);
|
);
|
||||||
let subq = EventsSubQuery::from_parts(select, self.settings.clone(), self.reqctx.reqid().into());
|
let subq = EventsSubQuery::from_parts(select, self.settings.clone(), self.reqctx.reqid().into());
|
||||||
let one_before = subq.transform().need_one_before_range();
|
|
||||||
debug!("query for event blobs retrieval subq {subq:?}");
|
debug!("query for event blobs retrieval subq {subq:?}");
|
||||||
// TODO important TODO
|
// TODO important TODO
|
||||||
debug!("TODO fix magic inmem_bufcap");
|
debug!("TODO fix magic inmem_bufcap");
|
||||||
@@ -755,18 +754,7 @@ impl DataApiPython3DataStream {
|
|||||||
// TODO is this a good to place decide this?
|
// TODO is this a good to place decide this?
|
||||||
let stream = if self.node_config.node_config.cluster.is_central_storage {
|
let stream = if self.node_config.node_config.cluster.is_central_storage {
|
||||||
debug!("set up central storage stream");
|
debug!("set up central storage stream");
|
||||||
// TODO pull up this config
|
disk::raw::conn::make_event_blobs_pipe(&subq, &fetch_info, self.reqctx.clone(), &self.node_config)?
|
||||||
let event_chunker_conf = EventChunkerConf::new(ByteSize::from_kb(1024));
|
|
||||||
let s = make_event_blobs_stream(
|
|
||||||
self.range.clone(),
|
|
||||||
fetch_info.clone(),
|
|
||||||
one_before,
|
|
||||||
event_chunker_conf,
|
|
||||||
self.disk_io_tune.clone(),
|
|
||||||
self.reqctx.clone(),
|
|
||||||
&self.node_config,
|
|
||||||
)?;
|
|
||||||
Box::pin(s) as Pin<Box<dyn Stream<Item = Sitemty<EventFull>> + Send>>
|
|
||||||
} else {
|
} else {
|
||||||
debug!("set up merged remote stream {}", fetch_info.name());
|
debug!("set up merged remote stream {}", fetch_info.name());
|
||||||
let s = MergedBlobsFromRemotes::new(subq, self.node_config.node_config.cluster.clone());
|
let s = MergedBlobsFromRemotes::new(subq, self.node_config.node_config.cluster.clone());
|
||||||
|
|||||||
@@ -174,7 +174,7 @@ pub async fn create_response_bytes_stream(
|
|||||||
if evq.is_event_blobs() {
|
if evq.is_event_blobs() {
|
||||||
// TODO support event blobs as transform
|
// TODO support event blobs as transform
|
||||||
let fetch_info = evq.ch_conf().to_sf_databuffer()?;
|
let fetch_info = evq.ch_conf().to_sf_databuffer()?;
|
||||||
let stream = disk::raw::conn::make_event_blobs_pipe(&evq, &fetch_info, reqctx, ncc).await?;
|
let stream = disk::raw::conn::make_event_blobs_pipe(&evq, &fetch_info, reqctx, ncc)?;
|
||||||
// let stream = stream.map(|x| Box::new(x) as _);
|
// let stream = stream.map(|x| Box::new(x) as _);
|
||||||
let stream = stream.map(|x| x.make_frame().map(|x| x.freeze()));
|
let stream = stream.map(|x| x.make_frame().map(|x| x.freeze()));
|
||||||
let ret = Box::pin(stream);
|
let ret = Box::pin(stream);
|
||||||
|
|||||||
Reference in New Issue
Block a user