Adapt format interpolation
This commit is contained in:
@@ -82,11 +82,10 @@ impl ChannelExecFunction for BinnedBinaryChannelExec {
|
|||||||
let perf_opts = PerfOpts { inmem_bufcap: 512 };
|
let perf_opts = PerfOpts { inmem_bufcap: 512 };
|
||||||
let souter = match PreBinnedPatchRange::covering_range(self.query.range().clone(), self.query.bin_count()) {
|
let souter = match PreBinnedPatchRange::covering_range(self.query.range().clone(), self.query.bin_count()) {
|
||||||
Ok(Some(pre_range)) => {
|
Ok(Some(pre_range)) => {
|
||||||
debug!("BinnedBinaryChannelExec found pre_range: {:?}", pre_range);
|
debug!("BinnedBinaryChannelExec found pre_range: {pre_range:?}");
|
||||||
if range.grid_spec.bin_t_len() < pre_range.grid_spec.bin_t_len() {
|
if range.grid_spec.bin_t_len() < pre_range.grid_spec.bin_t_len() {
|
||||||
let msg = format!(
|
let msg = format!(
|
||||||
"BinnedBinaryChannelExec incompatible ranges:\npre_range: {:?}\nrange: {:?}",
|
"BinnedBinaryChannelExec incompatible ranges:\npre_range: {pre_range:?}\nrange: {range:?}"
|
||||||
pre_range, range
|
|
||||||
);
|
);
|
||||||
return Err(Error::with_msg(msg));
|
return Err(Error::with_msg(msg));
|
||||||
}
|
}
|
||||||
@@ -110,8 +109,7 @@ impl ChannelExecFunction for BinnedBinaryChannelExec {
|
|||||||
}
|
}
|
||||||
Ok(None) => {
|
Ok(None) => {
|
||||||
debug!(
|
debug!(
|
||||||
"BinnedBinaryChannelExec no covering range for prebinned, merge from remotes instead {:?}",
|
"BinnedBinaryChannelExec no covering range for prebinned, merge from remotes instead {range:?}"
|
||||||
range
|
|
||||||
);
|
);
|
||||||
let evq = RawEventsQuery {
|
let evq = RawEventsQuery {
|
||||||
channel: self.query.channel().clone(),
|
channel: self.query.channel().clone(),
|
||||||
@@ -226,7 +224,7 @@ where
|
|||||||
S: Stream<Item = Sitemty<T>> + Unpin,
|
S: Stream<Item = Sitemty<T>> + Unpin,
|
||||||
T: Collectable,
|
T: Collectable,
|
||||||
{
|
{
|
||||||
info!("\n\nConstruct deadline with timeout {:?}\n\n", timeout);
|
info!("\n\nConstruct deadline with timeout {timeout:?}\n\n");
|
||||||
let deadline = tokio::time::Instant::now() + timeout;
|
let deadline = tokio::time::Instant::now() + timeout;
|
||||||
let mut collector = <T as Collectable>::new_collector(bin_count_exp);
|
let mut collector = <T as Collectable>::new_collector(bin_count_exp);
|
||||||
let mut i1 = 0;
|
let mut i1 = 0;
|
||||||
@@ -327,11 +325,10 @@ impl ChannelExecFunction for BinnedJsonChannelExec {
|
|||||||
let perf_opts = PerfOpts { inmem_bufcap: 512 };
|
let perf_opts = PerfOpts { inmem_bufcap: 512 };
|
||||||
let souter = match PreBinnedPatchRange::covering_range(self.query.range().clone(), self.query.bin_count()) {
|
let souter = match PreBinnedPatchRange::covering_range(self.query.range().clone(), self.query.bin_count()) {
|
||||||
Ok(Some(pre_range)) => {
|
Ok(Some(pre_range)) => {
|
||||||
debug!("BinnedJsonChannelExec found pre_range: {:?}", pre_range);
|
debug!("BinnedJsonChannelExec found pre_range: {pre_range:?}");
|
||||||
if range.grid_spec.bin_t_len() < pre_range.grid_spec.bin_t_len() {
|
if range.grid_spec.bin_t_len() < pre_range.grid_spec.bin_t_len() {
|
||||||
let msg = format!(
|
let msg = format!(
|
||||||
"BinnedJsonChannelExec incompatible ranges:\npre_range: {:?}\nrange: {:?}",
|
"BinnedJsonChannelExec incompatible ranges:\npre_range: {pre_range:?}\nrange: {range:?}"
|
||||||
pre_range, range
|
|
||||||
);
|
);
|
||||||
return Err(Error::with_msg(msg));
|
return Err(Error::with_msg(msg));
|
||||||
}
|
}
|
||||||
@@ -355,10 +352,7 @@ impl ChannelExecFunction for BinnedJsonChannelExec {
|
|||||||
Ok(Box::pin(s) as Pin<Box<dyn Stream<Item = Result<Bytes, Error>> + Send>>)
|
Ok(Box::pin(s) as Pin<Box<dyn Stream<Item = Result<Bytes, Error>> + Send>>)
|
||||||
}
|
}
|
||||||
Ok(None) => {
|
Ok(None) => {
|
||||||
debug!(
|
debug!("BinnedJsonChannelExec no covering range for prebinned, merge from remotes instead {range:?}");
|
||||||
"BinnedJsonChannelExec no covering range for prebinned, merge from remotes instead {:?}",
|
|
||||||
range
|
|
||||||
);
|
|
||||||
let evq = RawEventsQuery {
|
let evq = RawEventsQuery {
|
||||||
channel: self.query.channel().clone(),
|
channel: self.query.channel().clone(),
|
||||||
range: self.query.range().clone(),
|
range: self.query.range().clone(),
|
||||||
|
|||||||
@@ -35,7 +35,11 @@ impl<TBT> FetchedPreBinned<TBT> {
|
|||||||
pub fn new(query: &PreBinnedQuery, node_config: &NodeConfigCached) -> Result<Self, Error> {
|
pub fn new(query: &PreBinnedQuery, node_config: &NodeConfigCached) -> Result<Self, Error> {
|
||||||
let nodeix = node_ix_for_patch(&query.patch(), &query.channel(), &node_config.node_config.cluster);
|
let nodeix = node_ix_for_patch(&query.patch(), &query.channel(), &node_config.node_config.cluster);
|
||||||
let node = &node_config.node_config.cluster.nodes[nodeix as usize];
|
let node = &node_config.node_config.cluster.nodes[nodeix as usize];
|
||||||
let mut url = Url::parse(&format!("http://{}:{}/api/4/prebinned", node.host, node.port))?;
|
let mut url = {
|
||||||
|
let host = &node.host;
|
||||||
|
let port = node.port;
|
||||||
|
Url::parse(&format!("http://{host}:{port}/api/4/prebinned"))?
|
||||||
|
};
|
||||||
query.append_to_url(&mut url);
|
query.append_to_url(&mut url);
|
||||||
let ret = Self {
|
let ret = Self {
|
||||||
uri: Uri::from_str(&url.to_string()).map_err(Error::from_string)?,
|
uri: Uri::from_str(&url.to_string()).map_err(Error::from_string)?,
|
||||||
@@ -102,18 +106,16 @@ where
|
|||||||
self.res = Some(s2);
|
self.res = Some(s2);
|
||||||
continue 'outer;
|
continue 'outer;
|
||||||
} else {
|
} else {
|
||||||
let msg = format!(
|
let msg =
|
||||||
"PreBinnedValueFetchedStream got non-OK result from sub request: {:?}",
|
format!("PreBinnedValueFetchedStream got non-OK result from sub request: {res:?}");
|
||||||
res
|
error!("{msg}");
|
||||||
);
|
|
||||||
error!("{}", msg);
|
|
||||||
let e = Error::with_msg_no_trace(msg);
|
let e = Error::with_msg_no_trace(msg);
|
||||||
self.errored = true;
|
self.errored = true;
|
||||||
Ready(Some(Err(e)))
|
Ready(Some(Err(e)))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
error!("PreBinnedValueStream error in stream {:?}", e);
|
error!("PreBinnedValueStream error in stream {e:?}");
|
||||||
self.errored = true;
|
self.errored = true;
|
||||||
Ready(Some(Err(Error::from_string(e))))
|
Ready(Some(Err(Error::from_string(e))))
|
||||||
}
|
}
|
||||||
@@ -176,9 +178,9 @@ where
|
|||||||
// Convert this to a StreamLog message:
|
// Convert this to a StreamLog message:
|
||||||
for (i, p) in patches.iter().enumerate() {
|
for (i, p) in patches.iter().enumerate() {
|
||||||
use std::fmt::Write;
|
use std::fmt::Write;
|
||||||
write!(sp, " • patch {:2} {:?}\n", i, p)?;
|
write!(sp, " • patch {i:2} {p:?}\n")?;
|
||||||
}
|
}
|
||||||
info!("Using these pre-binned patches:\n{}", sp);
|
info!("Using these pre-binned patches:\n{sp}");
|
||||||
}
|
}
|
||||||
let pmax = patches.len();
|
let pmax = patches.len();
|
||||||
let inp = futures_util::stream::iter(patches.into_iter().enumerate())
|
let inp = futures_util::stream::iter(patches.into_iter().enumerate())
|
||||||
@@ -199,7 +201,7 @@ where
|
|||||||
match FetchedPreBinned::<TBT>::new(&query, &node_config) {
|
match FetchedPreBinned::<TBT>::new(&query, &node_config) {
|
||||||
Ok(stream) => Box::pin(stream.map(move |q| (pix, q))),
|
Ok(stream) => Box::pin(stream.map(move |q| (pix, q))),
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
error!("error from PreBinnedValueFetchedStream::new {:?}", e);
|
error!("error from PreBinnedValueFetchedStream::new {e:?}");
|
||||||
Box::pin(futures_util::stream::iter(vec![(pix, Err(e))]))
|
Box::pin(futures_util::stream::iter(vec![(pix, Err(e))]))
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|||||||
Reference in New Issue
Block a user