From 411014d2890e963fbdac13be2dae22034465f606 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Wed, 23 Jun 2021 08:57:14 +0200 Subject: [PATCH] Fix url creation, check endian of each event because of inconsistent config --- daqbuffer/src/client.rs | 11 ++- daqbuffer/src/test.rs | 11 ++- daqbuffer/src/test/binnedjson.rs | 11 ++- daqbuffer/src/test/events.rs | 12 +-- disk/src/binned.rs | 4 +- disk/src/binned/binnedfrompbv.rs | 16 ++-- disk/src/binned/query.rs | 121 ++++++++++++++++--------------- disk/src/channelexec.rs | 27 ++++++- disk/src/decode.rs | 59 +++++++++++---- disk/src/events.rs | 32 ++++---- httpret/src/lib.rs | 1 + netpod/src/lib.rs | 18 ++--- 12 files changed, 192 insertions(+), 131 deletions(-) diff --git a/daqbuffer/src/client.rs b/daqbuffer/src/client.rs index 8a87cbc..c55adad 100644 --- a/daqbuffer/src/client.rs +++ b/daqbuffer/src/client.rs @@ -11,7 +11,8 @@ use futures_util::TryStreamExt; use http::StatusCode; use hyper::Body; use netpod::log::*; -use netpod::{AggKind, ByteSize, Channel, HostPort, NanoRange, PerfOpts}; +use netpod::{AggKind, AppendToUrl, ByteSize, Channel, HostPort, NanoRange, PerfOpts, APP_OCTET}; +use url::Url; pub async fn status(host: String, port: u16) -> Result<(), Error> { let t1 = Utc::now(); @@ -62,11 +63,13 @@ pub async fn get_binned( query.set_cache_usage(cache_usage); query.set_disk_stats_every(ByteSize(1024 * disk_stats_every_kb)); let hp = HostPort { host: host, port: port }; - let url = query.url(&hp); + let mut url = Url::parse(&format!("http://{}:{}/api/4/binned", hp.host, hp.port))?; + query.append_to_url(&mut url); + let url = url; let req = hyper::Request::builder() .method(http::Method::GET) - .uri(url) - .header("accept", "application/octet-stream") + .uri(url.to_string()) + .header(http::header::ACCEPT, APP_OCTET) .body(Body::empty())?; let client = hyper::Client::new(); let res = client.request(req).await?; diff --git a/daqbuffer/src/test.rs b/daqbuffer/src/test.rs index bd973ff..2572f14 100644 --- a/daqbuffer/src/test.rs +++ b/daqbuffer/src/test.rs @@ -14,11 +14,12 @@ use futures_util::TryStreamExt; use http::StatusCode; use hyper::Body; use netpod::log::*; -use netpod::{AggKind, Channel, Cluster, HostPort, NanoRange, PerfOpts}; +use netpod::{AggKind, AppendToUrl, Channel, Cluster, HostPort, NanoRange, PerfOpts, APP_OCTET}; use serde::de::DeserializeOwned; use std::fmt::Debug; use std::future::ready; use tokio::io::AsyncRead; +use url::Url; pub mod binnedjson; pub mod events; @@ -109,12 +110,14 @@ where let mut query = BinnedQuery::new(channel, range, bin_count, agg_kind); query.set_cache_usage(CacheUsage::Ignore); let hp = HostPort::from_node(node0); - let url = query.url(&hp); + let mut url = Url::parse(&format!("http://{}:{}/api/4/binned", hp.host, hp.port))?; + query.append_to_url(&mut url); + let url = url; info!("get_binned_channel get {}", url); let req = hyper::Request::builder() .method(http::Method::GET) - .uri(url) - .header("Accept", "application/octet-stream") + .uri(url.to_string()) + .header(http::header::ACCEPT, APP_OCTET) .body(Body::empty())?; let client = hyper::Client::new(); let res = client.request(req).await?; diff --git a/daqbuffer/src/test/binnedjson.rs b/daqbuffer/src/test/binnedjson.rs index b7269f5..36f705f 100644 --- a/daqbuffer/src/test/binnedjson.rs +++ b/daqbuffer/src/test/binnedjson.rs @@ -5,8 +5,9 @@ use err::Error; use http::StatusCode; use hyper::Body; use netpod::log::*; -use netpod::{AggKind, Channel, Cluster, HostPort, NanoRange}; +use netpod::{AggKind, AppendToUrl, Channel, Cluster, NanoRange, APP_JSON}; use std::time::Duration; +use url::Url; #[test] fn get_binned_json_0() { @@ -94,12 +95,14 @@ async fn get_binned_json_common( let mut query = BinnedQuery::new(channel, range, bin_count, agg_kind); query.set_timeout(Duration::from_millis(15000)); query.set_cache_usage(CacheUsage::Ignore); - let url = query.url(&HostPort::from_node(node0)); + let mut url = Url::parse(&format!("http://{}:{}/api/4/binned", node0.host, node0.port))?; + query.append_to_url(&mut url); + let url = url; info!("get_binned_json_common get {}", url); let req = hyper::Request::builder() .method(http::Method::GET) - .uri(url) - .header("Accept", "application/json") + .uri(url.to_string()) + .header(http::header::ACCEPT, APP_JSON) .body(Body::empty())?; let client = hyper::Client::new(); let res = client.request(req).await?; diff --git a/daqbuffer/src/test/events.rs b/daqbuffer/src/test/events.rs index 1ceed64..c19565f 100644 --- a/daqbuffer/src/test/events.rs +++ b/daqbuffer/src/test/events.rs @@ -13,7 +13,7 @@ use futures_util::{StreamExt, TryStreamExt}; use http::StatusCode; use hyper::Body; use netpod::log::*; -use netpod::{AppendToUrl, Channel, Cluster, HostPort, NanoRange, PerfOpts, APP_OCTET}; +use netpod::{AppendToUrl, Channel, Cluster, HostPort, NanoRange, PerfOpts, APP_JSON, APP_OCTET}; use serde_json::Value as JsonValue; use std::fmt::Debug; use std::future::ready; @@ -271,14 +271,16 @@ async fn get_plain_events_json( name: channel_name.into(), }; let range = NanoRange::from_date_time(beg_date, end_date); - let query = PlainEventsJsonQuery::new(channel, range); + let query = PlainEventsJsonQuery::new(channel, range, false); let hp = HostPort::from_node(node0); - let url = query.url(&hp); + let mut url = Url::parse(&format!("http://{}:{}/api/4/events", hp.host, hp.port))?; + query.append_to_url(&mut url); + let url = url; info!("get_plain_events get {}", url); let req = hyper::Request::builder() .method(http::Method::GET) - .uri(url) - .header("Accept", "application/json") + .uri(url.to_string()) + .header(http::header::ACCEPT, APP_JSON) .body(Body::empty())?; let client = hyper::Client::new(); let res = client.request(req).await?; diff --git a/disk/src/binned.rs b/disk/src/binned.rs index 3823aec..6686711 100644 --- a/disk/src/binned.rs +++ b/disk/src/binned.rs @@ -774,7 +774,7 @@ impl ChannelExecFunction for BinnedJsonChannelExec { self.query.disk_stats_every().clone(), self.query.report_error(), )?; - let f = collect_plain_events_json(s, self.timeout, t_bin_count); + let f = collect_plain_events_json(s, self.timeout, t_bin_count, self.query.do_log()); let s = futures_util::stream::once(f).map(|item| match item { Ok(item) => Ok(Bytes::from(serde_json::to_vec(&item)?)), Err(e) => Err(e.into()), @@ -794,7 +794,7 @@ impl ChannelExecFunction for BinnedJsonChannelExec { let x_bin_count = x_bin_count(&shape, self.query.agg_kind()); let s = MergedFromRemotes::::new(evq, perf_opts, self.node_config.node_config.cluster.clone()); let s = TBinnerStream::<_, ::Output>::new(s, range, x_bin_count); - let f = collect_plain_events_json(s, self.timeout, t_bin_count); + let f = collect_plain_events_json(s, self.timeout, t_bin_count, self.query.do_log()); let s = futures_util::stream::once(f).map(|item| match item { Ok(item) => Ok(Bytes::from(serde_json::to_vec(&item)?)), Err(e) => Err(e.into()), diff --git a/disk/src/binned/binnedfrompbv.rs b/disk/src/binned/binnedfrompbv.rs index 58db0a3..8e1aa27 100644 --- a/disk/src/binned/binnedfrompbv.rs +++ b/disk/src/binned/binnedfrompbv.rs @@ -12,13 +12,16 @@ use futures_util::{FutureExt, StreamExt}; use http::{StatusCode, Uri}; use netpod::log::*; use netpod::{ - x_bin_count, AggKind, BinnedRange, ByteSize, Channel, NodeConfigCached, PerfOpts, PreBinnedPatchIterator, Shape, + x_bin_count, AggKind, AppendToUrl, BinnedRange, ByteSize, Channel, NodeConfigCached, PerfOpts, + PreBinnedPatchIterator, Shape, }; use serde::de::DeserializeOwned; use std::future::ready; use std::marker::PhantomData; use std::pin::Pin; +use std::str::FromStr; use std::task::{Context, Poll}; +use url::Url; pub struct FetchedPreBinned { uri: Uri, @@ -33,15 +36,10 @@ impl FetchedPreBinned { pub fn new(query: &PreBinnedQuery, node_config: &NodeConfigCached) -> Result { let nodeix = node_ix_for_patch(&query.patch(), &query.channel(), &node_config.node_config.cluster); let node = &node_config.node_config.cluster.nodes[nodeix as usize]; - let uri: hyper::Uri = format!( - "http://{}:{}/api/4/prebinned?{}", - node.host, - node.port, - query.make_query_string() - ) - .parse()?; + let mut url = Url::parse(&format!("http://{}:{}/api/4/prebinned", node.host, node.port))?; + query.append_to_url(&mut url); let ret = Self { - uri, + uri: Uri::from_str(&url.to_string())?, resfut: None, res: None, errored: false, diff --git a/disk/src/binned/query.rs b/disk/src/binned/query.rs index adcfa23..860263b 100644 --- a/disk/src/binned/query.rs +++ b/disk/src/binned/query.rs @@ -4,7 +4,7 @@ use http::request::Parts; use netpod::log::*; use netpod::{ channel_from_pairs, get_url_query_pairs, AggKind, AppendToUrl, ByteSize, Channel, FromUrl, HasBackend, HasTimeout, - HostPort, NanoRange, PreBinnedPatchCoord, ToNanos, + NanoRange, PreBinnedPatchCoord, ToNanos, }; use std::collections::BTreeMap; use std::time::Duration; @@ -84,19 +84,6 @@ impl PreBinnedQuery { Self::from_url(&url) } - pub fn make_query_string(&self) -> String { - format!( - "{}&channelBackend={}&channelName={}&binningScheme={}&cacheUsage={}&diskStatsEveryKb={}&reportError={}", - self.patch.to_url_params_strings(), - self.channel.backend, - self.channel.name, - binning_scheme_query_string(&self.agg_kind), - self.cache_usage, - self.disk_stats_every.bytes() / 1024, - self.report_error(), - ) - } - pub fn patch(&self) -> &PreBinnedPatchCoord { &self.patch } @@ -122,6 +109,19 @@ impl PreBinnedQuery { } } +impl AppendToUrl for PreBinnedQuery { + fn append_to_url(&self, url: &mut Url) { + self.patch.append_to_url(url); + binning_scheme_append_to_url(&self.agg_kind, url); + let mut g = url.query_pairs_mut(); + g.append_pair("channelBackend", &self.channel.backend); + g.append_pair("channelName", &self.channel.name); + g.append_pair("cacheUsage", &format!("{}", self.cache_usage.query_param_value())); + g.append_pair("diskStatsEveryKb", &format!("{}", self.disk_stats_every.bytes() / 1024)); + g.append_pair("reportError", &format!("{}", self.report_error())); + } +} + #[derive(Clone, Debug)] pub enum CacheUsage { Use, @@ -139,8 +139,8 @@ impl CacheUsage { .into() } - pub fn from_pairs(params: &BTreeMap) -> Result { - let ret = params.get("cacheUsage").map_or(Ok::<_, Error>(CacheUsage::Use), |k| { + pub fn from_pairs(pairs: &BTreeMap) -> Result { + let ret = pairs.get("cacheUsage").map_or(Ok::<_, Error>(CacheUsage::Use), |k| { if k == "use" { Ok(CacheUsage::Use) } else if k == "ignore" { @@ -185,6 +185,7 @@ pub struct BinnedQuery { report_error: bool, timeout: Duration, abort_after_bin_count: u32, + do_log: bool, } impl BinnedQuery { @@ -199,6 +200,7 @@ impl BinnedQuery { report_error: false, timeout: Duration::from_millis(2000), abort_after_bin_count: 0, + do_log: false, } } @@ -238,6 +240,10 @@ impl BinnedQuery { self.abort_after_bin_count } + pub fn do_log(&self) -> bool { + self.do_log + } + pub fn set_cache_usage(&mut self, k: CacheUsage) { self.cache_usage = k; } @@ -249,26 +255,6 @@ impl BinnedQuery { pub fn set_timeout(&mut self, k: Duration) { self.timeout = k; } - - // TODO remove in favor of AppendToUrl - pub fn url(&self, host: &HostPort) -> String { - let date_fmt = "%Y-%m-%dT%H:%M:%S.%3fZ"; - format!( - "http://{}:{}/api/4/binned?cacheUsage={}&channelBackend={}&channelName={}&binCount={}&begDate={}&endDate={}&binningScheme={}&diskStatsEveryKb={}&timeout={}&abortAfterBinCount={}", - host.host, - host.port, - self.cache_usage, - self.channel.backend, - self.channel.name, - self.bin_count, - Utc.timestamp_nanos(self.range.beg as i64).format(date_fmt), - Utc.timestamp_nanos(self.range.end as i64).format(date_fmt), - binning_scheme_query_string(&self.agg_kind), - self.disk_stats_every.bytes() / 1024, - self.timeout.as_millis(), - self.abort_after_bin_count, - ) - } } impl HasBackend for BinnedQuery { @@ -322,6 +308,11 @@ impl FromUrl for BinnedQuery { .map_or("0", |k| k) .parse() .map_err(|e| Error::with_msg(format!("can not parse abortAfterBinCount {:?}", e)))?, + do_log: pairs + .get("doLog") + .map_or("false", |k| k) + .parse() + .map_err(|e| Error::with_msg(format!("can not parse doLog {:?}", e)))?, }; info!("BinnedQuery::from_url {:?}", ret); Ok(ret) @@ -331,31 +322,47 @@ impl FromUrl for BinnedQuery { impl AppendToUrl for BinnedQuery { fn append_to_url(&self, url: &mut Url) { let date_fmt = "%Y-%m-%dT%H:%M:%S.%3fZ"; - let mut g = url.query_pairs_mut(); - g.append_pair("cacheUsage", &self.cache_usage.to_string()); - g.append_pair("channelBackend", &self.channel.backend); - g.append_pair("channelName", &self.channel.name); - g.append_pair("binCount", &format!("{}", self.bin_count)); - g.append_pair( - "begDate", - &Utc.timestamp_nanos(self.range.beg as i64).format(date_fmt).to_string(), - ); - g.append_pair( - "endDate", - &Utc.timestamp_nanos(self.range.end as i64).format(date_fmt).to_string(), - ); - g.append_pair("binningScheme", &binning_scheme_query_string(&self.agg_kind)); - g.append_pair("diskStatsEveryKb", &format!("{}", self.disk_stats_every.bytes() / 1024)); - g.append_pair("timeout", &format!("{}", self.timeout.as_millis())); - g.append_pair("abortAfterBinCount", &format!("{}", self.abort_after_bin_count)); + { + let mut g = url.query_pairs_mut(); + g.append_pair("cacheUsage", &self.cache_usage.to_string()); + g.append_pair("channelBackend", &self.channel.backend); + g.append_pair("channelName", &self.channel.name); + g.append_pair("binCount", &format!("{}", self.bin_count)); + g.append_pair( + "begDate", + &Utc.timestamp_nanos(self.range.beg as i64).format(date_fmt).to_string(), + ); + g.append_pair( + "endDate", + &Utc.timestamp_nanos(self.range.end as i64).format(date_fmt).to_string(), + ); + } + { + binning_scheme_append_to_url(&self.agg_kind, url); + } + { + let mut g = url.query_pairs_mut(); + g.append_pair("diskStatsEveryKb", &format!("{}", self.disk_stats_every.bytes() / 1024)); + g.append_pair("timeout", &format!("{}", self.timeout.as_millis())); + g.append_pair("abortAfterBinCount", &format!("{}", self.abort_after_bin_count)); + g.append_pair("doLog", &format!("{}", self.do_log)); + } } } -fn binning_scheme_query_string(agg_kind: &AggKind) -> String { +fn binning_scheme_append_to_url(agg_kind: &AggKind, url: &mut Url) { + let mut g = url.query_pairs_mut(); match agg_kind { - AggKind::Plain => "fullValue".into(), - AggKind::DimXBins1 => "toScalarX".into(), - AggKind::DimXBinsN(n) => format!("binnedX&binnedXcount={}", n), + AggKind::Plain => { + g.append_pair("binningScheme", "fullValue"); + } + AggKind::DimXBins1 => { + g.append_pair("binningScheme", "toScalarX"); + } + AggKind::DimXBinsN(n) => { + g.append_pair("binningScheme", "toScalarX"); + g.append_pair("binnedXcount", &format!("{}", n)); + } } } diff --git a/disk/src/channelexec.rs b/disk/src/channelexec.rs index c07d9ad..9e21070 100644 --- a/disk/src/channelexec.rs +++ b/disk/src/channelexec.rs @@ -15,6 +15,7 @@ use err::Error; use futures_core::Stream; use futures_util::future::FutureExt; use futures_util::StreamExt; +use netpod::log::*; use netpod::{AggKind, ByteOrder, Channel, NanoRange, NodeConfigCached, PerfOpts, ScalarType, Shape}; use parse::channelconfig::{extract_matching_config_entry, read_local_config, MatchingConfigEntry}; use serde::de::DeserializeOwned; @@ -269,16 +270,24 @@ pub struct PlainEventsJson { agg_kind: AggKind, timeout: Duration, node_config: NodeConfigCached, + do_log: bool, } impl PlainEventsJson { - pub fn new(channel: Channel, range: NanoRange, timeout: Duration, node_config: NodeConfigCached) -> Self { + pub fn new( + channel: Channel, + range: NanoRange, + timeout: Duration, + node_config: NodeConfigCached, + do_log: bool, + ) -> Self { Self { channel, range, agg_kind: AggKind::Plain, timeout, node_config, + do_log, } } @@ -291,10 +300,12 @@ impl PlainEventsJson { } } +// TODO rename, it is also used for binned: pub async fn collect_plain_events_json( stream: S, timeout: Duration, bin_count_exp: u32, + do_log: bool, ) -> Result where S: Stream> + Unpin, @@ -327,8 +338,16 @@ where Some(item) => { match item { Ok(item) => match item { - StreamItem::Log(_) => {} - StreamItem::Stats(_) => {} + StreamItem::Log(item) => { + if do_log { + info!("collect_plain_events_json log {:?}", item); + } + } + StreamItem::Stats(item) => { + if do_log { + info!("collect_plain_events_json stats {:?}", item); + } + } StreamItem::DataItem(item) => match item { RangeCompletableItem::RangeComplete => { collector.set_range_complete(); @@ -386,7 +405,7 @@ impl ChannelExecFunction for PlainEventsJson { agg_kind: self.agg_kind, }; let s = MergedFromRemotes::::new(evq, perf_opts, self.node_config.node_config.cluster); - let f = collect_plain_events_json(s, self.timeout, 0); + let f = collect_plain_events_json(s, self.timeout, 0, self.do_log); let f = FutureExt::map(f, |item| match item { Ok(item) => { // TODO add channel entry info here? diff --git a/disk/src/decode.rs b/disk/src/decode.rs index d5983fc..bf3487b 100644 --- a/disk/src/decode.rs +++ b/disk/src/decode.rs @@ -19,21 +19,38 @@ use std::pin::Pin; use std::task::{Context, Poll}; use tokio::fs::File; -pub trait Endianness: Send + Unpin {} +pub trait Endianness: Send + Unpin { + fn is_big() -> bool; +} pub struct LittleEndian {} pub struct BigEndian {} -impl Endianness for LittleEndian {} -impl Endianness for BigEndian {} +impl Endianness for LittleEndian { + fn is_big() -> bool { + false + } +} +impl Endianness for BigEndian { + fn is_big() -> bool { + true + } +} pub trait NumFromBytes { - fn convert(buf: &[u8]) -> NTY; + fn convert(buf: &[u8], big_endian: bool) -> NTY; } macro_rules! impl_num_from_bytes_end { ($nty:ident, $nl:expr, $end:ident, $ec:ident) => { impl NumFromBytes<$nty, $end> for $nty { - fn convert(buf: &[u8]) -> $nty { - $nty::$ec(*arrayref::array_ref![buf, 0, $nl]) + fn convert(buf: &[u8], big_endian: bool) -> $nty { + // Error in data on disk: + // Can not rely on byte order as stated in the channel config. + //$nty::$ec(*arrayref::array_ref![buf, 0, $nl]) + if big_endian { + $nty::from_be_bytes(*arrayref::array_ref![buf, 0, $nl]) + } else { + $nty::from_le_bytes(*arrayref::array_ref![buf, 0, $nl]) + } } } }; @@ -62,7 +79,10 @@ where NTY: NumFromBytes, { type Output; - fn convert(&self, buf: &[u8]) -> Result; + // The written data on disk has errors: + // The endian as stated in the channel config does not match written events. + // Therefore, can not rely on that but have to check for each single event... + fn convert(&self, buf: &[u8], big_endian: bool) -> Result; } impl EventValueFromBytes for EventValuesDim0Case @@ -71,8 +91,8 @@ where { type Output = NTY; - fn convert(&self, buf: &[u8]) -> Result { - Ok(NTY::convert(buf)) + fn convert(&self, buf: &[u8], big_endian: bool) -> Result { + Ok(NTY::convert(buf, big_endian)) } } @@ -82,7 +102,7 @@ where { type Output = Vec; - fn convert(&self, buf: &[u8]) -> Result { + fn convert(&self, buf: &[u8], big_endian: bool) -> Result { let es = size_of::(); let n1 = buf.len() / es; if n1 != self.n as usize { @@ -92,7 +112,10 @@ where // TODO could optimize using unsafe code.. for n2 in 0..n1 { let i1 = es * n2; - vals.push(>::convert(&buf[i1..(i1 + es)])); + vals.push(>::convert( + &buf[i1..(i1 + es)], + big_endian, + )); } Ok(vals) } @@ -419,11 +442,17 @@ where // TODO check that dtype, event endianness and event shape match our static // expectation about the data in this channel. let _ty = &ev.scalar_types[i1]; - let _be = ev.be[i1]; - + let be = ev.be[i1]; + // Too bad, data on disk is inconsistent, can not rely on endian as stated in channel config. + if false && be != END::is_big() { + return Err(Error::with_msg(format!( + "endian mismatch in event got {} exp {}", + be, + END::is_big() + ))); + } let decomp = ev.decomps[i1].as_ref().unwrap().as_ref(); - - let val = self.evs.convert(decomp)?; + let val = self.evs.convert(decomp, be)?; ret.tss.push(ev.tss[i1]); ret.values.push(val); } diff --git a/disk/src/events.rs b/disk/src/events.rs index f514869..4b6ef6e 100644 --- a/disk/src/events.rs +++ b/disk/src/events.rs @@ -1,8 +1,7 @@ use chrono::{DateTime, TimeZone, Utc}; use err::Error; use netpod::{ - channel_from_pairs, get_url_query_pairs, AppendToUrl, Channel, FromUrl, HasBackend, HasTimeout, HostPort, - NanoRange, ToNanos, + channel_from_pairs, get_url_query_pairs, AppendToUrl, Channel, FromUrl, HasBackend, HasTimeout, NanoRange, ToNanos, }; use std::time::Duration; use url::Url; @@ -97,15 +96,17 @@ pub struct PlainEventsJsonQuery { range: NanoRange, report_error: bool, timeout: Duration, + do_log: bool, } impl PlainEventsJsonQuery { - pub fn new(channel: Channel, range: NanoRange) -> Self { + pub fn new(channel: Channel, range: NanoRange, do_log: bool) -> Self { Self { channel, range, report_error: false, timeout: Duration::from_millis(10000), + do_log, } } @@ -130,6 +131,11 @@ impl PlainEventsJsonQuery { .parse::() .map(|k| Duration::from_millis(k)) .map_err(|e| Error::with_msg(format!("can not parse timeout {:?}", e)))?, + do_log: pairs + .get("doLog") + .map_or("false", |k| k) + .parse() + .map_err(|e| Error::with_msg(format!("can not parse doLog {:?}", e)))?, }; Ok(ret) } @@ -156,23 +162,12 @@ impl PlainEventsJsonQuery { self.timeout } - pub fn set_timeout(&mut self, k: Duration) { - self.timeout = k; + pub fn do_log(&self) -> bool { + self.do_log } - // TODO remove in favor of Self::append_to_url - pub fn url(&self, host: &HostPort) -> String { - let date_fmt = "%Y-%m-%dT%H:%M:%S.%3fZ"; - format!( - "http://{}:{}/api/4/events?channelBackend={}&channelName={}&begDate={}&endDate={}&timeout={}", - host.host, - host.port, - self.channel.backend, - self.channel.name, - Utc.timestamp_nanos(self.range.beg as i64).format(date_fmt), - Utc.timestamp_nanos(self.range.end as i64).format(date_fmt), - self.timeout.as_millis(), - ) + pub fn set_timeout(&mut self, k: Duration) { + self.timeout = k; } pub fn append_to_url(&self, url: &mut Url) { @@ -189,6 +184,7 @@ impl PlainEventsJsonQuery { &Utc.timestamp_nanos(self.range.end as i64).format(date_fmt).to_string(), ); g.append_pair("timeout", &format!("{}", self.timeout.as_millis())); + g.append_pair("doLog", &format!("{}", self.do_log)); } } diff --git a/httpret/src/lib.rs b/httpret/src/lib.rs index e3833f5..0b874d5 100644 --- a/httpret/src/lib.rs +++ b/httpret/src/lib.rs @@ -434,6 +434,7 @@ async fn plain_events_json(req: Request, node_config: &NodeConfigCached) - query.range().clone(), query.timeout(), node_config.clone(), + query.do_log(), ); let s = disk::channelexec::channel_exec(op, query.channel(), query.range(), AggKind::Plain, node_config).await?; let ret = response(StatusCode::OK).body(BodyStream::wrapped(s, format!("plain_events")))?; diff --git a/netpod/src/lib.rs b/netpod/src/lib.rs index 89fb221..a7b8915 100644 --- a/netpod/src/lib.rs +++ b/netpod/src/lib.rs @@ -522,15 +522,6 @@ impl PreBinnedPatchCoord { self.ix } - pub fn to_url_params_strings(&self) -> String { - format!( - "patchTlen={}&binTlen={}&patchIx={}", - self.spec.patch_t_len(), - self.spec.bin_t_len(), - self.ix() - ) - } - pub fn new(bin_t_len: u64, patch_t_len: u64, patch_ix: u64) -> Self { Self { spec: PreBinnedPatchGridSpec::new(bin_t_len, patch_t_len), @@ -539,6 +530,15 @@ impl PreBinnedPatchCoord { } } +impl AppendToUrl for PreBinnedPatchCoord { + fn append_to_url(&self, url: &mut Url) { + let mut g = url.query_pairs_mut(); + g.append_pair("patchTlen", &format!("{}", self.spec.patch_t_len())); + g.append_pair("binTlen", &format!("{}", self.spec.bin_t_len())); + g.append_pair("patchIx", &format!("{}", self.ix())); + } +} + pub struct PreBinnedPatchIterator { range: PreBinnedPatchRange, ix: u64,