diff --git a/crates/daqbuffer/Cargo.toml b/crates/daqbuffer/Cargo.toml index c06d89f..3e259d3 100644 --- a/crates/daqbuffer/Cargo.toml +++ b/crates/daqbuffer/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "daqbuffer" -version = "0.5.5-aa.4" +version = "0.5.5-aa.5" authors = ["Dominik Werder "] edition = "2021" diff --git a/crates/httpret/src/gather.rs b/crates/httpret/src/gather.rs index 99e38e3..aac7a46 100644 --- a/crates/httpret/src/gather.rs +++ b/crates/httpret/src/gather.rs @@ -1,8 +1,6 @@ use crate::body_empty; use crate::body_string; use crate::err::Error; -use futures_util::select; -use futures_util::FutureExt; use http::header; use http::Method; use http::StatusCode; @@ -22,7 +20,6 @@ use std::future::Future; use std::pin::Pin; use std::time::Duration; use taskrun::tokio; -use tokio::time::sleep; use url::Url; #[derive(Clone, Serialize, Deserialize)] @@ -69,8 +66,6 @@ where FT: Fn(Vec<(Tag, Result, Error>)>) -> Result, SubRes: fmt::Debug, { - // TODO remove magic constant - let extra_timeout = Duration::from_millis(3000); if urls.len() != bodies.len() { return Err(Error::with_msg_no_trace(format!("unequal numbers of urls and bodies"))); } @@ -99,7 +94,7 @@ where .header(http::header::CONTENT_TYPE, APP_JSON) .header(http::header::ACCEPT, APP_JSON) .header(ctx.header_name(), ctx.header_value()) - .uri(uri) + .uri(uri.clone()) } else { Request::builder() .method(method.clone()) @@ -107,7 +102,7 @@ where .header(http::header::CONTENT_TYPE, APP_JSON) .header(http::header::ACCEPT, APP_JSON) .header(ctx.header_name(), ctx.header_value()) - .uri(uri) + .uri(uri.clone()) }; let body = match body { None => body_empty(), @@ -116,28 +111,28 @@ where match req.body(body) { Ok(req) => { let tag2 = tag.clone(); + let fut3 = async move { + let mut client = match connect_client(req.uri()).await { + Ok(x) => x, + Err(e) => { + warn!("can not connect {} {}", uri, e); + return Err(Error::from_to_string(e)); + } + }; + let res = match client.send_request(req).await { + Ok(x) => nt(tag2, x).await?, + Err(e) => { + warn!("can not send request {} {}", uri, e); + return Err(Error::from_to_string(e)); + } + }; + Ok(res) + }; let jh = tokio::spawn(async move { - select! { - _ = sleep(timeout + extra_timeout).fuse() => { - error!("PROXY TIMEOUT"); - Err(Error::with_msg_no_trace(format!("timeout"))) - } - res = async move { - let mut client = match connect_client(req.uri()).await { - Ok(x) => x, - Err(e) => return Err(Error::from_to_string(e)), - }; - let res = match client.send_request(req).await { - Ok(x) => x, - Err(e) => return Err(Error::from_to_string(e)), - }; - Ok(res) - }.fuse() => { - debug!("received result in time {res:?}"); - let ret = nt(tag2, res?).await?; - debug!("transformed result in time {ret:?}"); - Ok(ret) - } + let fut4 = taskrun::tokio::time::timeout(timeout, fut3); + match fut4.await { + Ok(x) => x, + Err(_) => Err(Error::with_msg_no_trace(format!("timeout"))), } }); Some((url, tag, jh)) @@ -150,17 +145,17 @@ where }) .collect(); let mut a = Vec::new(); - for (_url, tag, jh) in spawned { + for (url, tag, jh) in spawned { let res = match jh.await { Ok(k) => match k { Ok(k) => (Tag(tag), Ok(k)), Err(e) => { - warn!("{e:?}"); + warn!("{} {}", url, e); (Tag(tag), Err(e)) } }, Err(e) => { - warn!("{e:?}"); + warn!("{} {}", url, e); (Tag(tag), Err(e.into())) } }; diff --git a/crates/httpret/src/proxy/api4.rs b/crates/httpret/src/proxy/api4.rs index 4c0a320..f4612db 100644 --- a/crates/httpret/src/proxy/api4.rs +++ b/crates/httpret/src/proxy/api4.rs @@ -50,11 +50,12 @@ pub async fn channel_search(req: Requ, ctx: &ReqCtx, proxy_config: &ProxyConfig) let mut tags = Vec::new(); let mut bodies = Vec::new(); for pb in &proxy_config.backends { - if if let Some(b) = &query.backend { + let use_backend = if let Some(b) = &query.backend { pb.name.contains(b) } else { true - } { + }; + if use_backend { match Url::parse(&format!("{}/api/4/search/channel", pb.url)) { Ok(mut url) => { query.append_to_url(&mut url); @@ -69,7 +70,8 @@ pub async fn channel_search(req: Requ, ctx: &ReqCtx, proxy_config: &ProxyConfig) let nt = |tag: String, res: Response| { let fut = async { let (_head, body) = res.into_parts(); - let body = read_body_bytes(body).await?; + let fut = read_body_bytes(body); + let body = fut.await?; //info!("got a result {:?}", body); let res: ChannelSearchResult = match serde_json::from_slice(&body) { Ok(k) => k, @@ -113,7 +115,7 @@ pub async fn channel_search(req: Requ, ctx: &ReqCtx, proxy_config: &ProxyConfig) tags, nt, ft, - Duration::from_millis(3000), + Duration::from_millis(1100), ctx, ) .await?;