diff --git a/disk/src/binnedstream.rs b/disk/src/binnedstream.rs index ffd4b93..55cde27 100644 --- a/disk/src/binnedstream.rs +++ b/disk/src/binnedstream.rs @@ -11,11 +11,11 @@ use std::future::ready; use std::pin::Pin; use std::task::{Context, Poll}; -pub struct BinnedStream { +pub struct BinnedStreamFromPreBinnedPatches { inp: Pin> + Send>>, } -impl BinnedStream { +impl BinnedStreamFromPreBinnedPatches { pub fn new( patch_it: PreBinnedPatchIterator, channel: Channel, @@ -87,7 +87,7 @@ impl BinnedStream { } } -impl Stream for BinnedStream { +impl Stream for BinnedStreamFromPreBinnedPatches { // TODO make this generic over all possible things type Item = Result; diff --git a/disk/src/cache.rs b/disk/src/cache.rs index f6cfa03..8dc7914 100644 --- a/disk/src/cache.rs +++ b/disk/src/cache.rs @@ -1,7 +1,7 @@ use crate::agg::binnedt::IntoBinnedT; use crate::agg::eventbatch::MinMaxAvgScalarEventBatchStreamItem; use crate::agg::scalarbinbatch::{MinMaxAvgScalarBinBatch, MinMaxAvgScalarBinBatchStreamItem}; -use crate::binnedstream::{BinnedStream, BinnedStreamFromMerged}; +use crate::binnedstream::{BinnedStreamFromMerged, BinnedStreamFromPreBinnedPatches}; use crate::cache::pbv::PreBinnedValueByteStream; use crate::cache::pbvfs::PreBinnedItem; use crate::channelconfig::{extract_matching_config_entry, read_local_config}; @@ -226,7 +226,7 @@ pub async fn binned_bytes_for_http( ); return Err(Error::with_msg(msg)); } - let s1 = BinnedStream::new( + let s1 = BinnedStreamFromPreBinnedPatches::new( PreBinnedPatchIterator::from_range(pre_range), query.channel.clone(), range, @@ -258,7 +258,7 @@ pub async fn binned_bytes_for_http( } } -pub type BinnedBytesForHttpStreamFrame = ::Item; +pub type BinnedBytesForHttpStreamFrame = ::Item; pub struct BinnedBytesForHttpStream { inp: S, diff --git a/httpret/src/lib.rs b/httpret/src/lib.rs index 007a3f2..fda1792 100644 --- a/httpret/src/lib.rs +++ b/httpret/src/lib.rs @@ -1,5 +1,5 @@ use bytes::Bytes; -use disk::cache::PreBinnedQuery; +use disk::cache::{BinnedQuery, PreBinnedQuery}; use disk::eventchunker::EventChunkerConf; use disk::raw::conn::raw_service; use err::Error; @@ -200,6 +200,26 @@ where async fn binned(req: Request, node_config: &NodeConfigCached) -> Result, Error> { let (head, _body) = req.into_parts(); let query = disk::cache::BinnedQuery::from_request(&head)?; + match head.headers.get("accept") { + Some(v) if v == "application/octet-stream" => binned_binary(query, node_config).await, + _ => binned_json(query, node_config).await, + } +} + +async fn binned_binary(query: BinnedQuery, node_config: &NodeConfigCached) -> Result, Error> { + info!("binned_binary"); + let ret = match disk::cache::binned_bytes_for_http(node_config, &query).await { + Ok(s) => response(StatusCode::OK).body(BodyStream::wrapped(s, format!("desc-BINNED")))?, + Err(e) => { + error!("fn binned: {:?}", e); + response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty())? + } + }; + Ok(ret) +} + +async fn binned_json(query: BinnedQuery, node_config: &NodeConfigCached) -> Result, Error> { + info!("binned_json"); let ret = match disk::cache::binned_bytes_for_http(node_config, &query).await { Ok(s) => response(StatusCode::OK).body(BodyStream::wrapped(s, format!("desc-BINNED")))?, Err(e) => {