WIP
This commit is contained in:
@@ -6,7 +6,7 @@ use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
use futures_core::Stream;
|
||||
use futures_util::{StreamExt, FutureExt, pin_mut};
|
||||
use bytes::Bytes;
|
||||
use bytes::{Bytes, BytesMut, BufMut};
|
||||
use chrono::{DateTime, Utc};
|
||||
use netpod::{Node, Cluster, AggKind, NanoRange, ToNanos, PreBinnedPatchGridSpec, PreBinnedPatchIterator, PreBinnedPatchCoord, Channel};
|
||||
use crate::agg::MinMaxAvgScalarBinBatch;
|
||||
@@ -53,6 +53,7 @@ pub struct BinParams {
|
||||
}
|
||||
|
||||
pub fn binned_bytes_for_http(params: BinParams, query: &Query) -> Result<BinnedBytesForHttpStream, Error> {
|
||||
let agg_kind = AggKind::DimXBins1;
|
||||
|
||||
// TODO
|
||||
// Translate the Query TimeRange + AggKind into an iterator over the pre-binned patches.
|
||||
@@ -61,7 +62,7 @@ pub fn binned_bytes_for_http(params: BinParams, query: &Query) -> Result<BinnedB
|
||||
Some(spec) => {
|
||||
info!("GOT PreBinnedPatchGridSpec: {:?}", spec);
|
||||
warn!("Pass here to BinnedStream what kind of Agg, range, ...");
|
||||
let s1 = BinnedStream::new(PreBinnedPatchIterator::from_range(spec), query.channel.clone(), params.cluster.clone());
|
||||
let s1 = BinnedStream::new(PreBinnedPatchIterator::from_range(spec), query.channel.clone(), agg_kind, params.cluster.clone());
|
||||
// Iterate over the patches.
|
||||
// Request the patch from each node.
|
||||
// Merge.
|
||||
@@ -92,10 +93,11 @@ impl Stream for BinnedBytesForHttpStream {
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||
use Poll::*;
|
||||
error!("TODO translate the structured stream into plain bytes for http");
|
||||
match self.inp.poll_next_unpin(cx) {
|
||||
Ready(Some(Ok(k))) => {
|
||||
Ready(Some(Ok(Bytes::new())))
|
||||
let mut buf = BytesMut::with_capacity(250);
|
||||
buf.put(&b"TODO serialize to bytes\n"[..]);
|
||||
Ready(Some(Ok(buf.freeze())))
|
||||
}
|
||||
Ready(Some(Err(e))) => Ready(Some(Err(e))),
|
||||
Ready(None) => Ready(None),
|
||||
@@ -115,17 +117,17 @@ pub struct PreBinnedValueStream {
|
||||
|
||||
impl PreBinnedValueStream {
|
||||
|
||||
pub fn new(patch_coord: PreBinnedPatchCoord, channel: Channel, cluster: Cluster) -> Self {
|
||||
pub fn new(patch_coord: PreBinnedPatchCoord, channel: Channel, agg_kind: AggKind, cluster: Cluster) -> Self {
|
||||
let nodeix = node_ix_for_patch(&patch_coord, &channel, &cluster);
|
||||
warn!("TODO PASS THE KIND OF AGG");
|
||||
let node = &cluster.nodes[nodeix];
|
||||
let uri: hyper::Uri = format!(
|
||||
"http://{}:{}/api/1/prebinned?beg={}&end={}&channel={}",
|
||||
"http://{}:{}/api/1/prebinned?beg={}&end={}&channel={}&agg_kind={:?}",
|
||||
node.host,
|
||||
node.port,
|
||||
patch_coord.range.beg,
|
||||
patch_coord.range.end,
|
||||
channel.name,
|
||||
agg_kind,
|
||||
).parse().unwrap();
|
||||
Self {
|
||||
uri,
|
||||
@@ -212,11 +214,11 @@ pub struct BinnedStream {
|
||||
|
||||
impl BinnedStream {
|
||||
|
||||
pub fn new(patch_it: PreBinnedPatchIterator, channel: Channel, cluster: Cluster) -> Self {
|
||||
pub fn new(patch_it: PreBinnedPatchIterator, channel: Channel, agg_kind: AggKind, cluster: Cluster) -> Self {
|
||||
let mut patch_it = patch_it;
|
||||
let inp = futures_util::stream::iter(patch_it)
|
||||
.map(move |coord| {
|
||||
PreBinnedValueStream::new(coord, channel.clone(), cluster.clone())
|
||||
PreBinnedValueStream::new(coord, channel.clone(), agg_kind.clone(), cluster.clone())
|
||||
})
|
||||
.flatten();
|
||||
Self {
|
||||
|
||||
@@ -160,6 +160,28 @@ async fn binned(req: Request<Body>, hconf: HostConf) -> Result<Response<Body>, E
|
||||
}
|
||||
|
||||
|
||||
async fn prebinned(req: Request<Body>, hconf: HostConf) -> Result<Response<Body>, Error> {
|
||||
let (head, body) = req.into_parts();
|
||||
todo!("create a new PreBinnedQuery and let extract from query");
|
||||
let params = BinParams {
|
||||
node: hconf.node.clone(),
|
||||
cluster: hconf.cluster.clone(),
|
||||
};
|
||||
todo!("create this new entry point in disk::cache");
|
||||
let ret = match Ok(()) {
|
||||
Ok(s) => {
|
||||
response(StatusCode::OK)
|
||||
.body(Body::wrap_stream(______))?
|
||||
}
|
||||
Err(e) => {
|
||||
error!("{:?}", e);
|
||||
response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty())?
|
||||
}
|
||||
};
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct HostConf {
|
||||
|
||||
Reference in New Issue
Block a user