From dd8d85d5efdf6071a65a9c44c4d672618d307ed2 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Tue, 13 Dec 2022 10:38:05 +0100 Subject: [PATCH] Remove unused search bits --- dbconn/src/channelconfig.rs | 1 - dbconn/src/search.rs | 46 +++++++++++++----------------- httpret/src/api4.rs | 1 + httpret/src/api4/search.rs | 56 +++++++++++++++++++++++++++++++++++++ httpret/src/httpret.rs | 13 +++------ httpret/src/search.rs | 26 ----------------- streams/src/tcprawclient.rs | 29 +------------------ 7 files changed, 81 insertions(+), 91 deletions(-) create mode 100644 httpret/src/api4/search.rs delete mode 100644 httpret/src/search.rs diff --git a/dbconn/src/channelconfig.rs b/dbconn/src/channelconfig.rs index 5d5c13d..ab38f7a 100644 --- a/dbconn/src/channelconfig.rs +++ b/dbconn/src/channelconfig.rs @@ -70,7 +70,6 @@ pub async fn chconf_from_database(channel: &Channel, ncc: &NodeConfigCached) -> }; return ret; } - // TODO use a common already running worker pool for these queries: let dbconf = &ncc.node_config.cluster.database; let pgclient = crate::create_connection(dbconf).await?; if let Some(series) = channel.series() { diff --git a/dbconn/src/search.rs b/dbconn/src/search.rs index d24cd90..f8beb78 100644 --- a/dbconn/src/search.rs +++ b/dbconn/src/search.rs @@ -1,11 +1,16 @@ -use crate::{create_connection, ErrConv}; +use crate::create_connection; +use crate::ErrConv; use err::Error; -use netpod::{ - ChannelArchiver, ChannelSearchQuery, ChannelSearchResult, ChannelSearchSingleResult, Database, NodeConfigCached, - ScalarType, ScyllaConfig, Shape, -}; +use netpod::ChannelArchiver; +use netpod::ChannelSearchQuery; +use netpod::ChannelSearchResult; +use netpod::ChannelSearchSingleResult; +use netpod::Database; +use netpod::NodeConfigCached; +use netpod::ScalarType; +use netpod::ScyllaConfig; +use netpod::Shape; use serde_json::Value as JsVal; -use std::sync::Arc; pub async fn search_channel_databuffer( query: ChannelSearchQuery, @@ -85,37 +90,24 @@ pub async fn search_channel_scylla( _scyconf: &ScyllaConfig, pgconf: &Database, ) -> Result { - let empty = if !query.name_regex.is_empty() { - false - } else if !query.source_regex.is_empty() { - false - } else if !query.description_regex.is_empty() { - false - } else { - true - }; + let empty = if !query.name_regex.is_empty() { false } else { true }; if empty { - let ret = ChannelSearchResult { channels: vec![] }; + let ret = ChannelSearchResult { channels: Vec::new() }; return Ok(ret); } let sql = format!(concat!( "select", " series, facility, channel, scalar_type, shape_dims", " from series_by_channel", - " where channel like $1", + " where channel ~* $1", + " limit 100," )); - let u = { - let d = &pgconf; - format!("postgresql://{}:{}@{}:{}/{}", d.user, d.pass, d.host, d.port, d.name) - }; - let (pgclient, pgconn) = tokio_postgres::connect(&u, tokio_postgres::NoTls).await.err_conv()?; - // TODO use common connection/pool: - tokio::spawn(pgconn); - let pgclient = Arc::new(pgclient); + let pgclient = crate::create_connection(pgconf).await?; let rows = pgclient.query(sql.as_str(), &[&query.name_regex]).await.err_conv()?; - let mut res = vec![]; + let mut res = Vec::new(); for row in rows { - let series = row.get::<_, i64>(0) as u64; + let series: i64 = row.get(0); + let series = series as u64; let backend: String = row.get(1); let channel: String = row.get(2); let a: i32 = row.get(3); diff --git a/httpret/src/api4.rs b/httpret/src/api4.rs index a3b9424..a321b4d 100644 --- a/httpret/src/api4.rs +++ b/httpret/src/api4.rs @@ -1 +1,2 @@ pub mod binned; +pub mod search; diff --git a/httpret/src/api4/search.rs b/httpret/src/api4/search.rs new file mode 100644 index 0000000..964a888 --- /dev/null +++ b/httpret/src/api4/search.rs @@ -0,0 +1,56 @@ +use crate::bodystream::{response, ToPublicResponse}; +use crate::Error; +use http::StatusCode; +use http::{Method, Request, Response}; +use hyper::Body; +use netpod::log::*; +use netpod::ChannelSearchResult; +use netpod::{ChannelSearchQuery, NodeConfigCached}; +use netpod::{ACCEPT_ALL, APP_JSON}; +use url::Url; + +pub async fn channel_search(req: Request, node_config: &NodeConfigCached) -> Result { + let url = Url::parse(&format!("dummy://{}", req.uri()))?; + let query = ChannelSearchQuery::from_url(&url)?; + info!("search query: {:?}", query); + let res = dbconn::search::search_channel(query, node_config).await?; + Ok(res) +} + +pub struct ChannelSearchHandler {} + +impl ChannelSearchHandler { + pub fn handler(req: &Request) -> Option { + if req.uri().path() == "/api/4/search/channel" { + Some(Self {}) + } else { + None + } + } + + pub async fn handle(&self, req: Request, node_config: &NodeConfigCached) -> Result, Error> { + if req.method() == Method::GET { + let accept_def = APP_JSON; + let accept = req + .headers() + .get(http::header::ACCEPT) + .map_or(accept_def, |k| k.to_str().unwrap_or(accept_def)); + if accept.contains(APP_JSON) || accept.contains(ACCEPT_ALL) { + match channel_search(req, node_config).await { + Ok(item) => { + let buf = serde_json::to_vec(&item)?; + Ok(response(StatusCode::OK).body(Body::from(buf))?) + } + Err(e) => { + warn!("ChannelConfigHandler::handle: got error from channel_config: {e:?}"); + Ok(e.to_public_response()) + } + } + } else { + Ok(response(StatusCode::BAD_REQUEST).body(Body::empty())?) + } + } else { + Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) + } + } +} diff --git a/httpret/src/httpret.rs b/httpret/src/httpret.rs index 9816a70..b2bbd7d 100644 --- a/httpret/src/httpret.rs +++ b/httpret/src/httpret.rs @@ -10,7 +10,6 @@ pub mod gather; pub mod prometheus; pub mod proxy; pub mod pulsemap; -pub mod search; pub mod settings; use self::bodystream::{BodyStream, ToPublicResponse}; @@ -284,12 +283,10 @@ async fn http_service_inner( } } else if let Some(h) = StatusBoardAllHandler::handler(&req) { h.handle(req, &node_config).await - } else if path == "/api/4/search/channel" { - if req.method() == Method::GET { - Ok(search::channel_search(req, &node_config).await?) - } else { - Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) - } + } else if let Some(h) = api4::search::ChannelSearchHandler::handler(&req) { + h.handle(req, &node_config).await + } else if let Some(h) = api4::binned::BinnedHandler::handler(&req) { + h.handle(req, &node_config).await } else if let Some(h) = channelconfig::ChannelConfigHandler::handler(&req) { h.handle(req, &node_config).await } else if let Some(h) = channelconfig::ScyllaChannelsWithType::handler(&req) { @@ -308,8 +305,6 @@ async fn http_service_inner( h.handle(req, ctx, &node_config).await } else if let Some(h) = channel_status::ChannelStatusEvents::handler(&req) { h.handle(req, ctx, &node_config).await - } else if let Some(h) = api4::binned::BinnedHandler::handler(&req) { - h.handle(req, &node_config).await } else if path == "/api/4/prebinned" { if req.method() == Method::GET { Ok(prebinned(req, ctx, &node_config).await?) diff --git a/httpret/src/search.rs b/httpret/src/search.rs deleted file mode 100644 index 5392dc8..0000000 --- a/httpret/src/search.rs +++ /dev/null @@ -1,26 +0,0 @@ -use crate::err::Error; -use crate::response; -use http::header; -use hyper::{Body, Request, Response, StatusCode}; -use netpod::log::*; -use netpod::{ChannelSearchQuery, NodeConfigCached, ACCEPT_ALL, APP_JSON}; -use url::Url; - -pub async fn channel_search(req: Request, node_config: &NodeConfigCached) -> Result, Error> { - let (head, _body) = req.into_parts(); - let vdef = header::HeaderValue::from_static(APP_JSON); - let v = head.headers.get(header::ACCEPT).unwrap_or(&vdef); - if v == APP_JSON || v == ACCEPT_ALL { - let s1 = format!("dummy:{}", head.uri); - info!("try to parse {:?}", s1); - let url = Url::parse(&s1)?; - let query = ChannelSearchQuery::from_url(&url)?; - info!("search query: {:?}", query); - let res = dbconn::search::search_channel(query, node_config).await?; - let body = Body::from(serde_json::to_string(&res)?); - let ret = super::response(StatusCode::OK).body(body)?; - Ok(ret) - } else { - Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?) - } -} diff --git a/streams/src/tcprawclient.rs b/streams/src/tcprawclient.rs index e53b70a..ff4fc4a 100644 --- a/streams/src/tcprawclient.rs +++ b/streams/src/tcprawclient.rs @@ -12,7 +12,7 @@ use futures_util::Stream; use items::eventfull::EventFull; use items::frame::{make_frame, make_term_frame}; use items::sitem_data; -use items::{EventQueryJsonStringFrame, EventsNodeProcessor, RangeCompletableItem, Sitemty, StreamItem}; +use items::{EventQueryJsonStringFrame, RangeCompletableItem, Sitemty, StreamItem}; use netpod::log::*; use netpod::query::PlainEventsQuery; use netpod::Cluster; @@ -21,33 +21,6 @@ use std::pin::Pin; use tokio::io::AsyncWriteExt; use tokio::net::TcpStream; -pub async fn x_processed_stream_from_node( - query: PlainEventsQuery, - perf_opts: PerfOpts, - node: Node, -) -> Result::Output>> + Send>>, Error> -where - ENP: EventsNodeProcessor, - ::Output: Unpin + 'static, -{ - debug!("x_processed_stream_from_node to: {}:{}", node.host, node.port_raw); - let net = TcpStream::connect(format!("{}:{}", node.host, node.port_raw)).await?; - let qjs = serde_json::to_string(&query)?; - let (netin, mut netout) = net.into_split(); - let item = Ok(StreamItem::DataItem(RangeCompletableItem::Data( - EventQueryJsonStringFrame(qjs), - ))); - let buf = make_frame(&item)?; - netout.write_all(&buf).await?; - let buf = make_term_frame()?; - netout.write_all(&buf).await?; - netout.flush().await?; - netout.forget(); - let frames = InMemoryFrameAsyncReadStream::new(netin, perf_opts.inmem_bufcap); - let items = EventsFromFrames::new(frames); - Ok(Box::pin(items)) -} - pub async fn x_processed_event_blobs_stream_from_node( query: PlainEventsQuery, perf_opts: PerfOpts,