This commit is contained in:
Dominik Werder
2021-05-18 22:06:47 +02:00
parent 19ff08a4bd
commit d4d76d97da
4 changed files with 31 additions and 21 deletions
+10 -3
View File
@@ -17,12 +17,15 @@ use std::pin::Pin;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use std::time::Duration; use std::time::Duration;
pub struct BinnedStreamRes { pub struct BinnedStreamRes<I> {
pub binned_stream: BinnedStream, pub binned_stream: BinnedStream<I>,
pub range: BinnedRange, pub range: BinnedRange,
} }
pub async fn binned_stream(node_config: &NodeConfigCached, query: &BinnedQuery) -> Result<BinnedStreamRes, Error> { pub async fn binned_stream(
node_config: &NodeConfigCached,
query: &BinnedQuery,
) -> Result<BinnedStreamRes<Result<MinMaxAvgScalarBinBatchStreamItem, Error>>, Error> {
if query.channel().backend != node_config.node.backend { if query.channel().backend != node_config.node.backend {
let err = Error::with_msg(format!( let err = Error::with_msg(format!(
"backend mismatch node: {} requested: {}", "backend mismatch node: {} requested: {}",
@@ -96,6 +99,10 @@ pub async fn binned_bytes_for_http(
node_config: &NodeConfigCached, node_config: &NodeConfigCached,
query: &BinnedQuery, query: &BinnedQuery,
) -> Result<BinnedStreamBox, Error> { ) -> Result<BinnedStreamBox, Error> {
// TODO must decide here already which AggKind so that I can call into the generic code.
todo::todo;
let res = binned_stream(node_config, query).await?; let res = binned_stream(node_config, query).await?;
let ret = BinnedBytesForHttpStream::new(res.binned_stream); let ret = BinnedBytesForHttpStream::new(res.binned_stream);
Ok(Box::pin(ret)) Ok(Box::pin(ret))
+8 -9
View File
@@ -95,21 +95,20 @@ impl Stream for BinnedStreamFromPreBinnedPatches {
} }
} }
pub struct BinnedStream { pub struct BinnedStream<I> {
inp: Pin<Box<dyn Stream<Item = Result<MinMaxAvgScalarBinBatchStreamItem, Error>> + Send>>, inp: Pin<Box<dyn Stream<Item = I> + Send>>,
} }
impl BinnedStream { impl<I> BinnedStream<I> {
pub fn new( // Item was: Result<MinMaxAvgScalarBinBatchStreamItem, Error>
inp: Pin<Box<dyn Stream<Item = Result<MinMaxAvgScalarBinBatchStreamItem, Error>> + Send>>, pub fn new(inp: Pin<Box<dyn Stream<Item = I> + Send>>) -> Result<Self, Error> {
) -> Result<Self, Error> {
Ok(Self { inp }) Ok(Self { inp })
} }
} }
impl Stream for BinnedStream { impl<I> Stream for BinnedStream<I> {
// TODO make this generic over all possible things //type Item = Result<MinMaxAvgScalarBinBatchStreamItem, Error>;
type Item = Result<MinMaxAvgScalarBinBatchStreamItem, Error>; type Item = I;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
self.inp.poll_next_unpin(cx) self.inp.poll_next_unpin(cx)
+12 -8
View File
@@ -85,16 +85,20 @@ where
impl<F> UnwindSafe for Cont<F> {} impl<F> UnwindSafe for Cont<F> {}
macro_rules! static_http { macro_rules! static_http {
($path:expr, $tgt:expr, $tgtex:expr) => { ($path:expr, $tgt:expr, $tgtex:expr, $ctype:expr) => {
if $path == concat!("/api/4/documentation/", $tgt) { if $path == concat!("/api/4/documentation/", $tgt) {
let c = include_bytes!(concat!("../static/documentation/", $tgtex)); let c = include_bytes!(concat!("../static/documentation/", $tgtex));
return Ok(response(StatusCode::OK).body(Body::from(&c[..]))?); return Ok(response(StatusCode::OK)
.header("content-type", $ctype)
.body(Body::from(&c[..]))?);
} }
}; };
($path:expr, $tgt:expr) => { ($path:expr, $tgt:expr, $ctype:expr) => {
if $path == concat!("/api/4/documentation/", $tgt) { if $path == concat!("/api/4/documentation/", $tgt) {
let c = include_bytes!(concat!("../static/documentation/", $tgt)); let c = include_bytes!(concat!("../static/documentation/", $tgt));
return Ok(response(StatusCode::OK).body(Body::from(&c[..]))?); return Ok(response(StatusCode::OK)
.header("content-type", $ctype)
.body(Body::from(&c[..]))?);
} }
}; };
} }
@@ -140,10 +144,10 @@ async fn data_api_proxy_try(req: Request<Body>, node_config: &NodeConfigCached)
} }
} else if path.starts_with("/api/4/documentation/") { } else if path.starts_with("/api/4/documentation/") {
if req.method() == Method::GET { if req.method() == Method::GET {
static_http!(path, "", "index.html"); static_http!(path, "", "index.html", "text/html");
static_http!(path, "style.css"); static_http!(path, "style.css", "text/stylesheet");
static_http!(path, "script.js"); static_http!(path, "script.js", "text/javascript");
static_http!(path, "status-main.html"); static_http!(path, "status-main.html", "text/html");
Ok(response(StatusCode::NOT_FOUND).body(Body::empty())?) Ok(response(StatusCode::NOT_FOUND).body(Body::empty())?)
} else { } else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
@@ -5,7 +5,7 @@
<title>Main Status</title> <title>Main Status</title>
<link rel="shortcut icon" href="about:blank"/> <link rel="shortcut icon" href="about:blank"/>
<link rel="stylesheet" href="style.css"/> <link rel="stylesheet" href="style.css"/>
<script src="script.js"></script> <script src="script.js" type="text/javascript"></script>
</head> </head>
<body> <body>
<h1>daqbuffer - Main Status</h1> <h1>daqbuffer - Main Status</h1>