Improve search proxy handling
This commit is contained in:
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "daqbuffer"
|
||||
version = "0.5.5-aa.4"
|
||||
version = "0.5.5-aa.5"
|
||||
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
|
||||
edition = "2021"
|
||||
|
||||
|
||||
@@ -1,8 +1,6 @@
|
||||
use crate::body_empty;
|
||||
use crate::body_string;
|
||||
use crate::err::Error;
|
||||
use futures_util::select;
|
||||
use futures_util::FutureExt;
|
||||
use http::header;
|
||||
use http::Method;
|
||||
use http::StatusCode;
|
||||
@@ -22,7 +20,6 @@ use std::future::Future;
|
||||
use std::pin::Pin;
|
||||
use std::time::Duration;
|
||||
use taskrun::tokio;
|
||||
use tokio::time::sleep;
|
||||
use url::Url;
|
||||
|
||||
#[derive(Clone, Serialize, Deserialize)]
|
||||
@@ -69,8 +66,6 @@ where
|
||||
FT: Fn(Vec<(Tag, Result<SubRes<SM>, Error>)>) -> Result<OUT, Error>,
|
||||
SubRes<SM>: fmt::Debug,
|
||||
{
|
||||
// TODO remove magic constant
|
||||
let extra_timeout = Duration::from_millis(3000);
|
||||
if urls.len() != bodies.len() {
|
||||
return Err(Error::with_msg_no_trace(format!("unequal numbers of urls and bodies")));
|
||||
}
|
||||
@@ -99,7 +94,7 @@ where
|
||||
.header(http::header::CONTENT_TYPE, APP_JSON)
|
||||
.header(http::header::ACCEPT, APP_JSON)
|
||||
.header(ctx.header_name(), ctx.header_value())
|
||||
.uri(uri)
|
||||
.uri(uri.clone())
|
||||
} else {
|
||||
Request::builder()
|
||||
.method(method.clone())
|
||||
@@ -107,7 +102,7 @@ where
|
||||
.header(http::header::CONTENT_TYPE, APP_JSON)
|
||||
.header(http::header::ACCEPT, APP_JSON)
|
||||
.header(ctx.header_name(), ctx.header_value())
|
||||
.uri(uri)
|
||||
.uri(uri.clone())
|
||||
};
|
||||
let body = match body {
|
||||
None => body_empty(),
|
||||
@@ -116,28 +111,28 @@ where
|
||||
match req.body(body) {
|
||||
Ok(req) => {
|
||||
let tag2 = tag.clone();
|
||||
let fut3 = async move {
|
||||
let mut client = match connect_client(req.uri()).await {
|
||||
Ok(x) => x,
|
||||
Err(e) => {
|
||||
warn!("can not connect {} {}", uri, e);
|
||||
return Err(Error::from_to_string(e));
|
||||
}
|
||||
};
|
||||
let res = match client.send_request(req).await {
|
||||
Ok(x) => nt(tag2, x).await?,
|
||||
Err(e) => {
|
||||
warn!("can not send request {} {}", uri, e);
|
||||
return Err(Error::from_to_string(e));
|
||||
}
|
||||
};
|
||||
Ok(res)
|
||||
};
|
||||
let jh = tokio::spawn(async move {
|
||||
select! {
|
||||
_ = sleep(timeout + extra_timeout).fuse() => {
|
||||
error!("PROXY TIMEOUT");
|
||||
Err(Error::with_msg_no_trace(format!("timeout")))
|
||||
}
|
||||
res = async move {
|
||||
let mut client = match connect_client(req.uri()).await {
|
||||
Ok(x) => x,
|
||||
Err(e) => return Err(Error::from_to_string(e)),
|
||||
};
|
||||
let res = match client.send_request(req).await {
|
||||
Ok(x) => x,
|
||||
Err(e) => return Err(Error::from_to_string(e)),
|
||||
};
|
||||
Ok(res)
|
||||
}.fuse() => {
|
||||
debug!("received result in time {res:?}");
|
||||
let ret = nt(tag2, res?).await?;
|
||||
debug!("transformed result in time {ret:?}");
|
||||
Ok(ret)
|
||||
}
|
||||
let fut4 = taskrun::tokio::time::timeout(timeout, fut3);
|
||||
match fut4.await {
|
||||
Ok(x) => x,
|
||||
Err(_) => Err(Error::with_msg_no_trace(format!("timeout"))),
|
||||
}
|
||||
});
|
||||
Some((url, tag, jh))
|
||||
@@ -150,17 +145,17 @@ where
|
||||
})
|
||||
.collect();
|
||||
let mut a = Vec::new();
|
||||
for (_url, tag, jh) in spawned {
|
||||
for (url, tag, jh) in spawned {
|
||||
let res = match jh.await {
|
||||
Ok(k) => match k {
|
||||
Ok(k) => (Tag(tag), Ok(k)),
|
||||
Err(e) => {
|
||||
warn!("{e:?}");
|
||||
warn!("{} {}", url, e);
|
||||
(Tag(tag), Err(e))
|
||||
}
|
||||
},
|
||||
Err(e) => {
|
||||
warn!("{e:?}");
|
||||
warn!("{} {}", url, e);
|
||||
(Tag(tag), Err(e.into()))
|
||||
}
|
||||
};
|
||||
|
||||
@@ -50,11 +50,12 @@ pub async fn channel_search(req: Requ, ctx: &ReqCtx, proxy_config: &ProxyConfig)
|
||||
let mut tags = Vec::new();
|
||||
let mut bodies = Vec::new();
|
||||
for pb in &proxy_config.backends {
|
||||
if if let Some(b) = &query.backend {
|
||||
let use_backend = if let Some(b) = &query.backend {
|
||||
pb.name.contains(b)
|
||||
} else {
|
||||
true
|
||||
} {
|
||||
};
|
||||
if use_backend {
|
||||
match Url::parse(&format!("{}/api/4/search/channel", pb.url)) {
|
||||
Ok(mut url) => {
|
||||
query.append_to_url(&mut url);
|
||||
@@ -69,7 +70,8 @@ pub async fn channel_search(req: Requ, ctx: &ReqCtx, proxy_config: &ProxyConfig)
|
||||
let nt = |tag: String, res: Response<Incoming>| {
|
||||
let fut = async {
|
||||
let (_head, body) = res.into_parts();
|
||||
let body = read_body_bytes(body).await?;
|
||||
let fut = read_body_bytes(body);
|
||||
let body = fut.await?;
|
||||
//info!("got a result {:?}", body);
|
||||
let res: ChannelSearchResult = match serde_json::from_slice(&body) {
|
||||
Ok(k) => k,
|
||||
@@ -113,7 +115,7 @@ pub async fn channel_search(req: Requ, ctx: &ReqCtx, proxy_config: &ProxyConfig)
|
||||
tags,
|
||||
nt,
|
||||
ft,
|
||||
Duration::from_millis(3000),
|
||||
Duration::from_millis(1100),
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
Reference in New Issue
Block a user