Fix URL creation, check endianness of each event because the channel config is inconsistent

Dominik Werder
2021-06-23 08:57:14 +02:00
parent ebb1ce89dc
commit 411014d289
12 changed files with 192 additions and 131 deletions

View File

@@ -11,7 +11,8 @@ use futures_util::TryStreamExt;
use http::StatusCode;
use hyper::Body;
use netpod::log::*;
use netpod::{AggKind, ByteSize, Channel, HostPort, NanoRange, PerfOpts};
use netpod::{AggKind, AppendToUrl, ByteSize, Channel, HostPort, NanoRange, PerfOpts, APP_OCTET};
use url::Url;
pub async fn status(host: String, port: u16) -> Result<(), Error> {
let t1 = Utc::now();
@@ -62,11 +63,13 @@ pub async fn get_binned(
query.set_cache_usage(cache_usage);
query.set_disk_stats_every(ByteSize(1024 * disk_stats_every_kb));
let hp = HostPort { host: host, port: port };
let url = query.url(&hp);
let mut url = Url::parse(&format!("http://{}:{}/api/4/binned", hp.host, hp.port))?;
query.append_to_url(&mut url);
let url = url;
let req = hyper::Request::builder()
.method(http::Method::GET)
.uri(url)
.header("accept", "application/octet-stream")
.uri(url.to_string())
.header(http::header::ACCEPT, APP_OCTET)
.body(Body::empty())?;
let client = hyper::Client::new();
let res = client.request(req).await?;

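The pattern this commit applies across the client code: parse a fixed base URL once, then let the query type serialize itself via AppendToUrl, which percent-encodes every pair (the old format!-built strings did not). A minimal self-contained sketch of that shape against the url crate; DemoQuery and its fields are illustrative stand-ins, not the repository's actual types.

use url::Url;

// Stand-in for the netpod::AppendToUrl trait used in this commit (assumed shape).
trait AppendToUrl {
    fn append_to_url(&self, url: &mut Url);
}

// Hypothetical query type; the real BinnedQuery carries many more fields.
struct DemoQuery {
    channel_name: String,
    bin_count: u32,
}

impl AppendToUrl for DemoQuery {
    fn append_to_url(&self, url: &mut Url) {
        // query_pairs_mut percent-encodes keys and values for us.
        let mut g = url.query_pairs_mut();
        g.append_pair("channelName", &self.channel_name);
        g.append_pair("binCount", &self.bin_count.to_string());
    }
}

fn main() -> Result<(), url::ParseError> {
    let query = DemoQuery { channel_name: "demo channel".into(), bin_count: 10 };
    let mut url = Url::parse("http://localhost:8080/api/4/binned")?;
    query.append_to_url(&mut url);
    // hyper's request builder then takes the string form: .uri(url.to_string())
    println!("{}", url);
    Ok(())
}
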
View File

@@ -14,11 +14,12 @@ use futures_util::TryStreamExt;
use http::StatusCode;
use hyper::Body;
use netpod::log::*;
use netpod::{AggKind, Channel, Cluster, HostPort, NanoRange, PerfOpts};
use netpod::{AggKind, AppendToUrl, Channel, Cluster, HostPort, NanoRange, PerfOpts, APP_OCTET};
use serde::de::DeserializeOwned;
use std::fmt::Debug;
use std::future::ready;
use tokio::io::AsyncRead;
use url::Url;
pub mod binnedjson;
pub mod events;
@@ -109,12 +110,14 @@ where
let mut query = BinnedQuery::new(channel, range, bin_count, agg_kind);
query.set_cache_usage(CacheUsage::Ignore);
let hp = HostPort::from_node(node0);
let url = query.url(&hp);
let mut url = Url::parse(&format!("http://{}:{}/api/4/binned", hp.host, hp.port))?;
query.append_to_url(&mut url);
let url = url;
info!("get_binned_channel get {}", url);
let req = hyper::Request::builder()
.method(http::Method::GET)
.uri(url)
.header("Accept", "application/octet-stream")
.uri(url.to_string())
.header(http::header::ACCEPT, APP_OCTET)
.body(Body::empty())?;
let client = hyper::Client::new();
let res = client.request(req).await?;

View File

@@ -5,8 +5,9 @@ use err::Error;
use http::StatusCode;
use hyper::Body;
use netpod::log::*;
use netpod::{AggKind, Channel, Cluster, HostPort, NanoRange};
use netpod::{AggKind, AppendToUrl, Channel, Cluster, NanoRange, APP_JSON};
use std::time::Duration;
use url::Url;
#[test]
fn get_binned_json_0() {
@@ -94,12 +95,14 @@ async fn get_binned_json_common(
let mut query = BinnedQuery::new(channel, range, bin_count, agg_kind);
query.set_timeout(Duration::from_millis(15000));
query.set_cache_usage(CacheUsage::Ignore);
let url = query.url(&HostPort::from_node(node0));
let mut url = Url::parse(&format!("http://{}:{}/api/4/binned", node0.host, node0.port))?;
query.append_to_url(&mut url);
let url = url;
info!("get_binned_json_common get {}", url);
let req = hyper::Request::builder()
.method(http::Method::GET)
.uri(url)
.header("Accept", "application/json")
.uri(url.to_string())
.header(http::header::ACCEPT, APP_JSON)
.body(Body::empty())?;
let client = hyper::Client::new();
let res = client.request(req).await?;

View File

@@ -13,7 +13,7 @@ use futures_util::{StreamExt, TryStreamExt};
use http::StatusCode;
use hyper::Body;
use netpod::log::*;
use netpod::{AppendToUrl, Channel, Cluster, HostPort, NanoRange, PerfOpts, APP_OCTET};
use netpod::{AppendToUrl, Channel, Cluster, HostPort, NanoRange, PerfOpts, APP_JSON, APP_OCTET};
use serde_json::Value as JsonValue;
use std::fmt::Debug;
use std::future::ready;
@@ -271,14 +271,16 @@ async fn get_plain_events_json(
name: channel_name.into(),
};
let range = NanoRange::from_date_time(beg_date, end_date);
let query = PlainEventsJsonQuery::new(channel, range);
let query = PlainEventsJsonQuery::new(channel, range, false);
let hp = HostPort::from_node(node0);
let url = query.url(&hp);
let mut url = Url::parse(&format!("http://{}:{}/api/4/events", hp.host, hp.port))?;
query.append_to_url(&mut url);
let url = url;
info!("get_plain_events get {}", url);
let req = hyper::Request::builder()
.method(http::Method::GET)
.uri(url)
.header("Accept", "application/json")
.uri(url.to_string())
.header(http::header::ACCEPT, APP_JSON)
.body(Body::empty())?;
let client = hyper::Client::new();
let res = client.request(req).await?;

View File

@@ -774,7 +774,7 @@ impl ChannelExecFunction for BinnedJsonChannelExec {
self.query.disk_stats_every().clone(),
self.query.report_error(),
)?;
let f = collect_plain_events_json(s, self.timeout, t_bin_count);
let f = collect_plain_events_json(s, self.timeout, t_bin_count, self.query.do_log());
let s = futures_util::stream::once(f).map(|item| match item {
Ok(item) => Ok(Bytes::from(serde_json::to_vec(&item)?)),
Err(e) => Err(e.into()),
@@ -794,7 +794,7 @@ impl ChannelExecFunction for BinnedJsonChannelExec {
let x_bin_count = x_bin_count(&shape, self.query.agg_kind());
let s = MergedFromRemotes::<ENP>::new(evq, perf_opts, self.node_config.node_config.cluster.clone());
let s = TBinnerStream::<_, <ENP as EventsNodeProcessor>::Output>::new(s, range, x_bin_count);
let f = collect_plain_events_json(s, self.timeout, t_bin_count);
let f = collect_plain_events_json(s, self.timeout, t_bin_count, self.query.do_log());
let s = futures_util::stream::once(f).map(|item| match item {
Ok(item) => Ok(Bytes::from(serde_json::to_vec(&item)?)),
Err(e) => Err(e.into()),

View File

@@ -12,13 +12,16 @@ use futures_util::{FutureExt, StreamExt};
use http::{StatusCode, Uri};
use netpod::log::*;
use netpod::{
x_bin_count, AggKind, BinnedRange, ByteSize, Channel, NodeConfigCached, PerfOpts, PreBinnedPatchIterator, Shape,
x_bin_count, AggKind, AppendToUrl, BinnedRange, ByteSize, Channel, NodeConfigCached, PerfOpts,
PreBinnedPatchIterator, Shape,
};
use serde::de::DeserializeOwned;
use std::future::ready;
use std::marker::PhantomData;
use std::pin::Pin;
use std::str::FromStr;
use std::task::{Context, Poll};
use url::Url;
pub struct FetchedPreBinned<TBT> {
uri: Uri,
@@ -33,15 +36,10 @@ impl<TBT> FetchedPreBinned<TBT> {
pub fn new(query: &PreBinnedQuery, node_config: &NodeConfigCached) -> Result<Self, Error> {
let nodeix = node_ix_for_patch(&query.patch(), &query.channel(), &node_config.node_config.cluster);
let node = &node_config.node_config.cluster.nodes[nodeix as usize];
let uri: hyper::Uri = format!(
"http://{}:{}/api/4/prebinned?{}",
node.host,
node.port,
query.make_query_string()
)
.parse()?;
let mut url = Url::parse(&format!("http://{}:{}/api/4/prebinned", node.host, node.port))?;
query.append_to_url(&mut url);
let ret = Self {
uri,
uri: Uri::from_str(&url.to_string())?,
resfut: None,
res: None,
errored: false,

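FetchedPreBinned now builds the URL with the url crate and converts it to hyper's Uri only at the end. The conversion in isolation, assuming the http crate's Uri type (which hyper re-exports); as_str() avoids the intermediate String that the diff's to_string() allocates.

use http::Uri;
use std::str::FromStr;
use url::Url;

// url::Url guarantees a well-formed, percent-encoded string, so this
// parse only fails for the rare forms that Uri rejects.
fn to_uri(url: &Url) -> Result<Uri, http::uri::InvalidUri> {
    Uri::from_str(url.as_str())
}

fn main() {
    let url = Url::parse("http://localhost:8080/api/4/prebinned?patchIx=0").unwrap();
    let uri = to_uri(&url).unwrap();
    assert_eq!(uri.path(), "/api/4/prebinned");
}
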
View File

@@ -4,7 +4,7 @@ use http::request::Parts;
use netpod::log::*;
use netpod::{
channel_from_pairs, get_url_query_pairs, AggKind, AppendToUrl, ByteSize, Channel, FromUrl, HasBackend, HasTimeout,
HostPort, NanoRange, PreBinnedPatchCoord, ToNanos,
NanoRange, PreBinnedPatchCoord, ToNanos,
};
use std::collections::BTreeMap;
use std::time::Duration;
@@ -84,19 +84,6 @@ impl PreBinnedQuery {
Self::from_url(&url)
}
pub fn make_query_string(&self) -> String {
format!(
"{}&channelBackend={}&channelName={}&binningScheme={}&cacheUsage={}&diskStatsEveryKb={}&reportError={}",
self.patch.to_url_params_strings(),
self.channel.backend,
self.channel.name,
binning_scheme_query_string(&self.agg_kind),
self.cache_usage,
self.disk_stats_every.bytes() / 1024,
self.report_error(),
)
}
pub fn patch(&self) -> &PreBinnedPatchCoord {
&self.patch
}
@@ -122,6 +109,19 @@ impl PreBinnedQuery {
}
}
impl AppendToUrl for PreBinnedQuery {
fn append_to_url(&self, url: &mut Url) {
self.patch.append_to_url(url);
binning_scheme_append_to_url(&self.agg_kind, url);
let mut g = url.query_pairs_mut();
g.append_pair("channelBackend", &self.channel.backend);
g.append_pair("channelName", &self.channel.name);
g.append_pair("cacheUsage", &format!("{}", self.cache_usage.query_param_value()));
g.append_pair("diskStatsEveryKb", &format!("{}", self.disk_stats_every.bytes() / 1024));
g.append_pair("reportError", &format!("{}", self.report_error()));
}
}
#[derive(Clone, Debug)]
pub enum CacheUsage {
Use,
@@ -139,8 +139,8 @@ impl CacheUsage {
.into()
}
pub fn from_pairs(params: &BTreeMap<String, String>) -> Result<Self, Error> {
let ret = params.get("cacheUsage").map_or(Ok::<_, Error>(CacheUsage::Use), |k| {
pub fn from_pairs(pairs: &BTreeMap<String, String>) -> Result<Self, Error> {
let ret = pairs.get("cacheUsage").map_or(Ok::<_, Error>(CacheUsage::Use), |k| {
if k == "use" {
Ok(CacheUsage::Use)
} else if k == "ignore" {
@@ -185,6 +185,7 @@ pub struct BinnedQuery {
report_error: bool,
timeout: Duration,
abort_after_bin_count: u32,
do_log: bool,
}
impl BinnedQuery {
@@ -199,6 +200,7 @@ impl BinnedQuery {
report_error: false,
timeout: Duration::from_millis(2000),
abort_after_bin_count: 0,
do_log: false,
}
}
@@ -238,6 +240,10 @@ impl BinnedQuery {
self.abort_after_bin_count
}
pub fn do_log(&self) -> bool {
self.do_log
}
pub fn set_cache_usage(&mut self, k: CacheUsage) {
self.cache_usage = k;
}
@@ -249,26 +255,6 @@ impl BinnedQuery {
pub fn set_timeout(&mut self, k: Duration) {
self.timeout = k;
}
// TODO remove in favor of AppendToUrl
pub fn url(&self, host: &HostPort) -> String {
let date_fmt = "%Y-%m-%dT%H:%M:%S.%3fZ";
format!(
"http://{}:{}/api/4/binned?cacheUsage={}&channelBackend={}&channelName={}&binCount={}&begDate={}&endDate={}&binningScheme={}&diskStatsEveryKb={}&timeout={}&abortAfterBinCount={}",
host.host,
host.port,
self.cache_usage,
self.channel.backend,
self.channel.name,
self.bin_count,
Utc.timestamp_nanos(self.range.beg as i64).format(date_fmt),
Utc.timestamp_nanos(self.range.end as i64).format(date_fmt),
binning_scheme_query_string(&self.agg_kind),
self.disk_stats_every.bytes() / 1024,
self.timeout.as_millis(),
self.abort_after_bin_count,
)
}
}
impl HasBackend for BinnedQuery {
@@ -322,6 +308,11 @@ impl FromUrl for BinnedQuery {
.map_or("0", |k| k)
.parse()
.map_err(|e| Error::with_msg(format!("can not parse abortAfterBinCount {:?}", e)))?,
do_log: pairs
.get("doLog")
.map_or("false", |k| k)
.parse()
.map_err(|e| Error::with_msg(format!("can not parse doLog {:?}", e)))?,
};
info!("BinnedQuery::from_url {:?}", ret);
Ok(ret)
@@ -331,31 +322,47 @@ impl FromUrl for BinnedQuery {
impl AppendToUrl for BinnedQuery {
fn append_to_url(&self, url: &mut Url) {
let date_fmt = "%Y-%m-%dT%H:%M:%S.%3fZ";
let mut g = url.query_pairs_mut();
g.append_pair("cacheUsage", &self.cache_usage.to_string());
g.append_pair("channelBackend", &self.channel.backend);
g.append_pair("channelName", &self.channel.name);
g.append_pair("binCount", &format!("{}", self.bin_count));
g.append_pair(
"begDate",
&Utc.timestamp_nanos(self.range.beg as i64).format(date_fmt).to_string(),
);
g.append_pair(
"endDate",
&Utc.timestamp_nanos(self.range.end as i64).format(date_fmt).to_string(),
);
g.append_pair("binningScheme", &binning_scheme_query_string(&self.agg_kind));
g.append_pair("diskStatsEveryKb", &format!("{}", self.disk_stats_every.bytes() / 1024));
g.append_pair("timeout", &format!("{}", self.timeout.as_millis()));
g.append_pair("abortAfterBinCount", &format!("{}", self.abort_after_bin_count));
{
let mut g = url.query_pairs_mut();
g.append_pair("cacheUsage", &self.cache_usage.to_string());
g.append_pair("channelBackend", &self.channel.backend);
g.append_pair("channelName", &self.channel.name);
g.append_pair("binCount", &format!("{}", self.bin_count));
g.append_pair(
"begDate",
&Utc.timestamp_nanos(self.range.beg as i64).format(date_fmt).to_string(),
);
g.append_pair(
"endDate",
&Utc.timestamp_nanos(self.range.end as i64).format(date_fmt).to_string(),
);
}
{
binning_scheme_append_to_url(&self.agg_kind, url);
}
{
let mut g = url.query_pairs_mut();
g.append_pair("diskStatsEveryKb", &format!("{}", self.disk_stats_every.bytes() / 1024));
g.append_pair("timeout", &format!("{}", self.timeout.as_millis()));
g.append_pair("abortAfterBinCount", &format!("{}", self.abort_after_bin_count));
g.append_pair("doLog", &format!("{}", self.do_log));
}
}
}
fn binning_scheme_query_string(agg_kind: &AggKind) -> String {
fn binning_scheme_append_to_url(agg_kind: &AggKind, url: &mut Url) {
let mut g = url.query_pairs_mut();
match agg_kind {
AggKind::Plain => "fullValue".into(),
AggKind::DimXBins1 => "toScalarX".into(),
AggKind::DimXBinsN(n) => format!("binnedX&binnedXcount={}", n),
AggKind::Plain => {
g.append_pair("binningScheme", "fullValue");
}
AggKind::DimXBins1 => {
g.append_pair("binningScheme", "toScalarX");
}
AggKind::DimXBinsN(n) => {
g.append_pair("binningScheme", "toScalarX");
g.append_pair("binnedXcount", &format!("{}", n));
}
}
}

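The braces inside the new append_to_url are load-bearing: Url::query_pairs_mut returns a serializer that holds the &mut Url, so that borrow must end before binning_scheme_append_to_url can take its own &mut Url. A reduced illustration of the same shape (compiles against the url crate; the helper names are placeholders):

use url::Url;

fn append_extra(url: &mut Url) {
    url.query_pairs_mut().append_pair("extra", "1");
}

fn append_all(url: &mut Url) {
    {
        // The serializer borrows `url` mutably until the end of this block.
        let mut g = url.query_pairs_mut();
        g.append_pair("a", "1");
        g.append_pair("b", "2");
    } // borrow ends here ...
    append_extra(url); // ... so `url` can be borrowed mutably again.
}

fn main() {
    let mut url = Url::parse("http://localhost/api").unwrap();
    append_all(&mut url);
    assert_eq!(url.query(), Some("a=1&b=2&extra=1"));
}
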
View File

@@ -15,6 +15,7 @@ use err::Error;
use futures_core::Stream;
use futures_util::future::FutureExt;
use futures_util::StreamExt;
use netpod::log::*;
use netpod::{AggKind, ByteOrder, Channel, NanoRange, NodeConfigCached, PerfOpts, ScalarType, Shape};
use parse::channelconfig::{extract_matching_config_entry, read_local_config, MatchingConfigEntry};
use serde::de::DeserializeOwned;
@@ -269,16 +270,24 @@ pub struct PlainEventsJson {
agg_kind: AggKind,
timeout: Duration,
node_config: NodeConfigCached,
do_log: bool,
}
impl PlainEventsJson {
pub fn new(channel: Channel, range: NanoRange, timeout: Duration, node_config: NodeConfigCached) -> Self {
pub fn new(
channel: Channel,
range: NanoRange,
timeout: Duration,
node_config: NodeConfigCached,
do_log: bool,
) -> Self {
Self {
channel,
range,
agg_kind: AggKind::Plain,
timeout,
node_config,
do_log,
}
}
@@ -291,10 +300,12 @@ impl PlainEventsJson {
}
}
// TODO rename, it is also used for binned:
pub async fn collect_plain_events_json<T, S>(
stream: S,
timeout: Duration,
bin_count_exp: u32,
do_log: bool,
) -> Result<JsonValue, Error>
where
S: Stream<Item = Sitemty<T>> + Unpin,
@@ -327,8 +338,16 @@ where
Some(item) => {
match item {
Ok(item) => match item {
StreamItem::Log(_) => {}
StreamItem::Stats(_) => {}
StreamItem::Log(item) => {
if do_log {
info!("collect_plain_events_json log {:?}", item);
}
}
StreamItem::Stats(item) => {
if do_log {
info!("collect_plain_events_json stats {:?}", item);
}
}
StreamItem::DataItem(item) => match item {
RangeCompletableItem::RangeComplete => {
collector.set_range_complete();
@@ -386,7 +405,7 @@ impl ChannelExecFunction for PlainEventsJson {
agg_kind: self.agg_kind,
};
let s = MergedFromRemotes::<ENP>::new(evq, perf_opts, self.node_config.node_config.cluster);
let f = collect_plain_events_json(s, self.timeout, 0);
let f = collect_plain_events_json(s, self.timeout, 0, self.do_log);
let f = FutureExt::map(f, |item| match item {
Ok(item) => {
// TODO add channel entry info here?

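What the new do_log flag changes in collect_plain_events_json, reduced to a self-contained sketch: Log and Stats stream items, previously dropped silently, are surfaced when the flag is set. The enum below only mirrors the shape of StreamItem; the real item types and the collector are richer.

// Simplified stand-in for the repository's stream item type.
#[derive(Debug)]
enum Item<T> {
    Log(String),
    Stats(String),
    Data(T),
    RangeComplete,
}

fn collect<T>(items: Vec<Item<T>>, do_log: bool) -> Vec<T> {
    let mut out = Vec::new();
    for item in items {
        match item {
            // Previously both arms were empty; with do_log the items
            // are logged to aid debugging of a live query.
            Item::Log(msg) => {
                if do_log {
                    println!("collect log {:?}", msg);
                }
            }
            Item::Stats(s) => {
                if do_log {
                    println!("collect stats {:?}", s);
                }
            }
            Item::Data(v) => out.push(v),
            Item::RangeComplete => { /* the real collector records this */ }
        }
    }
    out
}

fn main() {
    let items = vec![Item::Log("node 0 connected".into()), Item::Data(1u32), Item::RangeComplete];
    assert_eq!(collect(items, true), vec![1]);
}
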
View File

@@ -19,21 +19,38 @@ use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::fs::File;
pub trait Endianness: Send + Unpin {}
pub trait Endianness: Send + Unpin {
fn is_big() -> bool;
}
pub struct LittleEndian {}
pub struct BigEndian {}
impl Endianness for LittleEndian {}
impl Endianness for BigEndian {}
impl Endianness for LittleEndian {
fn is_big() -> bool {
false
}
}
impl Endianness for BigEndian {
fn is_big() -> bool {
true
}
}
pub trait NumFromBytes<NTY, END> {
fn convert(buf: &[u8]) -> NTY;
fn convert(buf: &[u8], big_endian: bool) -> NTY;
}
macro_rules! impl_num_from_bytes_end {
($nty:ident, $nl:expr, $end:ident, $ec:ident) => {
impl NumFromBytes<$nty, $end> for $nty {
fn convert(buf: &[u8]) -> $nty {
$nty::$ec(*arrayref::array_ref![buf, 0, $nl])
fn convert(buf: &[u8], big_endian: bool) -> $nty {
// The data on disk is inconsistent: the byte order stated in the
// channel config can not be trusted, so decide per event instead.
//$nty::$ec(*arrayref::array_ref![buf, 0, $nl])
if big_endian {
$nty::from_be_bytes(*arrayref::array_ref![buf, 0, $nl])
} else {
$nty::from_le_bytes(*arrayref::array_ref![buf, 0, $nl])
}
}
}
};
@@ -62,7 +79,10 @@ where
NTY: NumFromBytes<NTY, END>,
{
type Output;
fn convert(&self, buf: &[u8]) -> Result<Self::Output, Error>;
// The data written to disk has errors: the endianness stated in the
// channel config does not always match the written events, so we can
// not rely on it and must check each event individually.
fn convert(&self, buf: &[u8], big_endian: bool) -> Result<Self::Output, Error>;
}
impl<NTY, END> EventValueFromBytes<NTY, END> for EventValuesDim0Case<NTY>
@@ -71,8 +91,8 @@ where
{
type Output = NTY;
fn convert(&self, buf: &[u8]) -> Result<Self::Output, Error> {
Ok(NTY::convert(buf))
fn convert(&self, buf: &[u8], big_endian: bool) -> Result<Self::Output, Error> {
Ok(NTY::convert(buf, big_endian))
}
}
@@ -82,7 +102,7 @@ where
{
type Output = Vec<NTY>;
fn convert(&self, buf: &[u8]) -> Result<Self::Output, Error> {
fn convert(&self, buf: &[u8], big_endian: bool) -> Result<Self::Output, Error> {
let es = size_of::<NTY>();
let n1 = buf.len() / es;
if n1 != self.n as usize {
@@ -92,7 +112,10 @@ where
// TODO could optimize using unsafe code..
for n2 in 0..n1 {
let i1 = es * n2;
vals.push(<NTY as NumFromBytes<NTY, END>>::convert(&buf[i1..(i1 + es)]));
vals.push(<NTY as NumFromBytes<NTY, END>>::convert(
&buf[i1..(i1 + es)],
big_endian,
));
}
Ok(vals)
}
@@ -419,11 +442,17 @@ where
// TODO check that dtype, event endianness and event shape match our static
// expectation about the data in this channel.
let _ty = &ev.scalar_types[i1];
let _be = ev.be[i1];
let be = ev.be[i1];
// The data on disk is inconsistent; we can not rely on the endianness stated in the channel config.
if false && be != END::is_big() {
return Err(Error::with_msg(format!(
"endian mismatch in event got {} exp {}",
be,
END::is_big()
)));
}
let decomp = ev.decomps[i1].as_ref().unwrap().as_ref();
let val = self.evs.convert(decomp)?;
let val = self.evs.convert(decomp, be)?;
ret.tss.push(ev.tss[i1]);
ret.values.push(val);
}

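The core of the endianness fix in isolation: the byte order is no longer fixed per channel from the config but taken from the per-event big-endian flag. A minimal sketch for one scalar type; the real code generates the equivalent for every numeric type through the impl_num_from_bytes_end macro shown above.

// Decode one f64 sample, honoring the per-event byte-order flag
// instead of the (unreliable) byte order from the channel config.
fn convert_f64(buf: &[u8], big_endian: bool) -> f64 {
    let bytes: [u8; 8] = buf[..8].try_into().expect("need at least 8 bytes");
    if big_endian {
        f64::from_be_bytes(bytes)
    } else {
        f64::from_le_bytes(bytes)
    }
}

fn main() {
    let v = 42.5f64;
    assert_eq!(convert_f64(&v.to_be_bytes(), true), v);
    assert_eq!(convert_f64(&v.to_le_bytes(), false), v);
}
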
View File

@@ -1,8 +1,7 @@
use chrono::{DateTime, TimeZone, Utc};
use err::Error;
use netpod::{
channel_from_pairs, get_url_query_pairs, AppendToUrl, Channel, FromUrl, HasBackend, HasTimeout, HostPort,
NanoRange, ToNanos,
channel_from_pairs, get_url_query_pairs, AppendToUrl, Channel, FromUrl, HasBackend, HasTimeout, NanoRange, ToNanos,
};
use std::time::Duration;
use url::Url;
@@ -97,15 +96,17 @@ pub struct PlainEventsJsonQuery {
range: NanoRange,
report_error: bool,
timeout: Duration,
do_log: bool,
}
impl PlainEventsJsonQuery {
pub fn new(channel: Channel, range: NanoRange) -> Self {
pub fn new(channel: Channel, range: NanoRange, do_log: bool) -> Self {
Self {
channel,
range,
report_error: false,
timeout: Duration::from_millis(10000),
do_log,
}
}
@@ -130,6 +131,11 @@ impl PlainEventsJsonQuery {
.parse::<u64>()
.map(|k| Duration::from_millis(k))
.map_err(|e| Error::with_msg(format!("can not parse timeout {:?}", e)))?,
do_log: pairs
.get("doLog")
.map_or("false", |k| k)
.parse()
.map_err(|e| Error::with_msg(format!("can not parse doLog {:?}", e)))?,
};
Ok(ret)
}
@@ -156,23 +162,12 @@ impl PlainEventsJsonQuery {
self.timeout
}
pub fn set_timeout(&mut self, k: Duration) {
self.timeout = k;
pub fn do_log(&self) -> bool {
self.do_log
}
// TODO remove in favor of Self::append_to_url
pub fn url(&self, host: &HostPort) -> String {
let date_fmt = "%Y-%m-%dT%H:%M:%S.%3fZ";
format!(
"http://{}:{}/api/4/events?channelBackend={}&channelName={}&begDate={}&endDate={}&timeout={}",
host.host,
host.port,
self.channel.backend,
self.channel.name,
Utc.timestamp_nanos(self.range.beg as i64).format(date_fmt),
Utc.timestamp_nanos(self.range.end as i64).format(date_fmt),
self.timeout.as_millis(),
)
pub fn set_timeout(&mut self, k: Duration) {
self.timeout = k;
}
pub fn append_to_url(&self, url: &mut Url) {
@@ -189,6 +184,7 @@ impl PlainEventsJsonQuery {
&Utc.timestamp_nanos(self.range.end as i64).format(date_fmt).to_string(),
);
g.append_pair("timeout", &format!("{}", self.timeout.as_millis()));
g.append_pair("doLog", &format!("{}", self.do_log));
}
}

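Both query types parse the new parameter the same way: doLog defaults to false when absent, so existing URLs keep working, and the AppendToUrl side always writes it explicitly. The parameter handling alone, as a sketch (the BTreeMap stands in for the parsed query pairs):

use std::collections::BTreeMap;

// Mirrors the map_or("false", ...).parse() pattern used for doLog:
// an absent parameter means false, a malformed one is an error.
fn do_log_from_pairs(pairs: &BTreeMap<String, String>) -> Result<bool, std::str::ParseBoolError> {
    pairs.get("doLog").map_or("false", |k| k.as_str()).parse()
}

fn main() {
    let mut pairs = BTreeMap::new();
    assert_eq!(do_log_from_pairs(&pairs), Ok(false)); // backwards-compatible default
    pairs.insert("doLog".to_string(), "true".to_string());
    assert_eq!(do_log_from_pairs(&pairs), Ok(true));
}
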
View File

@@ -434,6 +434,7 @@ async fn plain_events_json(req: Request<Body>, node_config: &NodeConfigCached) -
query.range().clone(),
query.timeout(),
node_config.clone(),
query.do_log(),
);
let s = disk::channelexec::channel_exec(op, query.channel(), query.range(), AggKind::Plain, node_config).await?;
let ret = response(StatusCode::OK).body(BodyStream::wrapped(s, format!("plain_events")))?;

View File

@@ -522,15 +522,6 @@ impl PreBinnedPatchCoord {
self.ix
}
pub fn to_url_params_strings(&self) -> String {
format!(
"patchTlen={}&binTlen={}&patchIx={}",
self.spec.patch_t_len(),
self.spec.bin_t_len(),
self.ix()
)
}
pub fn new(bin_t_len: u64, patch_t_len: u64, patch_ix: u64) -> Self {
Self {
spec: PreBinnedPatchGridSpec::new(bin_t_len, patch_t_len),
@@ -539,6 +530,15 @@ impl PreBinnedPatchCoord {
}
}
impl AppendToUrl for PreBinnedPatchCoord {
fn append_to_url(&self, url: &mut Url) {
let mut g = url.query_pairs_mut();
g.append_pair("patchTlen", &format!("{}", self.spec.patch_t_len()));
g.append_pair("binTlen", &format!("{}", self.spec.bin_t_len()));
g.append_pair("patchIx", &format!("{}", self.ix()));
}
}
pub struct PreBinnedPatchIterator {
range: PreBinnedPatchRange,
ix: u64,