From bcd3273dea2ad15029b04721828efe755714ccaf Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Mon, 31 Jan 2022 16:45:47 +0100 Subject: [PATCH] Handle writes --- h5out/src/lib.rs | 10 +++++----- httpret/src/proxy.rs | 41 ++++++++++++++++++++++++----------------- 2 files changed, 29 insertions(+), 22 deletions(-) diff --git a/h5out/src/lib.rs b/h5out/src/lib.rs index 8f3da83..b3d0c19 100644 --- a/h5out/src/lib.rs +++ b/h5out/src/lib.rs @@ -81,7 +81,7 @@ fn write_file(out: &Out) -> Result<(), Error> { .truncate(true) .create(true) .open("f.h5")?; - f.write(out.cur.get_ref())?; + f.write_all(out.cur.get_ref())?; Ok(()) } @@ -109,7 +109,7 @@ fn write_superblock(out: &mut Out) -> Result<(), Error> { let free_index_addr = u64::MAX; let eof = 4242; write_padding(out)?; - out.write(b"\x89HDF\r\n\x1a\n")?; + out.write_all(b"\x89HDF\r\n\x1a\n")?; out.write_u8(super_ver)?; out.write_u8(free_ver)?; out.write_u8(root_group_ver)?; @@ -183,7 +183,7 @@ fn write_local_heap(out: &mut Out) -> Result<(), Error> { let seg_size = 1024; let free_list_off = u64::MAX; let seg_addr = pos0 + 32; - out.write(b"HEAP")?; + out.write_all(b"HEAP")?; out.write_u8(ver)?; out.write_u8(0)?; out.write_u8(0)?; @@ -191,11 +191,11 @@ fn write_local_heap(out: &mut Out) -> Result<(), Error> { out.write_u64(seg_size)?; out.write_u64(free_list_off)?; out.write_u64(seg_addr)?; - out.write(&[0; 1024])?; + out.write_all(&[0; 1024])?; { let h = out.cur.position(); out.cur.set_position(h - 1024); - out.write(b"somename")?; + out.write_all(b"somename")?; out.cur.set_position(h); } Ok(()) diff --git a/httpret/src/proxy.rs b/httpret/src/proxy.rs index 2d29d61..2eb0411 100644 --- a/httpret/src/proxy.rs +++ b/httpret/src/proxy.rs @@ -13,8 +13,8 @@ use hyper::service::{make_service_fn, service_fn}; use hyper::{Body, Request, Response, Server}; use hyper_tls::HttpsConnector; use itertools::Itertools; -use netpod::log::*; use netpod::query::BinnedQuery; +use netpod::{log::*, ACCEPT_ALL}; use netpod::{ AppendToUrl, ChannelConfigQuery, ChannelSearchQuery, ChannelSearchResult, ChannelSearchSingleResult, FromUrl, HasBackend, HasTimeout, ProxyConfig, APP_JSON, @@ -30,6 +30,8 @@ use tokio::fs::File; use tokio::io::{AsyncRead, ReadBuf}; use url::Url; +const DISTRI_PRE: &str = "/distri/"; + pub async fn proxy(proxy_config: ProxyConfig) -> Result<(), Error> { use std::str::FromStr; let addr = SocketAddr::from_str(&format!("{}:{}", proxy_config.listen, proxy_config.port))?; @@ -63,7 +65,6 @@ async fn proxy_http_service(req: Request, proxy_config: ProxyConfig) -> Re async fn proxy_http_service_try(req: Request, proxy_config: &ProxyConfig) -> Result, Error> { let uri = req.uri().clone(); let path = uri.path(); - let distri_pre = "/distri/"; if path == "/api/1/channels" { Ok(channel_search_list_v1(req, proxy_config).await?) } else if path == "/api/1/channels/config" { @@ -75,6 +76,7 @@ async fn proxy_http_service_try(req: Request, proxy_config: &ProxyConfig) } else if path == "/api/1/query" { Ok(proxy_api1_single_backend_query(req, proxy_config).await?) } else if path.starts_with("/api/1/map/pulse/") { + warn!("/api/1/map/pulse/ DEPRECATED"); Ok(proxy_api1_map_pulse(req, proxy_config).await?) } else if path.starts_with("/api/1/gather/") { Ok(gather_json_2_v1(req, "/api/1/gather/", proxy_config).await?) @@ -117,20 +119,8 @@ async fn proxy_http_service_try(req: Request, proxy_config: &ProxyConfig) } else { Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) } - } else if path.starts_with(distri_pre) - && path - .chars() - .all(|c| c.is_ascii_alphanumeric() || ['/', '.', '-', '_'].contains(&c)) - && !path.contains("..") - { - if req.method() == Method::GET { - let s = FileStream { - file: File::open(format!("/opt/distri/{}", &path[distri_pre.len()..])).await?, - }; - Ok(response(StatusCode::OK).body(Body::wrap_stream(s))?) - } else { - Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) - } + } else if path.starts_with(DISTRI_PRE) { + proxy_distribute_v2(req).await } else { Ok(response(StatusCode::NOT_FOUND).body(Body::from(format!( "Sorry, proxy can not find: {:?} {:?} {:?}", @@ -141,6 +131,23 @@ async fn proxy_http_service_try(req: Request, proxy_config: &ProxyConfig) } } +pub async fn proxy_distribute_v2(req: Request) -> Result, Error> { + let path = req.uri().path(); + if path + .chars() + .all(|c| c.is_ascii_alphanumeric() || ['/', '.', '-', '_'].contains(&c)) + && !path.contains("..") + {} + if req.method() == Method::GET { + let s = FileStream { + file: File::open(format!("/opt/distri/{}", &path[DISTRI_PRE.len()..])).await?, + }; + Ok(response(StatusCode::OK).body(Body::wrap_stream(s))?) + } else { + Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) + } +} + pub struct FileStream { file: File, } @@ -399,7 +406,7 @@ where let (head, _body) = req.into_parts(); match head.headers.get(http::header::ACCEPT) { Some(v) => { - if v == APP_JSON { + if v == APP_JSON || v == ACCEPT_ALL { let url = Url::parse(&format!("dummy:{}", head.uri))?; let query = QT::from_url(&url)?; let sh = if url.as_str().contains("/map/pulse/") {