Support distribution

This commit is contained in:
Dominik Werder
2021-08-02 16:28:38 +02:00
parent 5e624bb2ca
commit e258718d02
2 changed files with 45 additions and 2 deletions

View File

@@ -422,8 +422,6 @@ pub async fn make_single_event_pipe(
let mut pbr = PbFileReader::new(f1).await;
pbr.read_header().await?;
info!("✓ read header {:?}", pbr.payload_type());
// TODO this is ugly:
pbr.file().seek(SeekFrom::Start(pos1)).await?;
pbr.reset_io(pos1);

View File

@@ -4,6 +4,8 @@ use crate::{api_4_docs, response, Cont};
use disk::binned::query::BinnedQuery;
use disk::events::PlainEventsJsonQuery;
use err::Error;
use futures_core::Stream;
use futures_util::pin_mut;
use http::{Method, StatusCode};
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Request, Response, Server};
@@ -18,7 +20,10 @@ use serde_json::Value as JsonValue;
use std::future::Future;
use std::net::SocketAddr;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
use tokio::fs::File;
use tokio::io::{AsyncRead, ReadBuf};
use url::Url;
pub async fn proxy(proxy_config: ProxyConfig) -> Result<(), Error> {
@@ -88,6 +93,15 @@ async fn proxy_http_service_try(req: Request<Body>, proxy_config: &ProxyConfig)
} else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
}
} else if path.starts_with("/distri/daqbuffer") {
if req.method() == Method::GET {
let s = FileStream {
file: File::open("/opt/distri/daqbuffer").await?,
};
Ok(response(StatusCode::OK).body(Body::wrap_stream(s))?)
} else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
}
} else {
Ok(response(StatusCode::NOT_FOUND).body(Body::from(format!(
"Sorry, not found: {:?} {:?} {:?}",
@@ -98,6 +112,37 @@ async fn proxy_http_service_try(req: Request<Body>, proxy_config: &ProxyConfig)
}
}
pub struct FileStream {
file: File,
}
impl Stream for FileStream {
type Item = Result<Vec<u8>, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
let mut buf = vec![0; 1024 * 8];
let mut rb = ReadBuf::new(&mut buf);
let f = &mut self.file;
pin_mut!(f);
match f.poll_read(cx, &mut rb) {
Ready(k) => match k {
Ok(_) => {
let n = rb.filled().len();
if n == 0 {
Ready(None)
} else {
buf.truncate(n);
Ready(Some(Ok(buf)))
}
}
Err(e) => Ready(Some(Err(e.into()))),
},
Pending => Pending,
}
}
}
#[derive(Serialize, Deserialize)]
pub struct BackendsResponse {
backends: Vec<String>,