From f82989f5c990cb5918e1ff7708c92354646473bb Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Mon, 28 Feb 2022 13:55:34 +0100 Subject: [PATCH] Factor out channel config handler --- commonio/Cargo.toml | 1 + commonio/src/commonio.rs | 47 +++++++++++++++++--- disk/src/channelexec.rs | 9 ++-- err/Cargo.toml | 12 +++--- err/src/lib.rs | 45 ++++++++++++++++++- httpclient/src/lib.rs | 32 ++++++++++---- httpret/Cargo.toml | 1 + httpret/src/channelconfig.rs | 62 ++++++++++++++++++++++++++ httpret/src/err.rs | 4 +- httpret/src/httpret.rs | 84 +++++++++++++----------------------- httpret/src/pulsemap.rs | 3 +- items_proc/Cargo.toml | 3 ++ items_proc/src/items_proc.rs | 16 +++++++ parse/src/channelconfig.rs | 12 +++++- 14 files changed, 248 insertions(+), 83 deletions(-) create mode 100644 httpret/src/channelconfig.rs diff --git a/commonio/Cargo.toml b/commonio/Cargo.toml index fb22b91..8d573b8 100644 --- a/commonio/Cargo.toml +++ b/commonio/Cargo.toml @@ -25,3 +25,4 @@ err = { path = "../err" } taskrun = { path = "../taskrun" } netpod = { path = "../netpod" } items = { path = "../items" } +items_proc = { path = "../items_proc" } diff --git a/commonio/src/commonio.rs b/commonio/src/commonio.rs index d5cef72..e644ace 100644 --- a/commonio/src/commonio.rs +++ b/commonio/src/commonio.rs @@ -7,8 +7,9 @@ use items::eventsitem::EventsItem; use items::{Sitemty, StatsItem, StreamItem}; use netpod::log::*; use netpod::{DiskStats, OpenStats, ReadExactStats, ReadStats, SeekStats}; +use serde::{Deserialize, Serialize}; use std::fmt; -use std::io::{self, SeekFrom}; +use std::io::{self, ErrorKind, SeekFrom}; use std::path::{Path, PathBuf}; use std::sync::atomic::{AtomicUsize, Ordering}; use std::time::Instant; @@ -18,11 +19,47 @@ use tokio::io::{AsyncReadExt, AsyncSeekExt}; const LOG_IO: bool = true; const STATS_IO: bool = true; -pub async fn tokio_read(path: impl AsRef) -> Result, Error> { +#[derive(Debug, Serialize, Deserialize)] +pub struct CIOError { + kind: ErrorKindSimple, + path: Option, +} + +#[derive(Debug, Serialize, Deserialize)] +pub enum ErrorKindSimple { + NotFound, + PermissionDenied, + AlreadyExists, + Other(String), +} + +impl From for ErrorKindSimple { + fn from(k: ErrorKind) -> Self { + match k { + ErrorKind::NotFound => ErrorKindSimple::NotFound, + ErrorKind::PermissionDenied => ErrorKindSimple::PermissionDenied, + ErrorKind::AlreadyExists => ErrorKindSimple::AlreadyExists, + a => ErrorKindSimple::Other(format!("{a:?}")), + } + } +} + +pub async fn tokio_read(path: impl AsRef) -> Result, CIOError> { let path = path.as_ref(); - tokio::fs::read(path) - .await - .map_err(|e| Error::with_msg_no_trace(format!("Can not open {path:?} {e:?}"))) + tokio::fs::read(path).await.map_err(|e| CIOError { + kind: e.kind().into(), + path: Some(path.into()), + }) +} + +pub async fn tokio_rand() -> Result { + type T = u64; + let mut f = tokio::fs::File::open("/dev/urandom").await?; + let mut buf = [0u8; std::mem::size_of::()]; + f.read_exact(&mut buf[..]).await?; + let y = buf.try_into().map_err(|e| Error::with_msg(format!("{e:?}")))?; + let x = u64::from_le_bytes(y); + Ok(x) } pub struct StatsChannel { diff --git a/disk/src/channelexec.rs b/disk/src/channelexec.rs index 179780d..2646085 100644 --- a/disk/src/channelexec.rs +++ b/disk/src/channelexec.rs @@ -208,9 +208,12 @@ where range: range.clone(), expand: agg_kind.need_expand(), }; - let conf = httpclient::get_channel_config(&q, node_config) - .await - .map_err(|e| e.add_public_msg(format!("Can not find channel config for {}", q.channel.name())))?; + let conf = httpclient::get_channel_config(&q, node_config).await.map_err(|e| { + e.add_public_msg(format!( + "Can not find channel config for channel: {:?}", + q.channel.name() + )) + })?; let ret = channel_exec_config( f, conf.scalar_type.clone(), diff --git a/err/Cargo.toml b/err/Cargo.toml index 023c083..9744a99 100644 --- a/err/Cargo.toml +++ b/err/Cargo.toml @@ -1,16 +1,16 @@ [package] name = "err" -version = "0.0.2" +version = "0.0.3" authors = ["Dominik Werder "] edition = "2021" [dependencies] -backtrace = "0.3.56" +backtrace = "0.3" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" -serde_cbor = "0.11.1" +serde_cbor = "0.11" async-channel = "1.6" -chrono = { version = "0.4.19", features = ["serde"] } +chrono = { version = "0.4", features = ["serde"] } url = "2.2" -regex = "1.5.4" -bincode = "1.3.3" +regex = "1.5" +bincode = "1.3" diff --git a/err/src/lib.rs b/err/src/lib.rs index 9580723..fa6a6ff 100644 --- a/err/src/lib.rs +++ b/err/src/lib.rs @@ -10,7 +10,7 @@ use std::num::{ParseFloatError, ParseIntError}; use std::string::FromUtf8Error; use std::sync::PoisonError; -#[derive(Clone, Serialize, Deserialize)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub enum Reason { InternalError, BadRequest, @@ -77,6 +77,11 @@ impl Error { self } + pub fn mark_io_error(mut self) -> Self { + self.reason = Some(Reason::IoError); + self + } + pub fn add_public_msg(mut self, msg: impl Into) -> Self { if self.public_msg.is_none() { self.public_msg = Some(vec![]); @@ -96,6 +101,16 @@ impl Error { pub fn reason(&self) -> Option { self.reason.clone() } + + pub fn to_public_error(&self) -> PublicError { + PublicError { + reason: self.reason(), + msg: self + .public_msg() + .map(|k| k.join("\n")) + .unwrap_or("No error message".into()), + } + } } fn fmt_backtrace(trace: &backtrace::Backtrace) -> String { @@ -195,6 +210,18 @@ where } } +impl From for Error { + fn from(k: PublicError) -> Self { + Self { + msg: String::new(), + trace: None, + trace_str: None, + public_msg: Some(vec![k.msg().into()]), + reason: k.reason(), + } + } +} + impl From for Error { fn from(k: String) -> Self { Self::with_msg(k) @@ -323,6 +350,22 @@ impl From for Error { } } +#[derive(Debug, Serialize, Deserialize)] +pub struct PublicError { + reason: Option, + msg: String, +} + +impl PublicError { + pub fn reason(&self) -> Option { + self.reason.clone() + } + + pub fn msg(&self) -> &str { + &self.msg + } +} + pub fn todo() { todo!("TODO"); } diff --git a/httpclient/src/lib.rs b/httpclient/src/lib.rs index e365180..758ce53 100644 --- a/httpclient/src/lib.rs +++ b/httpclient/src/lib.rs @@ -1,4 +1,4 @@ -use err::Error; +use err::{Error, PublicError}; use hyper::{Body, Method}; use netpod::{AppendToUrl, ChannelConfigQuery, ChannelConfigResponse, NodeConfigCached}; use url::Url; @@ -18,13 +18,27 @@ pub async fn get_channel_config( .body(Body::empty()) .map_err(Error::from_string)?; let client = hyper::Client::new(); - let res = client.request(req).await.map_err(Error::from_string)?; - if !res.status().is_success() { - return Err(Error::with_msg("http client error")); - } - let buf = hyper::body::to_bytes(res.into_body()) + let res = client + .request(req) .await - .map_err(Error::from_string)?; - let ret: ChannelConfigResponse = serde_json::from_slice(&buf)?; - Ok(ret) + .map_err(|e| Error::with_msg(format!("get_channel_config request error: {e:?}")))?; + if res.status().is_success() { + let buf = hyper::body::to_bytes(res.into_body()) + .await + .map_err(|e| Error::with_msg(format!("can not read response: {e:?}")))?; + let ret: ChannelConfigResponse = serde_json::from_slice(&buf) + .map_err(|e| Error::with_msg(format!("can not parse the channel config response json: {e:?}")))?; + Ok(ret) + } else { + let buf = hyper::body::to_bytes(res.into_body()) + .await + .map_err(|e| Error::with_msg(format!("can not read response: {e:?}")))?; + match serde_json::from_slice::(&buf) { + Ok(e) => Err(e.into()), + Err(_) => Err(Error::with_msg(format!( + "can not parse the http error body: {:?}", + String::from_utf8_lossy(&buf) + ))), + } + } } diff --git a/httpret/Cargo.toml b/httpret/Cargo.toml index 0136a5c..2a7f015 100644 --- a/httpret/Cargo.toml +++ b/httpret/Cargo.toml @@ -31,4 +31,5 @@ parse = { path = "../parse" } netfetch = { path = "../netfetch" } archapp_wrap = { path = "../archapp_wrap" } nodenet = { path = "../nodenet" } +commonio = { path = "../commonio" } taskrun = { path = "../taskrun" } diff --git a/httpret/src/channelconfig.rs b/httpret/src/channelconfig.rs new file mode 100644 index 0000000..65aeeeb --- /dev/null +++ b/httpret/src/channelconfig.rs @@ -0,0 +1,62 @@ +use crate::err::Error; +use crate::{response, ToPublicResponse}; +use http::{Method, Request, Response, StatusCode}; +use hyper::Body; +use netpod::log::*; +use netpod::NodeConfigCached; +use netpod::{ChannelConfigQuery, FromUrl}; +use netpod::{ACCEPT_ALL, APP_JSON}; +use url::Url; + +pub struct ChannelConfigHandler {} + +impl ChannelConfigHandler { + pub fn handler(req: &Request) -> Option { + if req.uri().path() == "/api/4/channel/config" { + Some(Self {}) + } else { + None + } + } + + pub async fn handle(&self, req: Request, node_config: &NodeConfigCached) -> Result, Error> { + if req.method() == Method::GET { + let accept_def = APP_JSON; + let accept = req + .headers() + .get(http::header::ACCEPT) + .map_or(accept_def, |k| k.to_str().unwrap_or(accept_def)); + if accept == APP_JSON || accept == ACCEPT_ALL { + match channel_config(req, &node_config).await { + Ok(k) => Ok(k), + Err(e) => { + warn!("ChannelConfigHandler::handle: got error from channel_config: {e:?}"); + Ok(e.to_public_response()) + } + } + } else { + Ok(response(StatusCode::BAD_REQUEST).body(Body::empty())?) + } + } else { + Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) + } + } +} + +pub async fn channel_config(req: Request, node_config: &NodeConfigCached) -> Result, Error> { + let url = Url::parse(&format!("dummy:{}", req.uri()))?; + //let pairs = get_url_query_pairs(&url); + let q = ChannelConfigQuery::from_url(&url)?; + let conf = if let Some(conf) = &node_config.node.channel_archiver { + archapp_wrap::archapp::archeng::channel_config_from_db(&q, conf, &node_config.node_config.cluster.database) + .await? + } else if let Some(conf) = &node_config.node.archiver_appliance { + archapp_wrap::channel_config(&q, conf).await? + } else { + parse::channelconfig::channel_config(&q, &node_config.node).await? + }; + let ret = response(StatusCode::OK) + .header(http::header::CONTENT_TYPE, APP_JSON) + .body(Body::from(serde_json::to_string(&conf)?))?; + Ok(ret) +} diff --git a/httpret/src/err.rs b/httpret/src/err.rs index b8b726e..5bf532e 100644 --- a/httpret/src/err.rs +++ b/httpret/src/err.rs @@ -1,6 +1,8 @@ +use serde::{Deserialize, Serialize}; use std::fmt; -pub struct Error(::err::Error); +#[derive(Serialize, Deserialize)] +pub struct Error(pub ::err::Error); impl Error { pub fn with_msg>(s: S) -> Self { diff --git a/httpret/src/httpret.rs b/httpret/src/httpret.rs index fd5138b..0b54a7f 100644 --- a/httpret/src/httpret.rs +++ b/httpret/src/httpret.rs @@ -1,5 +1,6 @@ pub mod api1; pub mod channelarchiver; +pub mod channelconfig; pub mod err; pub mod events; pub mod evinfo; @@ -21,14 +22,12 @@ use hyper::server::conn::AddrStream; use hyper::service::{make_service_fn, service_fn}; use hyper::{server::Server, Body, Request, Response}; use net::SocketAddr; +use netpod::log::*; use netpod::query::BinnedQuery; use netpod::timeunits::SEC; -use netpod::{ - channel_from_pairs, get_url_query_pairs, ChannelConfigQuery, FromUrl, NodeConfigCached, NodeStatus, - NodeStatusArchiverAppliance, -}; -use netpod::{log::*, ACCEPT_ALL}; -use netpod::{APP_JSON, APP_JSON_LINES, APP_OCTET}; +use netpod::{channel_from_pairs, get_url_query_pairs}; +use netpod::{FromUrl, NodeConfigCached, NodeStatus, NodeStatusArchiverAppliance}; +use netpod::{ACCEPT_ALL, APP_JSON, APP_JSON_LINES, APP_OCTET}; use nodenet::conn::events_service; use panic::{AssertUnwindSafe, UnwindSafe}; use pin::Pin; @@ -119,6 +118,7 @@ where impl UnwindSafe for Cont {} +// TODO remove because I want error bodies to be json. pub fn response_err(status: StatusCode, msg: T) -> Result, Error> where T: AsRef, @@ -205,6 +205,8 @@ async fn http_service_try(req: Request, node_config: &NodeConfigCached) -> } else { Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) } + } else if let Some(h) = channelconfig::ChannelConfigHandler::handler(&req) { + h.handle(req, &node_config).await } else if let Some(h) = events::EventsHandler::handler(&req) { h.handle(req, &node_config).await } else if path == "/api/4/binned" { @@ -285,15 +287,6 @@ async fn http_service_try(req: Request, node_config: &NodeConfigCached) -> } else { Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) } - } else if path == "/api/4/channel/config" { - if req.method() == Method::GET { - match channel_config(req, &node_config).await { - Ok(k) => Ok(k), - Err(e) => Ok(e.to_public_response()), - } - } else { - Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) - } } else if path == "/api/1/query" { if req.method() == Method::POST { Ok(api1::api1_binary_events(req, &node_config).await?) @@ -454,36 +447,35 @@ trait ToPublicResponse { impl ToPublicResponse for Error { fn to_public_response(&self) -> Response { - error!("ToPublicResponse converts: {self:?}"); - use std::fmt::Write; - let status = match self.reason() { - Some(::err::Reason::BadRequest) => StatusCode::BAD_REQUEST, - Some(::err::Reason::InternalError) => StatusCode::INTERNAL_SERVER_ERROR, - _ => StatusCode::INTERNAL_SERVER_ERROR, - }; - let mut msg = match self.public_msg() { - Some(v) => v.join("\n"), - _ => String::new(), - }; - write!(msg, "\n\nhttps://data-api.psi.ch/api/4/documentation\n").unwrap(); - response(status).body(Body::from(msg)).unwrap() + self.0.to_public_response() } } impl ToPublicResponse for ::err::Error { fn to_public_response(&self) -> Response { - use std::fmt::Write; - let status = match self.reason() { - Some(::err::Reason::BadRequest) => StatusCode::BAD_REQUEST, - Some(::err::Reason::InternalError) => StatusCode::INTERNAL_SERVER_ERROR, + use ::err::Reason; + let e = self.to_public_error(); + let status = match e.reason() { + Some(Reason::BadRequest) => StatusCode::BAD_REQUEST, + Some(Reason::InternalError) => StatusCode::INTERNAL_SERVER_ERROR, _ => StatusCode::INTERNAL_SERVER_ERROR, }; - let mut msg = match self.public_msg() { - Some(v) => v.join("\n"), - _ => String::new(), + let msg = match serde_json::to_string(&e) { + Ok(s) => s, + Err(_) => "can not serialize error".into(), }; - write!(msg, "\n\nhttps://data-api.psi.ch/api/4/documentation\n").unwrap(); - response(status).body(Body::from(msg)).unwrap() + match response(status) + .header(http::header::ACCEPT, APP_JSON) + .body(Body::from(msg)) + { + Ok(res) => res, + Err(e) => { + error!("can not generate http error response {e:?}"); + let mut res = Response::new(Body::default()); + *res.status_mut() = StatusCode::INTERNAL_SERVER_ERROR; + res + } + } } } @@ -739,24 +731,6 @@ pub async fn update_search_cache(req: Request, node_config: &NodeConfigCac Ok(ret) } -pub async fn channel_config(req: Request, node_config: &NodeConfigCached) -> Result, Error> { - let url = Url::parse(&format!("dummy:{}", req.uri()))?; - //let pairs = get_url_query_pairs(&url); - let q = ChannelConfigQuery::from_url(&url)?; - let conf = if let Some(conf) = &node_config.node.channel_archiver { - archapp_wrap::archapp::archeng::channel_config_from_db(&q, conf, &node_config.node_config.cluster.database) - .await? - } else if let Some(conf) = &node_config.node.archiver_appliance { - archapp_wrap::channel_config(&q, conf).await? - } else { - parse::channelconfig::channel_config(&q, &node_config.node).await? - }; - let ret = response(StatusCode::OK) - .header(http::header::CONTENT_TYPE, APP_JSON) - .body(Body::from(serde_json::to_string(&conf)?))?; - Ok(ret) -} - pub async fn ca_connect_1(req: Request, node_config: &NodeConfigCached) -> Result, Error> { let url = Url::parse(&format!("dummy:{}", req.uri()))?; let pairs = get_url_query_pairs(&url); diff --git a/httpret/src/pulsemap.rs b/httpret/src/pulsemap.rs index 362043b..f0f4b11 100644 --- a/httpret/src/pulsemap.rs +++ b/httpret/src/pulsemap.rs @@ -326,12 +326,11 @@ async fn update_task(do_abort: Arc, node_config: NodeConfigCached) info!("update_task break A"); break; } - tokio::time::sleep(Duration::from_millis(60000)).await; + tokio::time::sleep(Duration::from_millis(165000 + 0x7fff * commonio::tokio_rand().await?)).await; if do_abort.load(Ordering::SeqCst) != 0 { info!("update_task break B"); break; } - info!("Start update task"); let ts1 = Instant::now(); match IndexFullHttpFunction::index(&node_config).await { Ok(_) => {} diff --git a/items_proc/Cargo.toml b/items_proc/Cargo.toml index 1bfb180..25e13e7 100644 --- a/items_proc/Cargo.toml +++ b/items_proc/Cargo.toml @@ -7,3 +7,6 @@ edition = "2021" [lib] path = "src/items_proc.rs" proc-macro = true + +[dependencies] +syn = "1" diff --git a/items_proc/src/items_proc.rs b/items_proc/src/items_proc.rs index 2ce8331..6ebab87 100644 --- a/items_proc/src/items_proc.rs +++ b/items_proc/src/items_proc.rs @@ -80,3 +80,19 @@ pub fn enumvars(ts: TokenStream) -> TokenStream { //panic!("GENERATED: {}", gen); gen.parse().unwrap() } + +#[proc_macro] +pub fn enumvariants(ts: TokenStream) -> TokenStream { + //panic!("yoooo"); + //syn::parse_macro_input!(ts as syn::DeriveInput); + //let tokens: Vec<_> = ts.into_iter().collect(); + //let parsed: syn::DeriveInput = syn::parse_macro_input!(ts as syn::DeriveInput); + //let s = ts.to_string(); + let parsed = syn::parse::(ts); + //panic!("{:?}", parsed); + match parsed { + Ok(_ast) => {} + Err(e) => panic!("Parse error {e:?}"), + } + TokenStream::new() +} diff --git a/parse/src/channelconfig.rs b/parse/src/channelconfig.rs index db50e06..4f23034 100644 --- a/parse/src/channelconfig.rs +++ b/parse/src/channelconfig.rs @@ -321,10 +321,20 @@ pub async fn read_local_config(channel: Channel, node: Node) -> Result k, Err(e) => match e.kind() { - ErrorKind::NotFound => return Err(Error::with_msg(format!("ErrorKind::NotFound for {:?}", path))), + ErrorKind::NotFound => { + return Err(Error::with_public_msg(format!( + "databuffer channel config file not found for channel {channel:?} at {path:?}" + ))) + } + ErrorKind::PermissionDenied => { + return Err(Error::with_public_msg(format!( + "databuffer channel config file permission denied for channel {channel:?} at {path:?}" + ))) + } _ => return Err(e.into()), }, };