Remove unused
This commit is contained in:
@@ -1,8 +1,10 @@
|
||||
use crate::bodystream::response;
|
||||
use crate::err::Error;
|
||||
use crate::ReqCtx;
|
||||
use futures_util::StreamExt;
|
||||
use http::{Method, Request, Response, StatusCode};
|
||||
use hyper::Body;
|
||||
use items_2::channelevents::ConnStatusEvent;
|
||||
use netpod::query::ChannelStateEventsQuery;
|
||||
use netpod::{FromUrl, NodeConfigCached, ACCEPT_ALL, APP_JSON};
|
||||
use url::Url;
|
||||
@@ -53,23 +55,29 @@ impl ConnectionStatusEvents {
|
||||
&self,
|
||||
q: &ChannelStateEventsQuery,
|
||||
node_config: &NodeConfigCached,
|
||||
) -> Result<Vec<(u64, u32)>, Error> {
|
||||
) -> Result<Vec<ConnStatusEvent>, Error> {
|
||||
let scyco = node_config
|
||||
.node_config
|
||||
.cluster
|
||||
.scylla
|
||||
.as_ref()
|
||||
.ok_or_else(|| Error::with_public_msg_no_trace(format!("No Scylla configured")))?;
|
||||
let ret = dbconn::events_scylla::channel_state_events(q, scyco).await?;
|
||||
let scy = scyllaconn::create_scy_session(scyco).await?;
|
||||
let mut stream = scyllaconn::events::channel_state_events(q, scy).await?;
|
||||
let mut ret = Vec::new();
|
||||
while let Some(item) = stream.next().await {
|
||||
let item = item?;
|
||||
ret.push(item);
|
||||
}
|
||||
Ok(ret)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct ChannelConnectionStatusEvents {}
|
||||
pub struct ChannelStatusEvents {}
|
||||
|
||||
impl ChannelConnectionStatusEvents {
|
||||
impl ChannelStatusEvents {
|
||||
pub fn handler(req: &Request<Body>) -> Option<Self> {
|
||||
if req.uri().path() == "/api/4/scylla/channel/connection/status/events" {
|
||||
if req.uri().path() == "/api/4/scylla/channel/status/events" {
|
||||
Some(Self {})
|
||||
} else {
|
||||
None
|
||||
@@ -111,14 +119,20 @@ impl ChannelConnectionStatusEvents {
|
||||
&self,
|
||||
q: &ChannelStateEventsQuery,
|
||||
node_config: &NodeConfigCached,
|
||||
) -> Result<Vec<(u64, u32)>, Error> {
|
||||
) -> Result<Vec<ConnStatusEvent>, Error> {
|
||||
let scyco = node_config
|
||||
.node_config
|
||||
.cluster
|
||||
.scylla
|
||||
.as_ref()
|
||||
.ok_or_else(|| Error::with_public_msg_no_trace(format!("No Scylla configured")))?;
|
||||
let ret = dbconn::events_scylla::channel_state_events(q, scyco).await?;
|
||||
let scy = scyllaconn::create_scy_session(scyco).await?;
|
||||
let mut stream = scyllaconn::events::channel_state_events(q, scy).await?;
|
||||
let mut ret = Vec::new();
|
||||
while let Some(item) = stream.next().await {
|
||||
let item = item?;
|
||||
ret.push(item);
|
||||
}
|
||||
Ok(ret)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,15 +1,17 @@
|
||||
use crate::err::Error;
|
||||
use crate::{response, ToPublicResponse};
|
||||
use dbconn::channelconfig::{chconf_from_database, ChConf};
|
||||
use dbconn::{create_connection, create_scylla_connection};
|
||||
use dbconn::channelconfig::chconf_from_database;
|
||||
use dbconn::channelconfig::ChConf;
|
||||
use dbconn::create_connection;
|
||||
use futures_util::StreamExt;
|
||||
use http::{Method, Request, Response, StatusCode};
|
||||
use hyper::Body;
|
||||
use netpod::get_url_query_pairs;
|
||||
use netpod::log::*;
|
||||
use netpod::query::prebinned::PreBinnedQuery;
|
||||
use netpod::query::{BinnedQuery, PlainEventsQuery};
|
||||
use netpod::timeunits::*;
|
||||
use netpod::{get_url_query_pairs, Channel, ChannelConfigQuery, Database, FromUrl, ScalarType, ScyllaConfig, Shape};
|
||||
use netpod::{Channel, ChannelConfigQuery, Database, FromUrl, ScalarType, ScyllaConfig, Shape};
|
||||
use netpod::{ChannelConfigResponse, NodeConfigCached};
|
||||
use netpod::{ACCEPT_ALL, APP_JSON};
|
||||
use scylla::batch::Consistency;
|
||||
@@ -221,7 +223,9 @@ impl ScyllaConfigsHisto {
|
||||
.get(http::header::ACCEPT)
|
||||
.map_or(accept_def, |k| k.to_str().unwrap_or(accept_def));
|
||||
if accept == APP_JSON || accept == ACCEPT_ALL {
|
||||
let res = self.make_histo(node_config).await?;
|
||||
let res = self
|
||||
.make_histo(&node_config.node_config.cluster.backend, node_config)
|
||||
.await?;
|
||||
let body = Body::from(serde_json::to_vec(&res)?);
|
||||
Ok(response(StatusCode::OK).body(body)?)
|
||||
} else {
|
||||
@@ -232,7 +236,7 @@ impl ScyllaConfigsHisto {
|
||||
}
|
||||
}
|
||||
|
||||
async fn make_histo(&self, node_config: &NodeConfigCached) -> Result<ConfigsHisto, Error> {
|
||||
async fn make_histo(&self, backend: &str, node_config: &NodeConfigCached) -> Result<ConfigsHisto, Error> {
|
||||
let scyco = node_config
|
||||
.node_config
|
||||
.cluster
|
||||
@@ -246,7 +250,6 @@ impl ScyllaConfigsHisto {
|
||||
.build()
|
||||
.await
|
||||
.err_conv()?;
|
||||
let backend = "scylla";
|
||||
let res = scy
|
||||
.query(
|
||||
"select scalar_type, shape_dims, series from series_by_channel where facility = ? allow filtering",
|
||||
@@ -336,7 +339,9 @@ impl ScyllaChannelsWithType {
|
||||
if accept == APP_JSON || accept == ACCEPT_ALL {
|
||||
let url = Url::parse(&format!("dummy:{}", req.uri()))?;
|
||||
let q = ChannelsWithTypeQuery::from_url(&url)?;
|
||||
let res = self.get_channels(&q, node_config).await?;
|
||||
let res = self
|
||||
.get_channels(&q, &node_config.node_config.cluster.backend, node_config)
|
||||
.await?;
|
||||
let body = Body::from(serde_json::to_vec(&res)?);
|
||||
Ok(response(StatusCode::OK).body(body)?)
|
||||
} else {
|
||||
@@ -350,6 +355,7 @@ impl ScyllaChannelsWithType {
|
||||
async fn get_channels(
|
||||
&self,
|
||||
q: &ChannelsWithTypeQuery,
|
||||
backend: &str,
|
||||
node_config: &NodeConfigCached,
|
||||
) -> Result<ChannelListWithType, Error> {
|
||||
let scyco = node_config
|
||||
@@ -365,7 +371,6 @@ impl ScyllaChannelsWithType {
|
||||
.build()
|
||||
.await
|
||||
.err_conv()?;
|
||||
let backend = "scylla";
|
||||
let res = scy
|
||||
.query(
|
||||
"select channel_name, series from series_by_channel where facility = ? and scalar_type = ? and shape_dims = ? allow filtering",
|
||||
@@ -1160,7 +1165,7 @@ impl GenerateScyllaTestData {
|
||||
let dbconf = &node_config.node_config.cluster.database;
|
||||
let _pg_client = create_connection(dbconf).await?;
|
||||
let scyconf = node_config.node_config.cluster.scylla.as_ref().unwrap();
|
||||
let scy = create_scylla_connection(scyconf).await?;
|
||||
let scy = scyllaconn::create_scy_session(scyconf).await?;
|
||||
let series: u64 = 42001;
|
||||
// TODO query `ts_msp` for all MSP values und use that to delete from event table first.
|
||||
// Only later delete also from the `ts_msp` table.
|
||||
|
||||
@@ -1,22 +1,14 @@
|
||||
use crate::channelconfig::{chconf_from_events_binary, chconf_from_events_json};
|
||||
use crate::err::Error;
|
||||
use crate::{response, response_err, BodyStream, ReqCtx, ToPublicResponse};
|
||||
use futures_util::{stream, Stream, StreamExt, TryStreamExt};
|
||||
use crate::{response, response_err, BodyStream, ToPublicResponse};
|
||||
use futures_util::{stream, TryStreamExt};
|
||||
use http::{Method, Request, Response, StatusCode};
|
||||
use hyper::Body;
|
||||
use items_2::channelevents::ChannelEvents;
|
||||
use items_2::merger_cev::ChannelEventsMerger;
|
||||
use items_2::{binned_collected, empty_events_dyn, empty_events_dyn_2};
|
||||
use netpod::log::*;
|
||||
use netpod::query::{BinnedQuery, ChannelStateEventsQuery, PlainEventsQuery};
|
||||
use netpod::{AggKind, BinnedRange, FromUrl, NodeConfigCached};
|
||||
use netpod::query::PlainEventsQuery;
|
||||
use netpod::FromUrl;
|
||||
use netpod::NodeConfigCached;
|
||||
use netpod::{ACCEPT_ALL, APP_JSON, APP_OCTET};
|
||||
use scyllaconn::create_scy_session;
|
||||
use scyllaconn::errconv::ErrConv;
|
||||
use scyllaconn::events::{channel_state_events, make_scylla_stream};
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, Instant};
|
||||
use url::Url;
|
||||
|
||||
pub struct EventsHandler {}
|
||||
@@ -116,306 +108,3 @@ async fn plain_events_json(
|
||||
let ret = response(StatusCode::OK).body(Body::from(buf))?;
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
pub struct EventsHandlerScylla {}
|
||||
|
||||
impl EventsHandlerScylla {
|
||||
pub fn handler(req: &Request<Body>) -> Option<Self> {
|
||||
if req.uri().path() == "/api/4/scylla/events" {
|
||||
Some(Self {})
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn handle(
|
||||
&self,
|
||||
req: Request<Body>,
|
||||
ctx: &ReqCtx,
|
||||
node_config: &NodeConfigCached,
|
||||
) -> Result<Response<Body>, Error> {
|
||||
if req.method() != Method::GET {
|
||||
return Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?);
|
||||
}
|
||||
match self.fetch(req, ctx, node_config).await {
|
||||
Ok(ret) => Ok(ret),
|
||||
Err(e) => Ok(e.to_public_response()),
|
||||
}
|
||||
}
|
||||
|
||||
async fn fetch(
|
||||
&self,
|
||||
req: Request<Body>,
|
||||
ctx: &ReqCtx,
|
||||
node_config: &NodeConfigCached,
|
||||
) -> Result<Response<Body>, Error> {
|
||||
info!("EventsHandlerScylla req: {:?}", req);
|
||||
let accept_def = APP_JSON;
|
||||
let accept = req
|
||||
.headers()
|
||||
.get(http::header::ACCEPT)
|
||||
.map_or(accept_def, |k| k.to_str().unwrap_or(accept_def));
|
||||
if accept.contains(APP_JSON) || accept.contains(ACCEPT_ALL) {
|
||||
Ok(self.gather(req, ctx, node_config).await?)
|
||||
} else {
|
||||
let ret = response_err(StatusCode::NOT_ACCEPTABLE, format!("Unsupported Accept: {:?}", accept))?;
|
||||
Ok(ret)
|
||||
}
|
||||
}
|
||||
|
||||
async fn gather(
|
||||
&self,
|
||||
req: Request<Body>,
|
||||
_ctx: &ReqCtx,
|
||||
node_config: &NodeConfigCached,
|
||||
) -> Result<Response<Body>, Error> {
|
||||
let self_name = std::any::type_name::<Self>();
|
||||
let (head, _body) = req.into_parts();
|
||||
warn!("TODO PlainEventsQuery needs to take AggKind to do x-binning");
|
||||
let s1 = format!("dummy:{}", head.uri);
|
||||
let url = Url::parse(&s1)?;
|
||||
let evq = PlainEventsQuery::from_url(&url)?;
|
||||
let deadline = Instant::now() + evq.timeout();
|
||||
let pgclient = {
|
||||
// TODO use common connection/pool:
|
||||
info!("--------------- open postgres connection");
|
||||
let pgconf = &node_config.node_config.cluster.database;
|
||||
let u = {
|
||||
let d = &pgconf;
|
||||
format!("postgresql://{}:{}@{}:{}/{}", d.user, d.pass, d.host, d.port, d.name)
|
||||
};
|
||||
let (pgclient, pgconn) = tokio_postgres::connect(&u, tokio_postgres::NoTls).await.err_conv()?;
|
||||
tokio::spawn(pgconn);
|
||||
let pgclient = Arc::new(pgclient);
|
||||
pgclient
|
||||
};
|
||||
let scyco = if let Some(scyco) = &node_config.node_config.cluster.scylla {
|
||||
scyco
|
||||
} else {
|
||||
return Err(Error::with_public_msg(format!("no scylla configured")));
|
||||
};
|
||||
let scy = create_scy_session(scyco).await?;
|
||||
let do_one_before_range = evq.agg_kind().need_expand();
|
||||
let (series, scalar_type, shape) = dbconn::find_series(evq.channel(), pgclient.clone()).await?;
|
||||
let empty_item = empty_events_dyn_2(&scalar_type, &shape, &AggKind::TimeWeightedScalar);
|
||||
let empty_stream =
|
||||
futures_util::stream::once(futures_util::future::ready(Ok(ChannelEvents::Events(empty_item))));
|
||||
let stream2 = make_scylla_stream(
|
||||
&evq,
|
||||
do_one_before_range,
|
||||
series,
|
||||
scalar_type.clone(),
|
||||
shape.clone(),
|
||||
scy,
|
||||
false,
|
||||
)
|
||||
.await?;
|
||||
let mut stream = empty_stream.chain(stream2);
|
||||
let mut coll = None;
|
||||
let mut fut = None;
|
||||
loop {
|
||||
// Alternative way, test what works better:
|
||||
if fut.is_none() {
|
||||
fut = Some(stream.next());
|
||||
}
|
||||
let item = match tokio::time::timeout_at(deadline.into(), fut.as_mut().unwrap()).await {
|
||||
Ok(Some(item)) => {
|
||||
fut.take();
|
||||
item
|
||||
}
|
||||
Ok(None) => {
|
||||
fut.take();
|
||||
break;
|
||||
}
|
||||
Err(_) => {
|
||||
warn!("{self_name} timeout");
|
||||
fut.take();
|
||||
break;
|
||||
}
|
||||
};
|
||||
match item {
|
||||
Ok(k) => match k {
|
||||
ChannelEvents::Events(mut item) => {
|
||||
if coll.is_none() {
|
||||
coll = Some(items_0::collect_s::Collectable::new_collector(item.as_ref()));
|
||||
}
|
||||
let cl = coll.as_mut().unwrap();
|
||||
cl.ingest(item.as_collectable_mut());
|
||||
}
|
||||
ChannelEvents::Status(..) => {}
|
||||
},
|
||||
Err(e) => {
|
||||
return Err(e.into());
|
||||
}
|
||||
}
|
||||
}
|
||||
match coll {
|
||||
Some(mut coll) => {
|
||||
let res = coll.result()?;
|
||||
let res = res.to_json_result()?;
|
||||
let res = res.to_json_bytes()?;
|
||||
let ret = response(StatusCode::OK).body(Body::from(res))?;
|
||||
Ok(ret)
|
||||
}
|
||||
None => {
|
||||
error!("should never happen with changed logic, remove case");
|
||||
err::todo();
|
||||
let item = empty_events_dyn(&scalar_type, &shape, &AggKind::TimeWeightedScalar);
|
||||
let res = item.to_box_to_json_result();
|
||||
let res = res.to_json_result()?;
|
||||
let res = res.to_json_bytes()?;
|
||||
let ret = response(StatusCode::OK).body(Body::from(res))?;
|
||||
Ok(ret)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct BinnedHandlerScylla {}
|
||||
|
||||
impl BinnedHandlerScylla {
|
||||
pub fn handler(req: &Request<Body>) -> Option<Self> {
|
||||
if req.uri().path() == "/api/4/scylla/binned" {
|
||||
Some(Self {})
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn handle(
|
||||
&self,
|
||||
req: Request<Body>,
|
||||
ctx: &ReqCtx,
|
||||
node_config: &NodeConfigCached,
|
||||
) -> Result<Response<Body>, Error> {
|
||||
if req.method() != Method::GET {
|
||||
return Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?);
|
||||
}
|
||||
match self.fetch(req, ctx, node_config).await {
|
||||
Ok(ret) => Ok(ret),
|
||||
Err(e) => {
|
||||
eprintln!("error: {e}");
|
||||
Ok(e.to_public_response())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn fetch(
|
||||
&self,
|
||||
req: Request<Body>,
|
||||
ctx: &ReqCtx,
|
||||
node_config: &NodeConfigCached,
|
||||
) -> Result<Response<Body>, Error> {
|
||||
info!("BinnedHandlerScylla req: {:?}", req);
|
||||
let accept_def = APP_JSON;
|
||||
let accept = req
|
||||
.headers()
|
||||
.get(http::header::ACCEPT)
|
||||
.map_or(accept_def, |k| k.to_str().unwrap_or(accept_def));
|
||||
if accept.contains(APP_JSON) || accept.contains(ACCEPT_ALL) {
|
||||
Ok(self.gather(req, ctx, node_config).await?)
|
||||
} else {
|
||||
let ret = response_err(StatusCode::NOT_ACCEPTABLE, format!("Unsupported Accept: {:?}", accept))?;
|
||||
Ok(ret)
|
||||
}
|
||||
}
|
||||
|
||||
async fn gather(
|
||||
&self,
|
||||
req: Request<Body>,
|
||||
_ctx: &ReqCtx,
|
||||
node_config: &NodeConfigCached,
|
||||
) -> Result<Response<Body>, Error> {
|
||||
let (head, _body) = req.into_parts();
|
||||
warn!("TODO BinnedQuery needs to take AggKind to do x-binngin");
|
||||
let s1 = format!("dummy:{}", head.uri);
|
||||
let url = Url::parse(&s1)?;
|
||||
let evq = BinnedQuery::from_url(&url)?;
|
||||
let do_one_before_range = evq.agg_kind().need_expand();
|
||||
let pgclient = {
|
||||
// TODO use common connection/pool:
|
||||
info!("--------------- open postgres connection");
|
||||
let pgconf = &node_config.node_config.cluster.database;
|
||||
let u = {
|
||||
let d = &pgconf;
|
||||
format!("postgresql://{}:{}@{}:{}/{}", d.user, d.pass, d.host, d.port, d.name)
|
||||
};
|
||||
let (pgclient, pgconn) = tokio_postgres::connect(&u, tokio_postgres::NoTls).await.err_conv()?;
|
||||
tokio::spawn(pgconn);
|
||||
let pgclient = Arc::new(pgclient);
|
||||
pgclient
|
||||
};
|
||||
if let Some(scyco) = &node_config.node_config.cluster.scylla {
|
||||
let scy = create_scy_session(scyco).await?;
|
||||
let covering = BinnedRange::covering_range(evq.range().clone(), evq.bin_count())?;
|
||||
let range = covering.full_range();
|
||||
let mut query2 = PlainEventsQuery::new(
|
||||
evq.channel().clone(),
|
||||
range.clone(),
|
||||
evq.agg_kind().clone(),
|
||||
Duration::from_millis(6000),
|
||||
None,
|
||||
false,
|
||||
);
|
||||
query2.set_timeout(evq.timeout());
|
||||
let query2 = query2;
|
||||
let (series, scalar_type, shape) = dbconn::find_series(evq.channel(), pgclient.clone()).await?;
|
||||
let stream = make_scylla_stream(
|
||||
&query2,
|
||||
do_one_before_range,
|
||||
series,
|
||||
scalar_type.clone(),
|
||||
shape.clone(),
|
||||
scy.clone(),
|
||||
false,
|
||||
)
|
||||
.await?;
|
||||
let query3 = ChannelStateEventsQuery::new(evq.channel().clone(), range.clone());
|
||||
let state_stream = channel_state_events(&query3, scy.clone())
|
||||
.await?
|
||||
.map(|x| {
|
||||
//eprintln!("state_stream {x:?}");
|
||||
x
|
||||
})
|
||||
.map_err(|e| items_2::Error::from(format!("{e}")));
|
||||
// TODO let the stream itself use the items_2 error, do not convert here.
|
||||
let data_stream = stream
|
||||
.map(|x| {
|
||||
//eprintln!("data_stream {x:?}");
|
||||
x
|
||||
})
|
||||
.map_err(|e| items_2::Error::from(format!("{e}")));
|
||||
error!("TODO BinnedHandlerScylla::gather");
|
||||
err::todo();
|
||||
type Items = Pin<Box<dyn Stream<Item = Result<ChannelEvents, items_2::Error>> + Send>>;
|
||||
let _data_stream = Box::pin(data_stream) as Items;
|
||||
let _sate_stream = Box::pin(state_stream) as Items;
|
||||
let merged_stream = ChannelEventsMerger::new(err::todoval());
|
||||
let _ = merged_stream;
|
||||
//let merged_stream = ChannelEventsMerger::new(vec![data_stream, state_stream]);
|
||||
let merged_stream = Box::pin(merged_stream) as Pin<Box<dyn Stream<Item = _> + Send>>;
|
||||
let binned_collected = binned_collected(
|
||||
scalar_type.clone(),
|
||||
shape.clone(),
|
||||
evq.agg_kind().clone(),
|
||||
covering.edges(),
|
||||
evq.timeout(),
|
||||
merged_stream,
|
||||
)
|
||||
.await
|
||||
.map_err(|e| Error::with_msg_no_trace(format!("{e}")))?;
|
||||
let res = binned_collected.to_json_result()?;
|
||||
let res = res.to_json_bytes()?;
|
||||
let ret = response(StatusCode::OK).body(Body::from(res))?;
|
||||
{
|
||||
let _ret = response(StatusCode::OK).body(BodyStream::wrapped(
|
||||
futures_util::stream::iter([Ok(Vec::new())]),
|
||||
format!("gather"),
|
||||
))?;
|
||||
}
|
||||
Ok(ret)
|
||||
} else {
|
||||
return Err(Error::with_public_msg(format!("no scylla configured")));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,9 +3,9 @@ use crate::response;
|
||||
use futures_util::{select, FutureExt};
|
||||
use http::{Method, StatusCode};
|
||||
use hyper::{Body, Client, Request, Response};
|
||||
use hyper_tls::HttpsConnector;
|
||||
use netpod::log::*;
|
||||
use netpod::{Node, NodeConfigCached, APP_JSON};
|
||||
use netpod::APP_JSON;
|
||||
use netpod::{Node, NodeConfigCached};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::Value as JsonValue;
|
||||
use std::future::Future;
|
||||
@@ -175,6 +175,7 @@ pub async fn gather_get_json_generic<SM, NT, FT>(
|
||||
tags: Vec<String>,
|
||||
nt: NT,
|
||||
ft: FT,
|
||||
// TODO use deadline instead
|
||||
timeout: Duration,
|
||||
) -> Result<Response<Body>, Error>
|
||||
where
|
||||
@@ -198,7 +199,6 @@ where
|
||||
.zip(tags.into_iter())
|
||||
.map(move |((url, body), tag)| {
|
||||
let url_str = url.as_str();
|
||||
let is_tls = if url_str.starts_with("https://") { true } else { false };
|
||||
let req = if body.is_some() {
|
||||
Request::builder().method(Method::POST).uri(url_str)
|
||||
} else {
|
||||
@@ -210,7 +210,6 @@ where
|
||||
} else {
|
||||
req
|
||||
};
|
||||
//let req = req.header("x-log-from-node-name", format!("{}", node_config.node_config.name));
|
||||
let body = match body {
|
||||
None => Body::empty(),
|
||||
Some(body) => body,
|
||||
@@ -222,15 +221,8 @@ where
|
||||
Err(Error::with_msg("timeout"))
|
||||
}
|
||||
res = {
|
||||
if is_tls {
|
||||
let https = HttpsConnector::new();
|
||||
let client = Client::builder().build::<_, hyper::Body>(https);
|
||||
client.request(req?).fuse()
|
||||
}
|
||||
else {
|
||||
let client = Client::new();
|
||||
client.request(req?).fuse()
|
||||
}
|
||||
let client = Client::new();
|
||||
client.request(req?).fuse()
|
||||
} => {
|
||||
let ret = nt(tag, res?).await?;
|
||||
Ok(ret)
|
||||
|
||||
@@ -312,13 +312,9 @@ async fn http_service_inner(
|
||||
h.handle(req, &node_config).await
|
||||
} else if let Some(h) = events::EventsHandler::handler(&req) {
|
||||
h.handle(req, &node_config).await
|
||||
} else if let Some(h) = events::EventsHandlerScylla::handler(&req) {
|
||||
h.handle(req, ctx, &node_config).await
|
||||
} else if let Some(h) = events::BinnedHandlerScylla::handler(&req) {
|
||||
h.handle(req, ctx, &node_config).await
|
||||
} else if let Some(h) = channel_status::ConnectionStatusEvents::handler(&req) {
|
||||
h.handle(req, ctx, &node_config).await
|
||||
} else if let Some(h) = channel_status::ChannelConnectionStatusEvents::handler(&req) {
|
||||
} else if let Some(h) = channel_status::ChannelStatusEvents::handler(&req) {
|
||||
h.handle(req, ctx, &node_config).await
|
||||
} else if let Some(h) = api4::binned::BinnedHandler::handler(&req) {
|
||||
h.handle(req, &node_config).await
|
||||
@@ -497,7 +493,7 @@ async fn node_status(
|
||||
let (_head, _body) = req.into_parts();
|
||||
let archiver_appliance_status = match node_config.node.archiver_appliance.as_ref() {
|
||||
Some(k) => {
|
||||
let mut st = vec![];
|
||||
let mut st = Vec::new();
|
||||
for p in &k.data_base_paths {
|
||||
let _m = match tokio::fs::metadata(p).await {
|
||||
Ok(m) => m,
|
||||
@@ -718,7 +714,7 @@ impl StatusBoardEntry {
|
||||
ts_updated: SystemTime::now(),
|
||||
is_error: false,
|
||||
is_ok: false,
|
||||
errors: vec![],
|
||||
errors: Vec::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -808,7 +804,7 @@ impl StatusBoard {
|
||||
match self.entries.get(status_id) {
|
||||
Some(e) => {
|
||||
if e.is_ok {
|
||||
let js = StatJs { errors: vec![] };
|
||||
let js = StatJs { errors: Vec::new() };
|
||||
return serde_json::to_string(&js).unwrap();
|
||||
} else if e.is_error {
|
||||
let errors = e.errors.iter().map(|e| (&e.0).into()).collect();
|
||||
@@ -816,7 +812,7 @@ impl StatusBoard {
|
||||
return serde_json::to_string(&js).unwrap();
|
||||
} else {
|
||||
warn!("requestStatus for unfinished {status_id}");
|
||||
let js = StatJs { errors: vec![] };
|
||||
let js = StatJs { errors: Vec::new() };
|
||||
return serde_json::to_string(&js).unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -9,7 +9,6 @@ use futures_util::{pin_mut, Stream};
|
||||
use http::{Method, StatusCode};
|
||||
use hyper::service::{make_service_fn, service_fn};
|
||||
use hyper::{Body, Request, Response, Server};
|
||||
use hyper_tls::HttpsConnector;
|
||||
use itertools::Itertools;
|
||||
use netpod::log::*;
|
||||
use netpod::query::{BinnedQuery, PlainEventsQuery};
|
||||
@@ -458,14 +457,8 @@ pub async fn proxy_api1_map_pulse(
|
||||
let sh = &g.url;
|
||||
let url = format!("{}/api/1/map/pulse/{}", sh, pulseid);
|
||||
let req = Request::builder().method(Method::GET).uri(url).body(Body::empty())?;
|
||||
let res = if sh.starts_with("https") {
|
||||
let https = HttpsConnector::new();
|
||||
let c = hyper::Client::builder().build(https);
|
||||
c.request(req).await?
|
||||
} else {
|
||||
let c = hyper::Client::new();
|
||||
c.request(req).await?
|
||||
};
|
||||
let c = hyper::Client::new();
|
||||
let res = c.request(req).await?;
|
||||
let ret = response(StatusCode::OK).body(res.into_body())?;
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
@@ -557,7 +557,7 @@ impl MapPulseScyllaHandler {
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn handle(&self, req: Request<Body>, _node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
|
||||
pub async fn handle(&self, req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
|
||||
if req.method() != Method::GET {
|
||||
return Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?);
|
||||
}
|
||||
@@ -565,13 +565,12 @@ impl MapPulseScyllaHandler {
|
||||
let url = url::Url::parse(&urls)?;
|
||||
let query = MapPulseQuery::from_url(&url)?;
|
||||
let pulse = query.pulse;
|
||||
let scy = scylla::SessionBuilder::new()
|
||||
.known_node("sf-daqbuf-34:19042")
|
||||
.default_consistency(scylla::batch::Consistency::One)
|
||||
.use_keyspace("ks1", false)
|
||||
.build()
|
||||
.await
|
||||
.err_conv()?;
|
||||
let scyconf = if let Some(x) = node_config.node_config.cluster.scylla.as_ref() {
|
||||
x
|
||||
} else {
|
||||
return Err(Error::with_public_msg_no_trace("no scylla configured"));
|
||||
};
|
||||
let scy = scyllaconn::create_scy_session(&scyconf).await?;
|
||||
let pulse_a = (pulse >> 14) as i64;
|
||||
let pulse_b = (pulse & 0x3fff) as i32;
|
||||
let res = scy
|
||||
@@ -582,8 +581,9 @@ impl MapPulseScyllaHandler {
|
||||
.await
|
||||
.err_conv()?;
|
||||
let rows = res.rows().err_conv()?;
|
||||
let mut tss = vec![];
|
||||
let mut channels = vec![];
|
||||
let ch = "pulsemaptable";
|
||||
let mut tss = Vec::new();
|
||||
let mut channels = Vec::new();
|
||||
use scylla::frame::response::result::CqlValue;
|
||||
let ts_a_def = CqlValue::BigInt(0);
|
||||
let ts_b_def = CqlValue::Int(0);
|
||||
@@ -591,7 +591,7 @@ impl MapPulseScyllaHandler {
|
||||
let ts_a = row.columns[0].as_ref().unwrap_or(&ts_a_def).as_bigint().unwrap_or(0) as u64;
|
||||
let ts_b = row.columns[1].as_ref().unwrap_or(&ts_b_def).as_int().unwrap_or(0) as u32 as u64;
|
||||
tss.push(ts_a * netpod::timeunits::SEC + ts_b);
|
||||
channels.push("scylla".into());
|
||||
channels.push(ch.into());
|
||||
}
|
||||
let ret = LocalMap { pulse, tss, channels };
|
||||
Ok(response(StatusCode::OK).body(Body::from(serde_json::to_vec(&ret)?))?)
|
||||
|
||||
Reference in New Issue
Block a user