From e258718d0251d8c1330a3cc25bddc76fe827a7ed Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Mon, 2 Aug 2021 16:28:38 +0200 Subject: [PATCH] Support distribution --- archapp/src/events.rs | 2 -- httpret/src/proxy.rs | 45 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 45 insertions(+), 2 deletions(-) diff --git a/archapp/src/events.rs b/archapp/src/events.rs index dd0526e..5a5e4d8 100644 --- a/archapp/src/events.rs +++ b/archapp/src/events.rs @@ -422,8 +422,6 @@ pub async fn make_single_event_pipe( let mut pbr = PbFileReader::new(f1).await; pbr.read_header().await?; info!("✓ read header {:?}", pbr.payload_type()); - - // TODO this is ugly: pbr.file().seek(SeekFrom::Start(pos1)).await?; pbr.reset_io(pos1); diff --git a/httpret/src/proxy.rs b/httpret/src/proxy.rs index 43b40e5..237650a 100644 --- a/httpret/src/proxy.rs +++ b/httpret/src/proxy.rs @@ -4,6 +4,8 @@ use crate::{api_4_docs, response, Cont}; use disk::binned::query::BinnedQuery; use disk::events::PlainEventsJsonQuery; use err::Error; +use futures_core::Stream; +use futures_util::pin_mut; use http::{Method, StatusCode}; use hyper::service::{make_service_fn, service_fn}; use hyper::{Body, Request, Response, Server}; @@ -18,7 +20,10 @@ use serde_json::Value as JsonValue; use std::future::Future; use std::net::SocketAddr; use std::pin::Pin; +use std::task::{Context, Poll}; use std::time::Duration; +use tokio::fs::File; +use tokio::io::{AsyncRead, ReadBuf}; use url::Url; pub async fn proxy(proxy_config: ProxyConfig) -> Result<(), Error> { @@ -88,6 +93,15 @@ async fn proxy_http_service_try(req: Request, proxy_config: &ProxyConfig) } else { Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) } + } else if path.starts_with("/distri/daqbuffer") { + if req.method() == Method::GET { + let s = FileStream { + file: File::open("/opt/distri/daqbuffer").await?, + }; + Ok(response(StatusCode::OK).body(Body::wrap_stream(s))?) + } else { + Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) + } } else { Ok(response(StatusCode::NOT_FOUND).body(Body::from(format!( "Sorry, not found: {:?} {:?} {:?}", @@ -98,6 +112,37 @@ async fn proxy_http_service_try(req: Request, proxy_config: &ProxyConfig) } } +pub struct FileStream { + file: File, +} + +impl Stream for FileStream { + type Item = Result, Error>; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + use Poll::*; + let mut buf = vec![0; 1024 * 8]; + let mut rb = ReadBuf::new(&mut buf); + let f = &mut self.file; + pin_mut!(f); + match f.poll_read(cx, &mut rb) { + Ready(k) => match k { + Ok(_) => { + let n = rb.filled().len(); + if n == 0 { + Ready(None) + } else { + buf.truncate(n); + Ready(Some(Ok(buf))) + } + } + Err(e) => Ready(Some(Err(e.into()))), + }, + Pending => Pending, + } + } +} + #[derive(Serialize, Deserialize)] pub struct BackendsResponse { backends: Vec,