diff --git a/daqbufp2/src/test/binnedjson.rs b/daqbufp2/src/test/binnedjson.rs index c205bd3..867b226 100644 --- a/daqbufp2/src/test/binnedjson.rs +++ b/daqbufp2/src/test/binnedjson.rs @@ -418,7 +418,7 @@ async fn get_events_json_common_res( let beg_date: DateTime = beg_date.parse()?; let end_date: DateTime = end_date.parse()?; let range = NanoRange::from_date_time(beg_date, end_date); - let mut query = PlainEventsJsonQuery::new(channel, range, 4096, false); + let mut query = PlainEventsJsonQuery::new(channel, range, 4096, None, false); query.set_timeout(Duration::from_millis(15000)); let mut url = Url::parse(&format!("http://{}:{}/api/4/events", node0.host, node0.port))?; query.append_to_url(&mut url); diff --git a/daqbufp2/src/test/events.rs b/daqbufp2/src/test/events.rs index 82e96eb..999a305 100644 --- a/daqbufp2/src/test/events.rs +++ b/daqbufp2/src/test/events.rs @@ -274,7 +274,7 @@ pub async fn get_plain_events_json( name: channel_name.into(), }; let range = NanoRange::from_date_time(beg_date, end_date); - let query = PlainEventsJsonQuery::new(channel, range, 1024 * 4, false); + let query = PlainEventsJsonQuery::new(channel, range, 1024 * 4, None, false); let hp = HostPort::from_node(node0); let mut url = Url::parse(&format!("http://{}:{}/api/4/events", hp.host, hp.port))?; query.append_to_url(&mut url); diff --git a/disk/src/binned.rs b/disk/src/binned.rs index 4bf2fe2..e4e8c47 100644 --- a/disk/src/binned.rs +++ b/disk/src/binned.rs @@ -351,7 +351,7 @@ impl ChannelExecFunction for BinnedJsonChannelExec { self.query.disk_stats_every().clone(), self.query.report_error(), )?; - let f = collect_plain_events_json(s, self.timeout, t_bin_count, self.query.do_log()); + let f = collect_plain_events_json(s, self.timeout, t_bin_count, u64::MAX, self.query.do_log()); let s = futures_util::stream::once(f).map(|item| match item { Ok(item) => Ok(Bytes::from(serde_json::to_vec(&item)?)), Err(e) => Err(e.into()), @@ -375,7 +375,7 @@ impl ChannelExecFunction for BinnedJsonChannelExec { x_bin_count, self.query.agg_kind().do_time_weighted(), ); - let f = collect_plain_events_json(s, self.timeout, t_bin_count, self.query.do_log()); + let f = collect_plain_events_json(s, self.timeout, t_bin_count, u64::MAX, self.query.do_log()); let s = futures_util::stream::once(f).map(|item| match item { Ok(item) => Ok(Bytes::from(serde_json::to_vec(&item)?)), Err(e) => Err(e.into()), diff --git a/disk/src/channelexec.rs b/disk/src/channelexec.rs index 3732254..179780d 100644 --- a/disk/src/channelexec.rs +++ b/disk/src/channelexec.rs @@ -208,7 +208,9 @@ where range: range.clone(), expand: agg_kind.need_expand(), }; - let conf = httpclient::get_channel_config(&q, node_config).await?; + let conf = httpclient::get_channel_config(&q, node_config) + .await + .map_err(|e| e.add_public_msg(format!("Can not find channel config for {}", q.channel.name())))?; let ret = channel_exec_config( f, conf.scalar_type.clone(), @@ -293,6 +295,7 @@ pub struct PlainEventsJson { disk_io_buffer_size: usize, timeout: Duration, node_config: NodeConfigCached, + events_max: u64, do_log: bool, } @@ -303,6 +306,7 @@ impl PlainEventsJson { disk_io_buffer_size: usize, timeout: Duration, node_config: NodeConfigCached, + events_max: u64, do_log: bool, ) -> Self { Self { @@ -312,6 +316,7 @@ impl PlainEventsJson { disk_io_buffer_size, timeout, node_config, + events_max, do_log, } } @@ -330,6 +335,7 @@ pub async fn collect_plain_events_json( stream: S, timeout: Duration, bin_count_exp: u32, + events_max: u64, do_log: bool, ) -> Result where @@ -394,6 +400,9 @@ where RangeCompletableItem::Data(item) => { collector.ingest(&item); i1 += 1; + if i1 >= events_max { + break; + } } }, }, @@ -448,7 +457,7 @@ impl ChannelExecFunction for PlainEventsJson { do_decompress: true, }; let s = MergedFromRemotes::::new(evq, perf_opts, self.node_config.node_config.cluster); - let f = collect_plain_events_json(s, self.timeout, 0, self.do_log); + let f = collect_plain_events_json(s, self.timeout, 0, self.events_max, self.do_log); let f = FutureExt::map(f, |item| match item { Ok(item) => { // TODO add channel entry info here? @@ -466,14 +475,3 @@ impl ChannelExecFunction for PlainEventsJson { Box::pin(futures_util::stream::empty()) } } - -// TODO remove when done. -pub fn dummy_impl() { - let channel: Channel = err::todoval(); - let range: NanoRange = err::todoval(); - let agg_kind: AggKind = err::todoval(); - let node_config: NodeConfigCached = err::todoval(); - let timeout: Duration = err::todoval(); - let f = PlainEventsJson::new(channel.clone(), range.clone(), 0, timeout, node_config.clone(), false); - let _ = channel_exec(f, &channel, &range, agg_kind, &node_config); -} diff --git a/disk/src/events.rs b/disk/src/events.rs index dc52489..bd6e5d7 100644 --- a/disk/src/events.rs +++ b/disk/src/events.rs @@ -109,17 +109,25 @@ pub struct PlainEventsJsonQuery { disk_io_buffer_size: usize, report_error: bool, timeout: Duration, + events_max: Option, do_log: bool, } impl PlainEventsJsonQuery { - pub fn new(channel: Channel, range: NanoRange, disk_io_buffer_size: usize, do_log: bool) -> Self { + pub fn new( + channel: Channel, + range: NanoRange, + disk_io_buffer_size: usize, + events_max: Option, + do_log: bool, + ) -> Self { Self { channel, range, disk_io_buffer_size, report_error: false, timeout: Duration::from_millis(10000), + events_max, do_log, } } @@ -150,6 +158,9 @@ impl PlainEventsJsonQuery { .parse::() .map(|k| Duration::from_millis(k)) .map_err(|e| Error::with_public_msg(format!("can not parse timeout {:?}", e)))?, + events_max: pairs + .get("eventsMax") + .map_or(Ok(None), |k| k.parse().map(|k| Some(k)))?, do_log: pairs .get("doLog") .map_or("false", |k| k) @@ -185,6 +196,10 @@ impl PlainEventsJsonQuery { self.timeout } + pub fn events_max(&self) -> Option { + self.events_max + } + pub fn do_log(&self) -> bool { self.do_log } @@ -208,6 +223,9 @@ impl PlainEventsJsonQuery { ); g.append_pair("diskIoBufferSize", &format!("{}", self.disk_io_buffer_size)); g.append_pair("timeout", &format!("{}", self.timeout.as_millis())); + if let Some(x) = self.events_max.as_ref() { + g.append_pair("eventsMax", &format!("{}", x)); + } g.append_pair("doLog", &format!("{}", self.do_log)); } } diff --git a/httpret/src/events.rs b/httpret/src/events.rs index 1b37a3d..cb6917e 100644 --- a/httpret/src/events.rs +++ b/httpret/src/events.rs @@ -24,23 +24,15 @@ impl EventsHandler { if req.method() != Method::GET { return Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?); } - let ret = match plain_events(req, node_config).await { - Ok(res) => res, - Err(e) => response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::from(format!("{:?}", e)))?, - }; - Ok(ret) + match plain_events(req, node_config).await { + Ok(ret) => Ok(ret), + Err(e) => Ok(e.to_public_response()), + } } } async fn plain_events(req: Request, node_config: &NodeConfigCached) -> Result, Error> { - match plain_events_inner(req, node_config).await { - Ok(ret) => Ok(ret), - Err(e) => Ok(e.to_public_response()), - } -} - -async fn plain_events_inner(req: Request, node_config: &NodeConfigCached) -> Result, Error> { - info!("httpret plain_events_inner req: {:?}", req); + info!("httpret plain_events req: {:?}", req); let accept_def = APP_JSON; let accept = req .headers() @@ -85,6 +77,7 @@ async fn plain_events_json(req: Request, node_config: &NodeConfigCached) - query.disk_io_buffer_size(), query.timeout(), node_config.clone(), + query.events_max().unwrap_or(u64::MAX), query.do_log(), ); let s = disk::channelexec::channel_exec(op, query.channel(), query.range(), AggKind::Plain, node_config).await?; diff --git a/httpret/src/evinfo.rs b/httpret/src/evinfo.rs index 524c40a..f3485c6 100644 --- a/httpret/src/evinfo.rs +++ b/httpret/src/evinfo.rs @@ -74,7 +74,12 @@ impl EventInfoScan { node_config: &NodeConfigCached, ) -> Result> + Send>>, Error> { let ret = channel_exec( - EvInfoFunc::new(query.clone(), query.timeout(), node_config.clone()), + EvInfoFunc::new( + query.clone(), + query.timeout(), + query.events_max().unwrap_or(u64::MAX), + node_config.clone(), + ), query.channel(), query.range(), AggKind::Stats1, @@ -89,13 +94,15 @@ pub struct EvInfoFunc { query: PlainEventsJsonQuery, timeout: Duration, node_config: NodeConfigCached, + events_max: u64, } impl EvInfoFunc { - pub fn new(query: PlainEventsJsonQuery, timeout: Duration, node_config: NodeConfigCached) -> Self { + pub fn new(query: PlainEventsJsonQuery, timeout: Duration, events_max: u64, node_config: NodeConfigCached) -> Self { Self { query, timeout, + events_max, node_config, } } @@ -151,7 +158,7 @@ impl ChannelExecFunction for EvInfoFunc { // TODO Must issue multiple reads to GPFS, keep futures in a ordered queue. let s = MergedFromRemotes::::new(evq, perf_opts, self.node_config.node_config.cluster); - let f = collect_plain_events_json(s, self.timeout, 0, self.query.do_log()); + let f = collect_plain_events_json(s, self.timeout, 0, self.events_max, self.query.do_log()); let f = FutureExt::map(f, |item| match item { Ok(item) => { // TODO add channel entry info here? diff --git a/httpret/src/httpret.rs b/httpret/src/httpret.rs index f5fdd27..fd5138b 100644 --- a/httpret/src/httpret.rs +++ b/httpret/src/httpret.rs @@ -225,7 +225,7 @@ async fn http_service_try(req: Request, node_config: &NodeConfigCached) -> } else { Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) } - } else if path == "/api/4/random_channel" { + } else if path == "/api/4/random/channel" { if req.method() == Method::GET { Ok(random_channel(req, &node_config).await?) } else { @@ -287,7 +287,10 @@ async fn http_service_try(req: Request, node_config: &NodeConfigCached) -> } } else if path == "/api/4/channel/config" { if req.method() == Method::GET { - Ok(channel_config(req, &node_config).await?) + match channel_config(req, &node_config).await { + Ok(k) => Ok(k), + Err(e) => Ok(e.to_public_response()), + } } else { Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) } @@ -451,6 +454,7 @@ trait ToPublicResponse { impl ToPublicResponse for Error { fn to_public_response(&self) -> Response { + error!("ToPublicResponse converts: {self:?}"); use std::fmt::Write; let status = match self.reason() { Some(::err::Reason::BadRequest) => StatusCode::BAD_REQUEST, diff --git a/httpret/src/proxy.rs b/httpret/src/proxy.rs index d356ad3..273e099 100644 --- a/httpret/src/proxy.rs +++ b/httpret/src/proxy.rs @@ -4,7 +4,7 @@ use crate::api1::{channel_search_configs_v1, channel_search_list_v1, gather_json use crate::err::Error; use crate::gather::{gather_get_json_generic, SubRes}; use crate::pulsemap::MapPulseQuery; -use crate::{api_1_docs, api_4_docs, response, Cont}; +use crate::{api_1_docs, api_4_docs, response, response_err, Cont}; use disk::events::PlainEventsJsonQuery; use futures_core::Stream; use futures_util::pin_mut; @@ -471,7 +471,14 @@ where Some(v) => { if v == APP_JSON || v == ACCEPT_ALL { let url = Url::parse(&format!("dummy:{}", head.uri))?; - let query = QT::from_url(&url)?; + let query = match QT::from_url(&url) { + Ok(k) => k, + Err(_) => { + let msg = format!("Malformed request or missing parameters"); + return Ok(response_err(StatusCode::BAD_REQUEST, msg)?); + } + }; + // TODO is this special case used any more? let sh = if url.as_str().contains("/map/pulse/") { get_query_host_for_backend_2(&query.backend(), proxy_config)? } else {