From 264c4307b9a8b35267d5e1b358855794433fe5c5 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Wed, 16 Feb 2022 12:50:34 +0100 Subject: [PATCH] Constrain channel search, add better error message --- dbconn/src/search.rs | 28 ++++++++++++++++++++++++++++ disk/src/dataopen.rs | 34 ++++++++++++++++++++-------------- disk/src/index.rs | 24 ++++++++++++++++-------- disk/src/raw/conn.rs | 7 ++++++- err/src/lib.rs | 2 +- httpret/src/err.rs | 7 ++++++- httpret/src/httpret.rs | 6 +++--- 7 files changed, 80 insertions(+), 28 deletions(-) diff --git a/dbconn/src/search.rs b/dbconn/src/search.rs index 798e3c4..9dbf266 100644 --- a/dbconn/src/search.rs +++ b/dbconn/src/search.rs @@ -7,6 +7,19 @@ pub async fn search_channel_databuffer( query: ChannelSearchQuery, node_config: &NodeConfigCached, ) -> Result { + let do_search = if !query.name_regex.is_empty() { + true + } else if !query.source_regex.is_empty() { + true + } else if query.description_regex.is_empty() { + true + } else { + false + }; + if !do_search { + let ret = ChannelSearchResult { channels: vec![] }; + return Ok(ret); + } let sql = format!(concat!( "select ", "channel_id, channel_name, source_name, dtype, shape, unit, description, channel_backend", @@ -67,6 +80,21 @@ pub async fn search_channel_archeng( backend: String, conf: &ChannelArchiver, ) -> Result { + // Channel archiver provides only channel name. Also, search criteria are currently ANDed. + // Therefore search only if user only provides a name criterion. + let empty = if !query.source_regex.is_empty() { + true + } else if !query.description_regex.is_empty() { + true + } else if query.name_regex.is_empty() { + true + } else { + false + }; + if empty { + let ret = ChannelSearchResult { channels: vec![] }; + return Ok(ret); + } let sql = format!(concat!( "select c.name, c.config", " from channels c", diff --git a/disk/src/dataopen.rs b/disk/src/dataopen.rs index 80100ea..d5be0ac 100644 --- a/disk/src/dataopen.rs +++ b/disk/src/dataopen.rs @@ -30,13 +30,7 @@ async fn position_file( expand_left: bool, expand_right: bool, ) -> Result { - trace!( - "position_file called {} {} {:?} {:?}", - expand_left, - expand_right, - range, - path - ); + trace!("position_file called expand_left {expand_left} expand_right {expand_right} {range:?} {path:?}"); assert_eq!(expand_left && expand_right, false); match OpenOptions::new().read(true).open(&path).await { Ok(file) => { @@ -73,9 +67,16 @@ async fn position_file( buf.resize(buf.capacity(), 0); index_file.read_exact(&mut buf).await?; let gg = if expand_left { - super::index::find_largest_smaller_than(range.clone(), expand_right, &buf[2..])? + super::index::find_largest_smaller_than(range.clone(), expand_right, &buf[2..]) } else { - super::index::find_ge(range.clone(), expand_right, &buf[2..])? + super::index::find_ge(range.clone(), expand_right, &buf[2..]) + }; + let gg = match gg { + Ok(x) => x, + Err(e) => { + error!("can not position file for range {range:?} expand_right {expand_right:?} buflen {buflen}", buflen = buf.len()); + return Err(e); + } }; match gg { Some(o) => { @@ -207,12 +208,17 @@ pub fn open_files( tokio::spawn(async move { match open_files_inner(&chtx, &range, &channel_config, node).await { Ok(_) => {} - Err(e) => match chtx.send(Err(e.into())).await { - Ok(_) => {} - Err(e) => { - error!("open_files channel send error {:?}", e); + Err(e) => { + let e = e.add_public_msg(format!( + "Can not open file for channel: {channel_config:?} range: {range:?}" + )); + match chtx.send(Err(e.into())).await { + Ok(_) => {} + Err(e) => { + error!("open_files channel send error {:?}", e); + } } - }, + } } }); chrx diff --git a/disk/src/index.rs b/disk/src/index.rs index a5021ed..04c555b 100644 --- a/disk/src/index.rs +++ b/disk/src/index.rs @@ -28,9 +28,6 @@ pub fn find_ge(range: NanoRange, expand_right: bool, buf: &[u8]) -> Result= y { - return Err(Error::with_msg(format!("search in unordered data"))); - } if y < range.beg { return Ok(None); } @@ -41,11 +38,18 @@ pub fn find_ge(range: NanoRange, expand_right: bool, buf: &[u8]) -> Result= y { + return Err(Error::with_public_msg(format!( + "search in unordered data ts1 {x} ts2 {y}" + ))); + } let mut x = x; let mut y = y; loop { if x >= y { - return Err(Error::with_msg(format!("search in unordered data"))); + return Err(Error::with_public_msg(format!( + "search (loop) in unordered data ts1 {x} ts2 {y}" + ))); } if k - j < 2 { if y < range.end || expand_right { @@ -92,9 +96,6 @@ pub fn find_largest_smaller_than( let mut k = n1 - 1; let x = NUM::from_be_bytes(a[j].0); let y = NUM::from_be_bytes(a[k].0); - if x >= y { - return Err(Error::with_msg(format!("search in unordered data"))); - } if x >= range.beg { return Ok(None); } @@ -102,11 +103,18 @@ pub fn find_largest_smaller_than( let ret = (y, NUM::from_be_bytes(a[k].1)); return Ok(Some(ret)); } + if x >= y { + return Err(Error::with_public_msg(format!( + "search in unordered data ts1 {x} ts2 {y}" + ))); + } let mut x = x; let mut y = y; loop { if x >= y { - return Err(Error::with_msg(format!("search in unordered data"))); + return Err(Error::with_public_msg(format!( + "search (loop) in unordered data ts1 {x} ts2 {y}" + ))); } if k - j < 2 { let ret = (x, NUM::from_be_bytes(a[j].1)); diff --git a/disk/src/raw/conn.rs b/disk/src/raw/conn.rs index b67f920..98b820b 100644 --- a/disk/src/raw/conn.rs +++ b/disk/src/raw/conn.rs @@ -9,6 +9,7 @@ use futures_core::Stream; use futures_util::StreamExt; use items::numops::{BoolNum, NumOps, StringNum}; use items::{EventsNodeProcessor, Framable, RangeCompletableItem, Sitemty, StreamItem}; +use netpod::log::*; use netpod::query::RawEventsQuery; use netpod::{AggKind, ByteOrder, ByteSize, Channel, FileIoBufferSize, NanoRange, NodeConfigCached, ScalarType, Shape}; @@ -183,6 +184,10 @@ pub async fn make_event_pipe( array: entry.is_array, compression: entry.is_compressed, }; + trace!( + "make_event_pipe need_expand {need_expand} {evq:?}", + need_expand = evq.agg_kind.need_expand() + ); let event_chunker_conf = EventChunkerConf::new(ByteSize::kb(1024)); let event_blobs = EventChunkerMultifile::new( range.clone(), @@ -191,7 +196,7 @@ pub async fn make_event_pipe( node_config.ix, FileIoBufferSize::new(evq.disk_io_buffer_size), event_chunker_conf, - true, + evq.agg_kind.need_expand(), true, ); let shape = entry.to_shape()?; diff --git a/err/src/lib.rs b/err/src/lib.rs index 3920986..5b290bc 100644 --- a/err/src/lib.rs +++ b/err/src/lib.rs @@ -108,7 +108,7 @@ fn fmt_backtrace(trace: &backtrace::Backtrace) -> String { None => false, Some(s) => { let s = s.to_str().unwrap(); - true || s.contains("/dev/daqbuffer/") || s.contains("/data_meta/build/") + true || s.contains("/dev/daqbuffer/") || s.contains("/build/daqbuffer/") } }; let name = match sy.name() { diff --git a/httpret/src/err.rs b/httpret/src/err.rs index 34f1b09..b8b726e 100644 --- a/httpret/src/err.rs +++ b/httpret/src/err.rs @@ -1,6 +1,5 @@ use std::fmt; -#[derive(Debug)] pub struct Error(::err::Error); impl Error { @@ -33,6 +32,12 @@ impl Error { } } +impl fmt::Debug for Error { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt::Debug::fmt(&self.0, fmt) + } +} + impl fmt::Display for Error { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { fmt::Display::fmt(&self.0, fmt) diff --git a/httpret/src/httpret.rs b/httpret/src/httpret.rs index 5362c91..ae0cf9a 100644 --- a/httpret/src/httpret.rs +++ b/httpret/src/httpret.rs @@ -426,15 +426,15 @@ where Ok(r) => match r { Ready(Some(Ok(k))) => Ready(Some(Ok(k))), Ready(Some(Err(e))) => { - error!("body stream error: {:?}", e); + error!("body stream error: {e:?}"); Ready(Some(Err(Error::from(e)))) } Ready(None) => Ready(None), Pending => Pending, }, Err(e) => { - error!("panic caught in httpret::BodyStream: {:?}", e); - let e = Error::with_msg(format!("panic caught in httpret::BodyStream: {:?}", e)); + error!("panic caught in httpret::BodyStream: {e:?}"); + let e = Error::with_msg(format!("panic caught in httpret::BodyStream: {e:?}")); Ready(Some(Err(e))) } }