use crate::gather::gather_get_json;
use bytes::Bytes;
use disk::binned::prebinned::pre_binned_bytes_for_http;
use disk::binned::query::{BinnedQuery, PreBinnedQuery};
use disk::events::{PlainEventsBinaryQuery, PlainEventsJsonQuery};
use disk::raw::conn::events_service;
use err::Error;
use future::Future;
use futures_core::Stream;
use futures_util::{FutureExt, StreamExt};
use http::{HeaderMap, Method, StatusCode};
use hyper::service::{make_service_fn, service_fn};
use hyper::{server::Server, Body, Request, Response};
use net::SocketAddr;
use netpod::log::*;
use netpod::{get_url_query_pairs, AggKind, Channel, FromUrl, NodeConfigCached, APP_JSON, APP_JSON_LINES, APP_OCTET};
use panic::{AssertUnwindSafe, UnwindSafe};
use pin::Pin;
use serde::{Deserialize, Serialize};
use std::{future, net, panic, pin, task};
use task::{Context, Poll};
use tracing::field::Empty;
use tracing::Instrument;
use url::Url;
pub mod api1;
pub mod gather;
pub mod proxy;
pub mod search;
fn proxy_mark() -> &'static str {
"7c5e408a"
}
pub async fn host(node_config: NodeConfigCached) -> Result<(), Error> {
let rawjh = taskrun::spawn(events_service(node_config.clone()));
use std::str::FromStr;
let addr = SocketAddr::from_str(&format!("{}:{}", node_config.node.listen, node_config.node.port))?;
let make_service = make_service_fn({
move |_conn| {
let node_config = node_config.clone();
async move {
Ok::<_, Error>(service_fn({
move |req| {
let f = http_service(req, node_config.clone());
Cont { f: Box::pin(f) }
}
}))
}
}
});
Server::bind(&addr).serve(make_service).await?;
rawjh.await??;
Ok(())
}
async fn http_service(req: Request
, node_config: NodeConfigCached) -> Result, Error> {
match http_service_try(req, &node_config).await {
Ok(k) => Ok(k),
Err(e) => {
error!("daqbuffer node http_service sees error: {:?}", e);
Err(e)
}
}
}
struct Cont {
f: Pin>,
}
impl Future for Cont
where
F: Future>,
{
type Output = ::Output;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll {
let h = std::panic::catch_unwind(AssertUnwindSafe(|| self.f.poll_unpin(cx)));
match h {
Ok(k) => k,
Err(e) => {
error!("Cont catch_unwind {:?}", e);
match e.downcast_ref::() {
Some(e) => {
error!("Cont catch_unwind is Error: {:?}", e);
}
None => {}
}
Poll::Ready(Err(Error::from(format!("{:?}", e))))
}
}
}
}
impl UnwindSafe for Cont {}
macro_rules! static_http {
($path:expr, $tgt:expr, $tgtex:expr, $ctype:expr) => {
if $path == concat!("/api/4/documentation/", $tgt) {
let c = include_bytes!(concat!("../static/documentation/", $tgtex));
let ret = response(StatusCode::OK)
.header("content-type", $ctype)
.body(Body::from(&c[..]))?;
return Ok(ret);
}
};
($path:expr, $tgt:expr, $ctype:expr) => {
if $path == concat!("/api/4/documentation/", $tgt) {
let c = include_bytes!(concat!("../static/documentation/", $tgt));
let ret = response(StatusCode::OK)
.header("content-type", $ctype)
.body(Body::from(&c[..]))?;
return Ok(ret);
}
};
}
macro_rules! static_http_api1 {
($path:expr, $tgt:expr, $tgtex:expr, $ctype:expr) => {
if $path == concat!("/api/1/documentation/", $tgt) {
let c = include_bytes!(concat!("../static/documentation/", $tgtex));
let ret = response(StatusCode::OK)
.header("content-type", $ctype)
.body(Body::from(&c[..]))?;
return Ok(ret);
}
};
($path:expr, $tgt:expr, $ctype:expr) => {
if $path == concat!("/api/1/documentation/", $tgt) {
let c = include_bytes!(concat!("../static/documentation/", $tgt));
let ret = response(StatusCode::OK)
.header("content-type", $ctype)
.body(Body::from(&c[..]))?;
return Ok(ret);
}
};
}
async fn http_service_try(req: Request, node_config: &NodeConfigCached) -> Result, Error> {
let uri = req.uri().clone();
let path = uri.path();
if path == "/api/4/node_status" {
if req.method() == Method::GET {
Ok(node_status(req, &node_config).await?)
} else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
}
} else if path == "/api/4/search/channel" {
if req.method() == Method::GET {
Ok(search::channel_search(req, &node_config).await?)
} else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
}
} else if path == "/api/4/events" {
if req.method() == Method::GET {
Ok(plain_events(req, &node_config).await?)
} else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
}
} else if path == "/api/4/binned" {
if req.method() == Method::GET {
Ok(binned(req, node_config).await?)
} else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
}
} else if path == "/api/4/prebinned" {
if req.method() == Method::GET {
Ok(prebinned(req, &node_config).await?)
} else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
}
} else if path == "/api/4/table_sizes" {
if req.method() == Method::GET {
Ok(table_sizes(req, &node_config).await?)
} else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
}
} else if path == "/api/4/random_channel" {
if req.method() == Method::GET {
Ok(random_channel(req, &node_config).await?)
} else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
}
} else if path.starts_with("/api/4/gather/") {
if req.method() == Method::GET {
Ok(gather_get_json(req, &node_config).await?)
} else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
}
} else if path == "/api/4/clear_cache" {
if req.method() == Method::GET {
Ok(clear_cache_all(req, &node_config).await?)
} else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
}
} else if path == "/api/4/update_db_with_channel_names" {
if req.method() == Method::GET {
Ok(update_db_with_channel_names(req, &node_config).await?)
} else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
}
} else if path == "/api/4/update_db_with_all_channel_configs" {
if req.method() == Method::GET {
Ok(update_db_with_all_channel_configs(req, &node_config).await?)
} else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
}
} else if path == "/api/4/update_search_cache" {
if req.method() == Method::GET {
Ok(update_search_cache(req, &node_config).await?)
} else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
}
} else if path == "/api/4/ca_connect_1" {
if req.method() == Method::GET {
Ok(ca_connect_1(req, &node_config).await?)
} else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
}
} else if path == "/api/4/channel/config" {
if req.method() == Method::GET {
Ok(channel_config(req, &node_config).await?)
} else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
}
} else if path.starts_with("/api/1/documentation/") {
if req.method() == Method::GET {
static_http_api1!(path, "", "api1.html", "text/html");
static_http_api1!(path, "style.css", "text/css");
static_http_api1!(path, "script.js", "text/javascript");
Ok(response(StatusCode::NOT_FOUND).body(Body::empty())?)
} else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
}
} else if path.starts_with("/api/4/documentation/") {
if req.method() == Method::GET {
static_http!(path, "", "api4.html", "text/html");
static_http!(path, "style.css", "text/css");
static_http!(path, "script.js", "text/javascript");
static_http!(path, "status-main.html", "text/html");
Ok(response(StatusCode::NOT_FOUND).body(Body::empty())?)
} else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
}
} else {
Ok(response(StatusCode::NOT_FOUND).body(Body::from(format!(
"Sorry, not found: {:?} {:?} {:?}",
req.method(),
req.uri().path(),
req.uri().query(),
)))?)
}
}
fn response(status: T) -> http::response::Builder
where
http::StatusCode: std::convert::TryFrom,
>::Error: Into,
{
Response::builder()
.status(status)
.header("Access-Control-Allow-Origin", "*")
.header("Access-Control-Allow-Headers", "*")
.header("x-proxy-log-mark", proxy_mark())
}
struct BodyStreamWrap(netpod::BodyStream);
impl hyper::body::HttpBody for BodyStreamWrap {
type Data = bytes::Bytes;
type Error = Error;
fn poll_data(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll>> {
self.0.inner.poll_next_unpin(cx)
}
fn poll_trailers(self: Pin<&mut Self>, _cx: &mut Context) -> Poll, Self::Error>> {
Poll::Ready(Ok(None))
}
}
struct BodyStream {
inp: S,
desc: String,
}
impl BodyStream
where
S: Stream- > + Unpin + Send + 'static,
I: Into
+ Sized + 'static,
{
pub fn new(inp: S, desc: String) -> Self {
Self { inp, desc }
}
pub fn wrapped(inp: S, desc: String) -> Body {
Body::wrap_stream(Self::new(inp, desc))
}
}
impl Stream for BodyStream
where
S: Stream- > + Unpin,
I: Into
+ Sized,
{
type Item = Result;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> {
let span1 = span!(Level::INFO, "httpret::BodyStream", desc = Empty);
span1.record("desc", &self.desc.as_str());
span1.in_scope(|| {
use Poll::*;
let t = std::panic::catch_unwind(AssertUnwindSafe(|| self.inp.poll_next_unpin(cx)));
match t {
Ok(r) => match r {
Ready(Some(Ok(k))) => Ready(Some(Ok(k))),
Ready(Some(Err(e))) => {
error!("body stream error: {:?}", e);
Ready(Some(Err(e.into())))
}
Ready(None) => Ready(None),
Pending => Pending,
},
Err(e) => {
error!("panic caught in httpret::BodyStream: {:?}", e);
let e = Error::with_msg(format!("panic caught in httpret::BodyStream: {:?}", e));
Ready(Some(Err(e)))
}
}
})
}
}
async fn binned(req: Request, node_config: &NodeConfigCached) -> Result, Error> {
let (head, _body) = req.into_parts();
let url = Url::parse(&format!("dummy:{}", head.uri))?;
let query = BinnedQuery::from_url(&url)?;
match head.headers.get(http::header::ACCEPT) {
Some(v) if v == APP_OCTET => binned_binary(query, node_config).await,
Some(v) if v == APP_JSON => binned_json(query, node_config).await,
_ => Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?),
}
}
async fn binned_binary(query: BinnedQuery, node_config: &NodeConfigCached) -> Result, Error> {
let ret = match disk::binned::binned_bytes_for_http(&query, node_config).await {
Ok(s) => response(StatusCode::OK).body(BodyStream::wrapped(s, format!("binned_binary")))?,
Err(e) => {
if query.report_error() {
response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::from(format!("{:?}", e)))?
} else {
error!("fn binned_binary: {:?}", e);
response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty())?
}
}
};
Ok(ret)
}
async fn binned_json(query: BinnedQuery, node_config: &NodeConfigCached) -> Result, Error> {
let ret = match disk::binned::binned_json(&query, node_config).await {
Ok(s) => response(StatusCode::OK).body(BodyStream::wrapped(s, format!("binned_json")))?,
Err(e) => {
if query.report_error() {
response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::from(format!("{:?}", e)))?
} else {
error!("fn binned_json: {:?}", e);
response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty())?
}
}
};
Ok(ret)
}
async fn prebinned(req: Request, node_config: &NodeConfigCached) -> Result, Error> {
let (head, _body) = req.into_parts();
let query = PreBinnedQuery::from_request(&head)?;
let desc = format!("pre-b-{}", query.patch().bin_t_len() / 1000000000);
let span1 = span!(Level::INFO, "httpret::prebinned_DISABLED", desc = &desc.as_str());
//span1.in_scope(|| {});
let fut = pre_binned_bytes_for_http(node_config, &query).instrument(span1);
let ret = match fut.await {
Ok(s) => response(StatusCode::OK).body(BodyStream::wrapped(
s,
format!(
"pre-b-{}-p-{}",
query.patch().bin_t_len() / 1000000000,
query.patch().patch_beg() / 1000000000,
),
))?,
Err(e) => {
if query.report_error() {
response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::from(format!("{:?}", e)))?
} else {
error!("fn prebinned: {:?}", e);
response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty())?
}
}
};
Ok(ret)
}
async fn plain_events(req: Request, node_config: &NodeConfigCached) -> Result, Error> {
info!("httpret plain_events headers: {:?}", req.headers());
let accept_def = "";
let accept = req
.headers()
.get(http::header::ACCEPT)
.map_or(accept_def, |k| k.to_str().unwrap_or(accept_def));
if accept == APP_JSON {
Ok(plain_events_json(req, node_config).await?)
} else if accept == APP_OCTET {
Ok(plain_events_binary(req, node_config).await?)
} else {
Err(Error::with_msg(format!("unexpected Accept: {:?}", accept)))
}
}
async fn plain_events_binary(req: Request, node_config: &NodeConfigCached) -> Result, Error> {
let url = Url::parse(&format!("dummy:{}", req.uri()))?;
let query = PlainEventsBinaryQuery::from_url(&url)?;
let op = disk::channelexec::PlainEvents::new(query.channel().clone(), query.range().clone(), node_config.clone());
let s = disk::channelexec::channel_exec(op, query.channel(), query.range(), AggKind::Plain, node_config).await?;
let s = s.map(|item| item.make_frame());
let ret = response(StatusCode::OK).body(BodyStream::wrapped(s, format!("plain_events")))?;
Ok(ret)
}
async fn plain_events_json(req: Request, node_config: &NodeConfigCached) -> Result, Error> {
let (head, _body) = req.into_parts();
let query = PlainEventsJsonQuery::from_request_head(&head)?;
let op = disk::channelexec::PlainEventsJson::new(
query.channel().clone(),
query.range().clone(),
query.timeout(),
node_config.clone(),
);
let s = disk::channelexec::channel_exec(op, query.channel(), query.range(), AggKind::Plain, node_config).await?;
let ret = response(StatusCode::OK).body(BodyStream::wrapped(s, format!("plain_events")))?;
Ok(ret)
}
#[derive(Debug, Serialize, Deserialize)]
pub struct NodeStatus {
database_size: u64,
}
async fn node_status(req: Request, node_config: &NodeConfigCached) -> Result, Error> {
let (_head, _body) = req.into_parts();
let ret = NodeStatus {
database_size: dbconn::database_size(node_config).await?,
};
let ret = serde_json::to_vec(&ret)?;
let ret = response(StatusCode::OK).body(Body::from(ret))?;
Ok(ret)
}
async fn table_sizes(req: Request, node_config: &NodeConfigCached) -> Result, Error> {
let (_head, _body) = req.into_parts();
let sizes = dbconn::table_sizes(node_config).await?;
let mut ret = String::new();
for size in sizes.sizes {
use std::fmt::Write;
write!(ret, "{:60} {:20}\n", size.0, size.1)?;
}
let ret = response(StatusCode::OK).body(Body::from(ret))?;
Ok(ret)
}
pub async fn random_channel(req: Request, node_config: &NodeConfigCached) -> Result, Error> {
let (_head, _body) = req.into_parts();
let ret = dbconn::random_channel(node_config).await?;
let ret = response(StatusCode::OK).body(Body::from(ret))?;
Ok(ret)
}
pub async fn clear_cache_all(req: Request, node_config: &NodeConfigCached) -> Result, Error> {
let (head, _body) = req.into_parts();
let dry = match head.uri.query() {
Some(q) => q.contains("dry"),
None => false,
};
let res = disk::cache::clear_cache_all(node_config, dry).await?;
let ret = response(StatusCode::OK)
.header(http::header::CONTENT_TYPE, APP_JSON)
.body(Body::from(serde_json::to_string(&res)?))?;
Ok(ret)
}
pub async fn update_db_with_channel_names(
req: Request,
node_config: &NodeConfigCached,
) -> Result, Error> {
let (head, _body) = req.into_parts();
let _dry = match head.uri.query() {
Some(q) => q.contains("dry"),
None => false,
};
let res =
dbconn::scan::update_db_with_channel_names(node_config.clone(), &node_config.node_config.cluster.database)
.await?;
let ret = response(StatusCode::OK)
.header(http::header::CONTENT_TYPE, APP_JSON_LINES)
.body(Body::wrap_stream(res.map(|k| match serde_json::to_string(&k) {
Ok(mut item) => {
item.push('\n');
Ok(item)
}
Err(e) => Err(e),
})))?;
Ok(ret)
}
pub async fn update_db_with_channel_names_3(
req: Request,
node_config: &NodeConfigCached,
) -> Result, Error> {
let (head, _body) = req.into_parts();
let _dry = match head.uri.query() {
Some(q) => q.contains("dry"),
None => false,
};
let res = dbconn::scan::update_db_with_channel_names_3(node_config);
let ret = response(StatusCode::OK)
.header(http::header::CONTENT_TYPE, APP_JSON_LINES)
.body(Body::wrap_stream(res.map(|k| match serde_json::to_string(&k) {
Ok(mut item) => {
item.push('\n');
Ok(item)
}
Err(e) => Err(e),
})))?;
Ok(ret)
}
pub async fn update_db_with_all_channel_configs(
req: Request,
node_config: &NodeConfigCached,
) -> Result, Error> {
let (head, _body) = req.into_parts();
let _dry = match head.uri.query() {
Some(q) => q.contains("dry"),
None => false,
};
let res = dbconn::scan::update_db_with_all_channel_configs(node_config.clone()).await?;
let ret = response(StatusCode::OK)
.header(http::header::CONTENT_TYPE, APP_JSON_LINES)
.body(Body::wrap_stream(res.map(|k| match serde_json::to_string(&k) {
Ok(mut item) => {
item.push('\n');
Ok(item)
}
Err(e) => Err(e),
})))?;
Ok(ret)
}
pub async fn update_search_cache(req: Request, node_config: &NodeConfigCached) -> Result, Error> {
let (head, _body) = req.into_parts();
let _dry = match head.uri.query() {
Some(q) => q.contains("dry"),
None => false,
};
let res = dbconn::scan::update_search_cache(node_config).await?;
let ret = response(StatusCode::OK)
.header(http::header::CONTENT_TYPE, APP_JSON)
.body(Body::from(serde_json::to_string(&res)?))?;
Ok(ret)
}
pub async fn channel_config(req: Request, node_config: &NodeConfigCached) -> Result, Error> {
let url = Url::parse(&format!("dummy:{}", req.uri()))?;
let pairs = get_url_query_pairs(&url);
let _dry = pairs.contains_key("dry");
let channel = Channel {
backend: node_config.node.backend.clone(),
name: pairs.get("channelName").unwrap().into(),
};
let res = parse::channelconfig::read_local_config(&channel, &node_config.node).await?;
let ret = response(StatusCode::OK)
.header(http::header::CONTENT_TYPE, APP_JSON)
.body(Body::from(serde_json::to_string(&res)?))?;
Ok(ret)
}
pub async fn ca_connect_1(req: Request, node_config: &NodeConfigCached) -> Result, Error> {
let url = Url::parse(&format!("dummy:{}", req.uri()))?;
let pairs = get_url_query_pairs(&url);
let addr = pairs.get("addr").unwrap().into();
let res = netfetch::ca_connect_1(addr, node_config).await?;
let ret = response(StatusCode::OK)
.header(http::header::CONTENT_TYPE, APP_JSON_LINES)
.body(Body::wrap_stream(res.map(|k| match serde_json::to_string(&k) {
Ok(mut item) => {
item.push('\n');
Ok(item)
}
Err(e) => Err(e),
})))?;
Ok(ret)
}