Remove unused search bits
This commit is contained in:
@@ -70,7 +70,6 @@ pub async fn chconf_from_database(channel: &Channel, ncc: &NodeConfigCached) ->
|
|||||||
};
|
};
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
// TODO use a common already running worker pool for these queries:
|
|
||||||
let dbconf = &ncc.node_config.cluster.database;
|
let dbconf = &ncc.node_config.cluster.database;
|
||||||
let pgclient = crate::create_connection(dbconf).await?;
|
let pgclient = crate::create_connection(dbconf).await?;
|
||||||
if let Some(series) = channel.series() {
|
if let Some(series) = channel.series() {
|
||||||
|
|||||||
+19
-27
@@ -1,11 +1,16 @@
|
|||||||
use crate::{create_connection, ErrConv};
|
use crate::create_connection;
|
||||||
|
use crate::ErrConv;
|
||||||
use err::Error;
|
use err::Error;
|
||||||
use netpod::{
|
use netpod::ChannelArchiver;
|
||||||
ChannelArchiver, ChannelSearchQuery, ChannelSearchResult, ChannelSearchSingleResult, Database, NodeConfigCached,
|
use netpod::ChannelSearchQuery;
|
||||||
ScalarType, ScyllaConfig, Shape,
|
use netpod::ChannelSearchResult;
|
||||||
};
|
use netpod::ChannelSearchSingleResult;
|
||||||
|
use netpod::Database;
|
||||||
|
use netpod::NodeConfigCached;
|
||||||
|
use netpod::ScalarType;
|
||||||
|
use netpod::ScyllaConfig;
|
||||||
|
use netpod::Shape;
|
||||||
use serde_json::Value as JsVal;
|
use serde_json::Value as JsVal;
|
||||||
use std::sync::Arc;
|
|
||||||
|
|
||||||
pub async fn search_channel_databuffer(
|
pub async fn search_channel_databuffer(
|
||||||
query: ChannelSearchQuery,
|
query: ChannelSearchQuery,
|
||||||
@@ -85,37 +90,24 @@ pub async fn search_channel_scylla(
|
|||||||
_scyconf: &ScyllaConfig,
|
_scyconf: &ScyllaConfig,
|
||||||
pgconf: &Database,
|
pgconf: &Database,
|
||||||
) -> Result<ChannelSearchResult, Error> {
|
) -> Result<ChannelSearchResult, Error> {
|
||||||
let empty = if !query.name_regex.is_empty() {
|
let empty = if !query.name_regex.is_empty() { false } else { true };
|
||||||
false
|
|
||||||
} else if !query.source_regex.is_empty() {
|
|
||||||
false
|
|
||||||
} else if !query.description_regex.is_empty() {
|
|
||||||
false
|
|
||||||
} else {
|
|
||||||
true
|
|
||||||
};
|
|
||||||
if empty {
|
if empty {
|
||||||
let ret = ChannelSearchResult { channels: vec![] };
|
let ret = ChannelSearchResult { channels: Vec::new() };
|
||||||
return Ok(ret);
|
return Ok(ret);
|
||||||
}
|
}
|
||||||
let sql = format!(concat!(
|
let sql = format!(concat!(
|
||||||
"select",
|
"select",
|
||||||
" series, facility, channel, scalar_type, shape_dims",
|
" series, facility, channel, scalar_type, shape_dims",
|
||||||
" from series_by_channel",
|
" from series_by_channel",
|
||||||
" where channel like $1",
|
" where channel ~* $1",
|
||||||
|
" limit 100,"
|
||||||
));
|
));
|
||||||
let u = {
|
let pgclient = crate::create_connection(pgconf).await?;
|
||||||
let d = &pgconf;
|
|
||||||
format!("postgresql://{}:{}@{}:{}/{}", d.user, d.pass, d.host, d.port, d.name)
|
|
||||||
};
|
|
||||||
let (pgclient, pgconn) = tokio_postgres::connect(&u, tokio_postgres::NoTls).await.err_conv()?;
|
|
||||||
// TODO use common connection/pool:
|
|
||||||
tokio::spawn(pgconn);
|
|
||||||
let pgclient = Arc::new(pgclient);
|
|
||||||
let rows = pgclient.query(sql.as_str(), &[&query.name_regex]).await.err_conv()?;
|
let rows = pgclient.query(sql.as_str(), &[&query.name_regex]).await.err_conv()?;
|
||||||
let mut res = vec![];
|
let mut res = Vec::new();
|
||||||
for row in rows {
|
for row in rows {
|
||||||
let series = row.get::<_, i64>(0) as u64;
|
let series: i64 = row.get(0);
|
||||||
|
let series = series as u64;
|
||||||
let backend: String = row.get(1);
|
let backend: String = row.get(1);
|
||||||
let channel: String = row.get(2);
|
let channel: String = row.get(2);
|
||||||
let a: i32 = row.get(3);
|
let a: i32 = row.get(3);
|
||||||
|
|||||||
@@ -1 +1,2 @@
|
|||||||
pub mod binned;
|
pub mod binned;
|
||||||
|
pub mod search;
|
||||||
|
|||||||
@@ -0,0 +1,56 @@
|
|||||||
|
use crate::bodystream::{response, ToPublicResponse};
|
||||||
|
use crate::Error;
|
||||||
|
use http::StatusCode;
|
||||||
|
use http::{Method, Request, Response};
|
||||||
|
use hyper::Body;
|
||||||
|
use netpod::log::*;
|
||||||
|
use netpod::ChannelSearchResult;
|
||||||
|
use netpod::{ChannelSearchQuery, NodeConfigCached};
|
||||||
|
use netpod::{ACCEPT_ALL, APP_JSON};
|
||||||
|
use url::Url;
|
||||||
|
|
||||||
|
pub async fn channel_search(req: Request<Body>, node_config: &NodeConfigCached) -> Result<ChannelSearchResult, Error> {
|
||||||
|
let url = Url::parse(&format!("dummy://{}", req.uri()))?;
|
||||||
|
let query = ChannelSearchQuery::from_url(&url)?;
|
||||||
|
info!("search query: {:?}", query);
|
||||||
|
let res = dbconn::search::search_channel(query, node_config).await?;
|
||||||
|
Ok(res)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct ChannelSearchHandler {}
|
||||||
|
|
||||||
|
impl ChannelSearchHandler {
|
||||||
|
pub fn handler(req: &Request<Body>) -> Option<Self> {
|
||||||
|
if req.uri().path() == "/api/4/search/channel" {
|
||||||
|
Some(Self {})
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn handle(&self, req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
|
||||||
|
if req.method() == Method::GET {
|
||||||
|
let accept_def = APP_JSON;
|
||||||
|
let accept = req
|
||||||
|
.headers()
|
||||||
|
.get(http::header::ACCEPT)
|
||||||
|
.map_or(accept_def, |k| k.to_str().unwrap_or(accept_def));
|
||||||
|
if accept.contains(APP_JSON) || accept.contains(ACCEPT_ALL) {
|
||||||
|
match channel_search(req, node_config).await {
|
||||||
|
Ok(item) => {
|
||||||
|
let buf = serde_json::to_vec(&item)?;
|
||||||
|
Ok(response(StatusCode::OK).body(Body::from(buf))?)
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
warn!("ChannelConfigHandler::handle: got error from channel_config: {e:?}");
|
||||||
|
Ok(e.to_public_response())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
Ok(response(StatusCode::BAD_REQUEST).body(Body::empty())?)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -10,7 +10,6 @@ pub mod gather;
|
|||||||
pub mod prometheus;
|
pub mod prometheus;
|
||||||
pub mod proxy;
|
pub mod proxy;
|
||||||
pub mod pulsemap;
|
pub mod pulsemap;
|
||||||
pub mod search;
|
|
||||||
pub mod settings;
|
pub mod settings;
|
||||||
|
|
||||||
use self::bodystream::{BodyStream, ToPublicResponse};
|
use self::bodystream::{BodyStream, ToPublicResponse};
|
||||||
@@ -284,12 +283,10 @@ async fn http_service_inner(
|
|||||||
}
|
}
|
||||||
} else if let Some(h) = StatusBoardAllHandler::handler(&req) {
|
} else if let Some(h) = StatusBoardAllHandler::handler(&req) {
|
||||||
h.handle(req, &node_config).await
|
h.handle(req, &node_config).await
|
||||||
} else if path == "/api/4/search/channel" {
|
} else if let Some(h) = api4::search::ChannelSearchHandler::handler(&req) {
|
||||||
if req.method() == Method::GET {
|
h.handle(req, &node_config).await
|
||||||
Ok(search::channel_search(req, &node_config).await?)
|
} else if let Some(h) = api4::binned::BinnedHandler::handler(&req) {
|
||||||
} else {
|
h.handle(req, &node_config).await
|
||||||
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
|
|
||||||
}
|
|
||||||
} else if let Some(h) = channelconfig::ChannelConfigHandler::handler(&req) {
|
} else if let Some(h) = channelconfig::ChannelConfigHandler::handler(&req) {
|
||||||
h.handle(req, &node_config).await
|
h.handle(req, &node_config).await
|
||||||
} else if let Some(h) = channelconfig::ScyllaChannelsWithType::handler(&req) {
|
} else if let Some(h) = channelconfig::ScyllaChannelsWithType::handler(&req) {
|
||||||
@@ -308,8 +305,6 @@ async fn http_service_inner(
|
|||||||
h.handle(req, ctx, &node_config).await
|
h.handle(req, ctx, &node_config).await
|
||||||
} else if let Some(h) = channel_status::ChannelStatusEvents::handler(&req) {
|
} else if let Some(h) = channel_status::ChannelStatusEvents::handler(&req) {
|
||||||
h.handle(req, ctx, &node_config).await
|
h.handle(req, ctx, &node_config).await
|
||||||
} else if let Some(h) = api4::binned::BinnedHandler::handler(&req) {
|
|
||||||
h.handle(req, &node_config).await
|
|
||||||
} else if path == "/api/4/prebinned" {
|
} else if path == "/api/4/prebinned" {
|
||||||
if req.method() == Method::GET {
|
if req.method() == Method::GET {
|
||||||
Ok(prebinned(req, ctx, &node_config).await?)
|
Ok(prebinned(req, ctx, &node_config).await?)
|
||||||
|
|||||||
@@ -1,26 +0,0 @@
|
|||||||
use crate::err::Error;
|
|
||||||
use crate::response;
|
|
||||||
use http::header;
|
|
||||||
use hyper::{Body, Request, Response, StatusCode};
|
|
||||||
use netpod::log::*;
|
|
||||||
use netpod::{ChannelSearchQuery, NodeConfigCached, ACCEPT_ALL, APP_JSON};
|
|
||||||
use url::Url;
|
|
||||||
|
|
||||||
pub async fn channel_search(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
|
|
||||||
let (head, _body) = req.into_parts();
|
|
||||||
let vdef = header::HeaderValue::from_static(APP_JSON);
|
|
||||||
let v = head.headers.get(header::ACCEPT).unwrap_or(&vdef);
|
|
||||||
if v == APP_JSON || v == ACCEPT_ALL {
|
|
||||||
let s1 = format!("dummy:{}", head.uri);
|
|
||||||
info!("try to parse {:?}", s1);
|
|
||||||
let url = Url::parse(&s1)?;
|
|
||||||
let query = ChannelSearchQuery::from_url(&url)?;
|
|
||||||
info!("search query: {:?}", query);
|
|
||||||
let res = dbconn::search::search_channel(query, node_config).await?;
|
|
||||||
let body = Body::from(serde_json::to_string(&res)?);
|
|
||||||
let ret = super::response(StatusCode::OK).body(body)?;
|
|
||||||
Ok(ret)
|
|
||||||
} else {
|
|
||||||
Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -12,7 +12,7 @@ use futures_util::Stream;
|
|||||||
use items::eventfull::EventFull;
|
use items::eventfull::EventFull;
|
||||||
use items::frame::{make_frame, make_term_frame};
|
use items::frame::{make_frame, make_term_frame};
|
||||||
use items::sitem_data;
|
use items::sitem_data;
|
||||||
use items::{EventQueryJsonStringFrame, EventsNodeProcessor, RangeCompletableItem, Sitemty, StreamItem};
|
use items::{EventQueryJsonStringFrame, RangeCompletableItem, Sitemty, StreamItem};
|
||||||
use netpod::log::*;
|
use netpod::log::*;
|
||||||
use netpod::query::PlainEventsQuery;
|
use netpod::query::PlainEventsQuery;
|
||||||
use netpod::Cluster;
|
use netpod::Cluster;
|
||||||
@@ -21,33 +21,6 @@ use std::pin::Pin;
|
|||||||
use tokio::io::AsyncWriteExt;
|
use tokio::io::AsyncWriteExt;
|
||||||
use tokio::net::TcpStream;
|
use tokio::net::TcpStream;
|
||||||
|
|
||||||
pub async fn x_processed_stream_from_node<ENP>(
|
|
||||||
query: PlainEventsQuery,
|
|
||||||
perf_opts: PerfOpts,
|
|
||||||
node: Node,
|
|
||||||
) -> Result<Pin<Box<dyn Stream<Item = Sitemty<<ENP as EventsNodeProcessor>::Output>> + Send>>, Error>
|
|
||||||
where
|
|
||||||
ENP: EventsNodeProcessor,
|
|
||||||
<ENP as EventsNodeProcessor>::Output: Unpin + 'static,
|
|
||||||
{
|
|
||||||
debug!("x_processed_stream_from_node to: {}:{}", node.host, node.port_raw);
|
|
||||||
let net = TcpStream::connect(format!("{}:{}", node.host, node.port_raw)).await?;
|
|
||||||
let qjs = serde_json::to_string(&query)?;
|
|
||||||
let (netin, mut netout) = net.into_split();
|
|
||||||
let item = Ok(StreamItem::DataItem(RangeCompletableItem::Data(
|
|
||||||
EventQueryJsonStringFrame(qjs),
|
|
||||||
)));
|
|
||||||
let buf = make_frame(&item)?;
|
|
||||||
netout.write_all(&buf).await?;
|
|
||||||
let buf = make_term_frame()?;
|
|
||||||
netout.write_all(&buf).await?;
|
|
||||||
netout.flush().await?;
|
|
||||||
netout.forget();
|
|
||||||
let frames = InMemoryFrameAsyncReadStream::new(netin, perf_opts.inmem_bufcap);
|
|
||||||
let items = EventsFromFrames::new(frames);
|
|
||||||
Ok(Box::pin(items))
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn x_processed_event_blobs_stream_from_node(
|
pub async fn x_processed_event_blobs_stream_from_node(
|
||||||
query: PlainEventsQuery,
|
query: PlainEventsQuery,
|
||||||
perf_opts: PerfOpts,
|
perf_opts: PerfOpts,
|
||||||
|
|||||||
Reference in New Issue
Block a user