Constrain channel search, add better error message
This commit is contained in:
@@ -30,13 +30,7 @@ async fn position_file(
|
||||
expand_left: bool,
|
||||
expand_right: bool,
|
||||
) -> Result<Positioned, Error> {
|
||||
trace!(
|
||||
"position_file called {} {} {:?} {:?}",
|
||||
expand_left,
|
||||
expand_right,
|
||||
range,
|
||||
path
|
||||
);
|
||||
trace!("position_file called expand_left {expand_left} expand_right {expand_right} {range:?} {path:?}");
|
||||
assert_eq!(expand_left && expand_right, false);
|
||||
match OpenOptions::new().read(true).open(&path).await {
|
||||
Ok(file) => {
|
||||
@@ -73,9 +67,16 @@ async fn position_file(
|
||||
buf.resize(buf.capacity(), 0);
|
||||
index_file.read_exact(&mut buf).await?;
|
||||
let gg = if expand_left {
|
||||
super::index::find_largest_smaller_than(range.clone(), expand_right, &buf[2..])?
|
||||
super::index::find_largest_smaller_than(range.clone(), expand_right, &buf[2..])
|
||||
} else {
|
||||
super::index::find_ge(range.clone(), expand_right, &buf[2..])?
|
||||
super::index::find_ge(range.clone(), expand_right, &buf[2..])
|
||||
};
|
||||
let gg = match gg {
|
||||
Ok(x) => x,
|
||||
Err(e) => {
|
||||
error!("can not position file for range {range:?} expand_right {expand_right:?} buflen {buflen}", buflen = buf.len());
|
||||
return Err(e);
|
||||
}
|
||||
};
|
||||
match gg {
|
||||
Some(o) => {
|
||||
@@ -207,12 +208,17 @@ pub fn open_files(
|
||||
tokio::spawn(async move {
|
||||
match open_files_inner(&chtx, &range, &channel_config, node).await {
|
||||
Ok(_) => {}
|
||||
Err(e) => match chtx.send(Err(e.into())).await {
|
||||
Ok(_) => {}
|
||||
Err(e) => {
|
||||
error!("open_files channel send error {:?}", e);
|
||||
Err(e) => {
|
||||
let e = e.add_public_msg(format!(
|
||||
"Can not open file for channel: {channel_config:?} range: {range:?}"
|
||||
));
|
||||
match chtx.send(Err(e.into())).await {
|
||||
Ok(_) => {}
|
||||
Err(e) => {
|
||||
error!("open_files channel send error {:?}", e);
|
||||
}
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
});
|
||||
chrx
|
||||
|
||||
@@ -28,9 +28,6 @@ pub fn find_ge(range: NanoRange, expand_right: bool, buf: &[u8]) -> Result<Optio
|
||||
let mut k = n1 - 1;
|
||||
let x = VT::from_be_bytes(a[j].0);
|
||||
let y = VT::from_be_bytes(a[k].0);
|
||||
if x >= y {
|
||||
return Err(Error::with_msg(format!("search in unordered data")));
|
||||
}
|
||||
if y < range.beg {
|
||||
return Ok(None);
|
||||
}
|
||||
@@ -41,11 +38,18 @@ pub fn find_ge(range: NanoRange, expand_right: bool, buf: &[u8]) -> Result<Optio
|
||||
return Ok(None);
|
||||
}
|
||||
}
|
||||
if x >= y {
|
||||
return Err(Error::with_public_msg(format!(
|
||||
"search in unordered data ts1 {x} ts2 {y}"
|
||||
)));
|
||||
}
|
||||
let mut x = x;
|
||||
let mut y = y;
|
||||
loop {
|
||||
if x >= y {
|
||||
return Err(Error::with_msg(format!("search in unordered data")));
|
||||
return Err(Error::with_public_msg(format!(
|
||||
"search (loop) in unordered data ts1 {x} ts2 {y}"
|
||||
)));
|
||||
}
|
||||
if k - j < 2 {
|
||||
if y < range.end || expand_right {
|
||||
@@ -92,9 +96,6 @@ pub fn find_largest_smaller_than(
|
||||
let mut k = n1 - 1;
|
||||
let x = NUM::from_be_bytes(a[j].0);
|
||||
let y = NUM::from_be_bytes(a[k].0);
|
||||
if x >= y {
|
||||
return Err(Error::with_msg(format!("search in unordered data")));
|
||||
}
|
||||
if x >= range.beg {
|
||||
return Ok(None);
|
||||
}
|
||||
@@ -102,11 +103,18 @@ pub fn find_largest_smaller_than(
|
||||
let ret = (y, NUM::from_be_bytes(a[k].1));
|
||||
return Ok(Some(ret));
|
||||
}
|
||||
if x >= y {
|
||||
return Err(Error::with_public_msg(format!(
|
||||
"search in unordered data ts1 {x} ts2 {y}"
|
||||
)));
|
||||
}
|
||||
let mut x = x;
|
||||
let mut y = y;
|
||||
loop {
|
||||
if x >= y {
|
||||
return Err(Error::with_msg(format!("search in unordered data")));
|
||||
return Err(Error::with_public_msg(format!(
|
||||
"search (loop) in unordered data ts1 {x} ts2 {y}"
|
||||
)));
|
||||
}
|
||||
if k - j < 2 {
|
||||
let ret = (x, NUM::from_be_bytes(a[j].1));
|
||||
|
||||
@@ -9,6 +9,7 @@ use futures_core::Stream;
|
||||
use futures_util::StreamExt;
|
||||
use items::numops::{BoolNum, NumOps, StringNum};
|
||||
use items::{EventsNodeProcessor, Framable, RangeCompletableItem, Sitemty, StreamItem};
|
||||
use netpod::log::*;
|
||||
use netpod::query::RawEventsQuery;
|
||||
use netpod::{AggKind, ByteOrder, ByteSize, Channel, FileIoBufferSize, NanoRange, NodeConfigCached, ScalarType, Shape};
|
||||
|
||||
@@ -183,6 +184,10 @@ pub async fn make_event_pipe(
|
||||
array: entry.is_array,
|
||||
compression: entry.is_compressed,
|
||||
};
|
||||
trace!(
|
||||
"make_event_pipe need_expand {need_expand} {evq:?}",
|
||||
need_expand = evq.agg_kind.need_expand()
|
||||
);
|
||||
let event_chunker_conf = EventChunkerConf::new(ByteSize::kb(1024));
|
||||
let event_blobs = EventChunkerMultifile::new(
|
||||
range.clone(),
|
||||
@@ -191,7 +196,7 @@ pub async fn make_event_pipe(
|
||||
node_config.ix,
|
||||
FileIoBufferSize::new(evq.disk_io_buffer_size),
|
||||
event_chunker_conf,
|
||||
true,
|
||||
evq.agg_kind.need_expand(),
|
||||
true,
|
||||
);
|
||||
let shape = entry.to_shape()?;
|
||||
|
||||
Reference in New Issue
Block a user