Handle writes

This commit is contained in:
Dominik Werder
2022-01-31 16:45:47 +01:00
parent 6c6f026b69
commit bcd3273dea
2 changed files with 29 additions and 22 deletions

View File

@@ -13,8 +13,8 @@ use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Request, Response, Server};
use hyper_tls::HttpsConnector;
use itertools::Itertools;
use netpod::log::*;
use netpod::query::BinnedQuery;
use netpod::{log::*, ACCEPT_ALL};
use netpod::{
AppendToUrl, ChannelConfigQuery, ChannelSearchQuery, ChannelSearchResult, ChannelSearchSingleResult, FromUrl,
HasBackend, HasTimeout, ProxyConfig, APP_JSON,
@@ -30,6 +30,8 @@ use tokio::fs::File;
use tokio::io::{AsyncRead, ReadBuf};
use url::Url;
const DISTRI_PRE: &str = "/distri/";
pub async fn proxy(proxy_config: ProxyConfig) -> Result<(), Error> {
use std::str::FromStr;
let addr = SocketAddr::from_str(&format!("{}:{}", proxy_config.listen, proxy_config.port))?;
@@ -63,7 +65,6 @@ async fn proxy_http_service(req: Request<Body>, proxy_config: ProxyConfig) -> Re
async fn proxy_http_service_try(req: Request<Body>, proxy_config: &ProxyConfig) -> Result<Response<Body>, Error> {
let uri = req.uri().clone();
let path = uri.path();
let distri_pre = "/distri/";
if path == "/api/1/channels" {
Ok(channel_search_list_v1(req, proxy_config).await?)
} else if path == "/api/1/channels/config" {
@@ -75,6 +76,7 @@ async fn proxy_http_service_try(req: Request<Body>, proxy_config: &ProxyConfig)
} else if path == "/api/1/query" {
Ok(proxy_api1_single_backend_query(req, proxy_config).await?)
} else if path.starts_with("/api/1/map/pulse/") {
warn!("/api/1/map/pulse/ DEPRECATED");
Ok(proxy_api1_map_pulse(req, proxy_config).await?)
} else if path.starts_with("/api/1/gather/") {
Ok(gather_json_2_v1(req, "/api/1/gather/", proxy_config).await?)
@@ -117,20 +119,8 @@ async fn proxy_http_service_try(req: Request<Body>, proxy_config: &ProxyConfig)
} else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
}
} else if path.starts_with(distri_pre)
&& path
.chars()
.all(|c| c.is_ascii_alphanumeric() || ['/', '.', '-', '_'].contains(&c))
&& !path.contains("..")
{
if req.method() == Method::GET {
let s = FileStream {
file: File::open(format!("/opt/distri/{}", &path[distri_pre.len()..])).await?,
};
Ok(response(StatusCode::OK).body(Body::wrap_stream(s))?)
} else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
}
} else if path.starts_with(DISTRI_PRE) {
proxy_distribute_v2(req).await
} else {
Ok(response(StatusCode::NOT_FOUND).body(Body::from(format!(
"Sorry, proxy can not find: {:?} {:?} {:?}",
@@ -141,6 +131,23 @@ async fn proxy_http_service_try(req: Request<Body>, proxy_config: &ProxyConfig)
}
}
pub async fn proxy_distribute_v2(req: Request<Body>) -> Result<Response<Body>, Error> {
let path = req.uri().path();
if path
.chars()
.all(|c| c.is_ascii_alphanumeric() || ['/', '.', '-', '_'].contains(&c))
&& !path.contains("..")
{}
if req.method() == Method::GET {
let s = FileStream {
file: File::open(format!("/opt/distri/{}", &path[DISTRI_PRE.len()..])).await?,
};
Ok(response(StatusCode::OK).body(Body::wrap_stream(s))?)
} else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
}
}
pub struct FileStream {
file: File,
}
@@ -399,7 +406,7 @@ where
let (head, _body) = req.into_parts();
match head.headers.get(http::header::ACCEPT) {
Some(v) => {
if v == APP_JSON {
if v == APP_JSON || v == ACCEPT_ALL {
let url = Url::parse(&format!("dummy:{}", head.uri))?;
let query = QT::from_url(&url)?;
let sh = if url.as_str().contains("/map/pulse/") {