From b52fbd90443295d86c85b0c8482034039f7dbfb3 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Fri, 26 Jul 2024 15:16:19 +0200 Subject: [PATCH] Support search for status channel and data fetch --- crates/dbconn/src/search.rs | 2 +- crates/disk/src/decode.rs | 2 -- crates/disk/src/disk.rs | 4 +++- crates/httpret/src/api1.rs | 5 +++-- crates/items_2/src/empty.rs | 4 ---- crates/netpod/src/netpod.rs | 27 +++++++------------------ crates/parse/src/api1_parse.rs | 1 - crates/scyllaconn/src/events2/events.rs | 8 -------- 8 files changed, 14 insertions(+), 39 deletions(-) diff --git a/crates/dbconn/src/search.rs b/crates/dbconn/src/search.rs index f96c133..02e543e 100644 --- a/crates/dbconn/src/search.rs +++ b/crates/dbconn/src/search.rs @@ -109,7 +109,7 @@ pub(super) async fn search_channel_scylla( let ret = ChannelSearchResult { channels: Vec::new() }; return Ok(ret); } - let ch_kind: i16 = if query.channel_status { 1 } else { 2 }; + let ch_kind: i16 = query.kind.to_db_i16(); let (cb1, cb2) = if let Some(x) = query.backend.as_ref() { (false, x.as_str()) } else { diff --git a/crates/disk/src/decode.rs b/crates/disk/src/decode.rs index ca93602..4be8cf8 100644 --- a/crates/disk/src/decode.rs +++ b/crates/disk/src/decode.rs @@ -270,7 +270,6 @@ fn make_scalar_conv( ScalarType::BOOL => ValueDim0FromBytesImpl::::boxed(), ScalarType::STRING => ValueDim0FromBytesImpl::::boxed(), ScalarType::Enum => ValueDim0FromBytesImpl::::boxed(), - ScalarType::ChannelStatus => ValueDim0FromBytesImpl::::boxed(), }, Shape::Wave(_) => { let shape = shape.clone(); @@ -288,7 +287,6 @@ fn make_scalar_conv( ScalarType::BOOL => ValueDim1FromBytesImpl::::boxed(shape), ScalarType::STRING => ValueDim1FromBytesImpl::::boxed(shape), ScalarType::Enum => ValueDim1FromBytesImpl::::boxed(shape), - ScalarType::ChannelStatus => ValueDim1FromBytesImpl::::boxed(shape), } } Shape::Image(_, _) => { diff --git a/crates/disk/src/disk.rs b/crates/disk/src/disk.rs index d7bc14d..4874bda 100644 --- a/crates/disk/src/disk.rs +++ b/crates/disk/src/disk.rs @@ -383,6 +383,7 @@ impl FileContentStream2 { } } + #[allow(unused_mut)] fn make_reading(&mut self) { let mut buf = Box::new(BytesMut::with_capacity(self.disk_io_tune.read_buffer_len)); // let bufref = unsafe { &mut *((&mut buf as &mut BytesMut) as *mut BytesMut) }; @@ -783,7 +784,8 @@ impl BlockingTaskIntoChannel { let item = FileChunkRead::with_buf_dur(buf, ts2.duration_since(ts1)); match tx.send_blocking(Ok(item)) { Ok(()) => (), - Err(e) => { + Err(_) => { + // TODO // Receiver most likely disconnected. // error!("blocking_task_into_channel can not send into channel {e}"); break; diff --git a/crates/httpret/src/api1.rs b/crates/httpret/src/api1.rs index 9b7b811..e848f51 100644 --- a/crates/httpret/src/api1.rs +++ b/crates/httpret/src/api1.rs @@ -44,6 +44,7 @@ use netpod::FromUrl; use netpod::NodeConfigCached; use netpod::ProxyConfig; use netpod::ReqCtxArc; +use netpod::SeriesKind; use netpod::SfChFetchInfo; use netpod::SfDbChannel; use netpod::Shape; @@ -160,8 +161,8 @@ pub async fn channel_search_list_v1( name_regex: query.regex.map_or(String::new(), |k| k), source_regex: query.source_regex.map_or(String::new(), |k| k), description_regex: query.description_regex.map_or(String::new(), |k| k), - channel_status: false, icase: false, + kind: SeriesKind::default(), }; let urls = proxy_config .backends @@ -271,8 +272,8 @@ pub async fn channel_search_configs_v1( name_regex: query.regex.map_or(String::new(), |k| k), source_regex: query.source_regex.map_or(String::new(), |k| k), description_regex: query.description_regex.map_or(String::new(), |k| k), - channel_status: false, icase: false, + kind: SeriesKind::default(), }; let urls = proxy_config .backends diff --git a/crates/items_2/src/empty.rs b/crates/items_2/src/empty.rs index a1dd2a9..0fa0840 100644 --- a/crates/items_2/src/empty.rs +++ b/crates/items_2/src/empty.rs @@ -27,8 +27,6 @@ pub fn empty_events_dyn_ev(scalar_type: &ScalarType, shape: &Shape) -> Result Box::new(K::::empty()), STRING => Box::new(K::::empty()), Enum => Box::new(K::::empty()), - ChannelStatus => Box::new(K::::empty()), - CaStatus => Box::new(K::::empty()), } } Shape::Wave(..) => { @@ -48,8 +46,6 @@ pub fn empty_events_dyn_ev(scalar_type: &ScalarType, shape: &Shape) -> Result Box::new(K::::empty()), STRING => Box::new(K::::empty()), Enum => Box::new(K::::empty()), - ChannelStatus => Box::new(K::::empty()), - CaStatus => Box::new(K::::empty()), } } Shape::Image(..) => { diff --git a/crates/netpod/src/netpod.rs b/crates/netpod/src/netpod.rs index 2c1f28b..e442061 100644 --- a/crates/netpod/src/netpod.rs +++ b/crates/netpod/src/netpod.rs @@ -187,8 +187,8 @@ impl SeriesKind { pub fn from_db_i16(x: i16) -> Result { let ret = match x { - 1 => Self::ChannelData, - 2 => Self::ChannelStatus, + 1 => Self::ChannelStatus, + 2 => Self::ChannelData, 3 => Self::CaStatus, _ => return Err(Error::with_msg_no_trace("bad SeriesKind value")), }; @@ -249,7 +249,6 @@ pub enum ScalarType { BOOL, STRING, Enum, - ChannelStatus, } impl fmt::Debug for ScalarType { @@ -284,7 +283,6 @@ impl Serialize for ScalarType { BOOL => ser.serialize_str("bool"), STRING => ser.serialize_str("string"), Enum => ser.serialize_str("enum"), - ChannelStatus => ser.serialize_str("channelstatus"), } } } @@ -315,7 +313,6 @@ impl<'de> serde::de::Visitor<'de> for ScalarTypeVis { "bool" => BOOL, "string" => STRING, "enum" => Enum, - "channelstatus" => ChannelStatus, k => return Err(E::custom(format!("can not understand variant {k:?}"))), }; Ok(ret) @@ -352,7 +349,6 @@ impl ScalarType { 11 => F32, 12 => F64, 13 => STRING, - 14 => ChannelStatus, 15 => Enum, 6 => return Err(Error::with_msg(format!("CHARACTER not supported"))), _ => return Err(Error::with_msg(format!("unknown dtype code: {:?}", ix))), @@ -376,7 +372,6 @@ impl ScalarType { BOOL => "bool", STRING => "string", Enum => "enum", - ChannelStatus => "channelstatus", } } @@ -396,7 +391,6 @@ impl ScalarType { "bool" => BOOL, "string" => STRING, "enum" => Enum, - "channelstatus" => ChannelStatus, _ => { return Err(Error::with_msg_no_trace(format!( "from_bsread_str can not understand bsread {:?}", @@ -423,7 +417,6 @@ impl ScalarType { BOOL => "bool", STRING => "string", Enum => "enum", - ChannelStatus => "channelstatus", } } @@ -445,7 +438,6 @@ impl ScalarType { "bool" => BOOL, "string" => STRING, "enum" => Enum, - "channelstatus" => ChannelStatus, _ => { return Err(Error::with_msg_no_trace(format!( "from_bsread_str can not understand bsread {:?}", @@ -534,7 +526,6 @@ impl ScalarType { BOOL => 1, STRING => 1, Enum => 2, - ChannelStatus => 4, } } @@ -553,7 +544,6 @@ impl ScalarType { F64 => 12, BOOL => 0, STRING => 13, - ChannelStatus => 14, Enum => 15, } } @@ -2999,9 +2989,9 @@ pub struct ChannelSearchQuery { pub source_regex: String, pub description_regex: String, #[serde(default)] - pub channel_status: bool, - #[serde(default)] pub icase: bool, + #[serde(default)] + pub kind: SeriesKind, } impl ChannelSearchQuery { @@ -3012,12 +3002,8 @@ impl ChannelSearchQuery { name_regex: pairs.get("nameRegex").map_or(String::new(), |k| k.clone()), source_regex: pairs.get("sourceRegex").map_or(String::new(), |k| k.clone()), description_regex: pairs.get("descriptionRegex").map_or(String::new(), |k| k.clone()), - channel_status: pairs - .get("channelStatus") - .map(|k| k.parse().ok()) - .unwrap_or(None) - .unwrap_or(false), icase: pairs.get("icase").map_or(None, |x| x.parse().ok()).unwrap_or(false), + kind: SeriesKind::from_pairs(&pairs)?, }; Ok(ret) } @@ -3030,8 +3016,9 @@ impl ChannelSearchQuery { qp.append_pair("nameRegex", &self.name_regex); qp.append_pair("sourceRegex", &self.source_regex); qp.append_pair("descriptionRegex", &self.description_regex); - qp.append_pair("channelStatus", &self.channel_status.to_string()); qp.append_pair("icase", &self.icase.to_string()); + drop(qp); + self.kind.append_to_url(url); } } diff --git a/crates/parse/src/api1_parse.rs b/crates/parse/src/api1_parse.rs index 9e4b78f..cd4c2e3 100644 --- a/crates/parse/src/api1_parse.rs +++ b/crates/parse/src/api1_parse.rs @@ -112,7 +112,6 @@ impl From<&ScalarType> for Api1ScalarType { A::STRING => B::STRING, // TODO treat enum as number only A::Enum => B::U16, - A::ChannelStatus => todo!("ChannelStatus not in Api1ScalarType"), } } } diff --git a/crates/scyllaconn/src/events2/events.rs b/crates/scyllaconn/src/events2/events.rs index 979295e..df45863 100644 --- a/crates/scyllaconn/src/events2/events.rs +++ b/crates/scyllaconn/src/events2/events.rs @@ -174,10 +174,6 @@ impl EventsStreamRt { ); read_next_values::(opts).await } - ScalarType::ChannelStatus => { - warn!("read not yet supported {:?} {:?}", shape, scalar_type); - err::todoval() - } }, Shape::Wave(_) => match &scalar_type { ScalarType::U8 => read_next_values::>(opts).await, @@ -199,10 +195,6 @@ impl EventsStreamRt { warn!("read not yet supported {:?} {:?}", shape, scalar_type); err::todoval() } - ScalarType::ChannelStatus => { - warn!("read not yet supported {:?} {:?}", shape, scalar_type); - err::todoval() - } }, _ => { error!("TODO ReadValues add more types");