WIP
This commit is contained in:
@@ -11,11 +11,11 @@ use std::future::ready;
|
||||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
|
||||
pub struct BinnedStream {
|
||||
pub struct BinnedStreamFromPreBinnedPatches {
|
||||
inp: Pin<Box<dyn Stream<Item = Result<MinMaxAvgScalarBinBatchStreamItem, Error>> + Send>>,
|
||||
}
|
||||
|
||||
impl BinnedStream {
|
||||
impl BinnedStreamFromPreBinnedPatches {
|
||||
pub fn new(
|
||||
patch_it: PreBinnedPatchIterator,
|
||||
channel: Channel,
|
||||
@@ -87,7 +87,7 @@ impl BinnedStream {
|
||||
}
|
||||
}
|
||||
|
||||
impl Stream for BinnedStream {
|
||||
impl Stream for BinnedStreamFromPreBinnedPatches {
|
||||
// TODO make this generic over all possible things
|
||||
type Item = Result<MinMaxAvgScalarBinBatchStreamItem, Error>;
|
||||
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use crate::agg::binnedt::IntoBinnedT;
|
||||
use crate::agg::eventbatch::MinMaxAvgScalarEventBatchStreamItem;
|
||||
use crate::agg::scalarbinbatch::{MinMaxAvgScalarBinBatch, MinMaxAvgScalarBinBatchStreamItem};
|
||||
use crate::binnedstream::{BinnedStream, BinnedStreamFromMerged};
|
||||
use crate::binnedstream::{BinnedStreamFromMerged, BinnedStreamFromPreBinnedPatches};
|
||||
use crate::cache::pbv::PreBinnedValueByteStream;
|
||||
use crate::cache::pbvfs::PreBinnedItem;
|
||||
use crate::channelconfig::{extract_matching_config_entry, read_local_config};
|
||||
@@ -226,7 +226,7 @@ pub async fn binned_bytes_for_http(
|
||||
);
|
||||
return Err(Error::with_msg(msg));
|
||||
}
|
||||
let s1 = BinnedStream::new(
|
||||
let s1 = BinnedStreamFromPreBinnedPatches::new(
|
||||
PreBinnedPatchIterator::from_range(pre_range),
|
||||
query.channel.clone(),
|
||||
range,
|
||||
@@ -258,7 +258,7 @@ pub async fn binned_bytes_for_http(
|
||||
}
|
||||
}
|
||||
|
||||
pub type BinnedBytesForHttpStreamFrame = <BinnedStream as Stream>::Item;
|
||||
pub type BinnedBytesForHttpStreamFrame = <BinnedStreamFromPreBinnedPatches as Stream>::Item;
|
||||
|
||||
pub struct BinnedBytesForHttpStream<S> {
|
||||
inp: S,
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
use bytes::Bytes;
|
||||
use disk::cache::PreBinnedQuery;
|
||||
use disk::cache::{BinnedQuery, PreBinnedQuery};
|
||||
use disk::eventchunker::EventChunkerConf;
|
||||
use disk::raw::conn::raw_service;
|
||||
use err::Error;
|
||||
@@ -200,6 +200,26 @@ where
|
||||
async fn binned(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
|
||||
let (head, _body) = req.into_parts();
|
||||
let query = disk::cache::BinnedQuery::from_request(&head)?;
|
||||
match head.headers.get("accept") {
|
||||
Some(v) if v == "application/octet-stream" => binned_binary(query, node_config).await,
|
||||
_ => binned_json(query, node_config).await,
|
||||
}
|
||||
}
|
||||
|
||||
async fn binned_binary(query: BinnedQuery, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
|
||||
info!("binned_binary");
|
||||
let ret = match disk::cache::binned_bytes_for_http(node_config, &query).await {
|
||||
Ok(s) => response(StatusCode::OK).body(BodyStream::wrapped(s, format!("desc-BINNED")))?,
|
||||
Err(e) => {
|
||||
error!("fn binned: {:?}", e);
|
||||
response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty())?
|
||||
}
|
||||
};
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
async fn binned_json(query: BinnedQuery, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
|
||||
info!("binned_json");
|
||||
let ret = match disk::cache::binned_bytes_for_http(node_config, &query).await {
|
||||
Ok(s) => response(StatusCode::OK).body(BodyStream::wrapped(s, format!("desc-BINNED")))?,
|
||||
Err(e) => {
|
||||
|
||||
Reference in New Issue
Block a user