From 3c64eafd148240b65c912bc567caef74f0d265d7 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Wed, 8 Dec 2021 13:20:07 +0100 Subject: [PATCH] Factor usage of common error type more --- Cargo.toml | 12 ++-- archapp/src/archeng.rs | 22 +++++-- archapp/src/archeng/blockrefstream.rs | 2 +- archapp/src/archeng/configs.rs | 10 +-- archapp/src/archeng/datablock.rs | 12 ++++ archapp/src/archeng/indexfiles.rs | 56 +++++++++------- archapp/src/archeng/indextree.rs | 12 ++-- archapp/src/err.rs | 49 ++++++++++++++ archapp/src/events.rs | 4 +- archapp/src/lib.rs | 6 +- archapp/src/parse.rs | 14 ++-- archapp/src/timed.rs | 3 +- commonio/src/commonio.rs | 4 +- daqbuffer/Cargo.toml | 2 +- daqbuffer/src/bin/daqbuffer.rs | 10 +-- daqbuffer/src/cli.rs | 22 +++---- daqbuffer/src/err.rs | 17 +++++ daqbuffer/src/lib.rs | 1 + daqbufp2/src/client.rs | 15 +++-- daqbufp2/src/err.rs | 18 +++++ daqbufp2/src/lib.rs | 6 +- daqbufp2/src/nodes.rs | 4 +- daqbufp2/src/test/binnedbinary.rs | 7 +- daqbufp2/src/test/binnedjson.rs | 15 +++-- daqbufp2/src/test/events.rs | 14 ++-- daqbufp2/src/test/timeweightedjson.rs | 8 ++- dbconn/src/lib.rs | 47 ++++++++++---- dbconn/src/scan.rs | 87 ++++++++++++++----------- dbconn/src/search.rs | 27 ++++++-- disk/src/binned/binnedfrompbv.rs | 6 +- disk/src/cache.rs | 3 +- disk/src/dataopen.rs | 10 +-- disk/src/eventblobs.rs | 2 +- dq/Cargo.toml | 16 +++++ dq/src/bin/dq.rs | 42 ++++++++++++ dq/src/dq.rs | 1 + err/Cargo.toml | 20 +++--- err/src/lib.rs | 94 +++++++++++++-------------- fsio/src/fsio.rs | 3 +- httpclient/src/lib.rs | 9 ++- httpret/src/api1.rs | 13 ++-- httpret/src/channelarchiver.rs | 10 ++- httpret/src/err.rs | 72 ++++++++++++++++++++ httpret/src/gather.rs | 2 +- httpret/src/httpret.rs | 50 ++++++++++---- httpret/src/proxy.rs | 2 +- httpret/src/proxy/api4.rs | 9 ++- httpret/src/pulsemap.rs | 17 ++--- httpret/src/search.rs | 2 +- netfetch/src/ca.rs | 18 +++-- netfetch/src/test.rs | 4 +- netpod/src/api1.rs | 8 +-- netpod/src/netpod.rs | 70 ++++++++++++++++++++ parse/src/channelconfig.rs | 40 +++++++++++- taskrun/Cargo.toml | 4 +- taskrun/src/taskrun.rs | 72 -------------------- 56 files changed, 751 insertions(+), 354 deletions(-) create mode 100644 archapp/src/err.rs create mode 100644 daqbuffer/src/err.rs create mode 100644 daqbufp2/src/err.rs create mode 100644 dq/Cargo.toml create mode 100644 dq/src/bin/dq.rs create mode 100644 dq/src/dq.rs create mode 100644 httpret/src/err.rs diff --git a/Cargo.toml b/Cargo.toml index e0e512f..0392088 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,14 +1,14 @@ [workspace] -members = ["daqbuffer", "h5out", "archapp", "archapp_xc", "archapp_wrap", "items", "nodenet", "httpclient", "fsio"] +members = ["daqbuffer", "h5out", "archapp", "archapp_xc", "archapp_wrap", "items", "nodenet", "httpclient", "fsio", "dq"] [profile.release] opt-level = 1 -debug = 1 -#overflow-checks = true -#debug-assertions = true +debug = 0 +overflow-checks = false +debug-assertions = false lto = "off" -codegen-units = 160 +codegen-units = 32 incremental = true [patch.crates-io] -tokio = { git = "https://github.com/dominikwerder/tokio", rev = "637a0c8a" } +tokio = { git = "https://github.com/dominikwerder/tokio", rev = "f069c87f" } diff --git a/archapp/src/archeng.rs b/archapp/src/archeng.rs index fe32923..cae3d73 100644 --- a/archapp/src/archeng.rs +++ b/archapp/src/archeng.rs @@ -15,7 +15,7 @@ use crate::timed::Timed; use crate::wrap_task; use async_channel::{Receiver, Sender}; use commonio::StatsChannel; -use err::Error; +use err::{ErrStr, Error}; use 
futures_util::StreamExt; use items::{StreamItem, WithLen}; use netpod::log::*; @@ -151,7 +151,7 @@ pub fn list_all_channels(node: &ChannelArchiver) -> Receiver Receiver for ErrWrap { + fn from(x: tokio_postgres::Error) -> Self { + Self(x) + } +} + +impl From for Error { + fn from(_: ErrWrap) -> Self { + todo!() + } +} + pub async fn channel_config_from_db( q: &ChannelConfigQuery, conf: &ChannelArchiver, ) -> Result { let dbc = database_connect(&conf.database).await?; let sql = "select config from channels where name = $1"; - let rows = dbc.query(sql, &[&q.channel.name()]).await?; + let rows = dbc.query(sql, &[&q.channel.name()]).await.errstr()?; if let Some(row) = rows.first() { - let cfg: JsVal = row.try_get(0)?; + let cfg: JsVal = row.try_get(0).errstr()?; let val = cfg .get("shape") .ok_or_else(|| Error::with_msg_no_trace("shape not found on config"))?; diff --git a/archapp/src/archeng/blockrefstream.rs b/archapp/src/archeng/blockrefstream.rs index fde4e60..cbc75c2 100644 --- a/archapp/src/archeng/blockrefstream.rs +++ b/archapp/src/archeng/blockrefstream.rs @@ -193,7 +193,7 @@ mod test { use netpod::Database; #[test] - fn find_ref_1() -> Result<(), Error> { + fn find_ref_1() -> Result<(), err::Error> { let fut = async move { let channel = Channel { backend: "sls-archive".into(), diff --git a/archapp/src/archeng/configs.rs b/archapp/src/archeng/configs.rs index ae1fe73..6d208e7 100644 --- a/archapp/src/archeng/configs.rs +++ b/archapp/src/archeng/configs.rs @@ -1,5 +1,5 @@ use crate::archeng::indexfiles::database_connect; -use err::Error; +use err::{ErrStr, Error}; use futures_core::{Future, Stream}; use futures_util::{FutureExt, StreamExt}; use netpod::log::*; @@ -85,7 +85,7 @@ impl Stream for ChannelNameStream { "select rowid, name from channels where config = '{}'::jsonb and name > $1 order by name limit 64", &[&max_name], ) - .await?; + .await.errstr()?; Ok::<_, Error>(rows) }; self.select_fut = Some(Box::pin(fut)); @@ -183,7 +183,8 @@ impl Stream for ConfigStream { let fut = async move { let dbc = database_connect(&dbconf).await?; dbc.query("update channels set config = $2 where name = $1", &[&name, &config]) - .await?; + .await + .errstr()?; Ok(()) }; self.update_fut = Some(Box::pin(fut)); @@ -197,7 +198,8 @@ impl Stream for ConfigStream { let fut = async move { let dbc = database_connect(&dbconf).await?; dbc.query("update channels set config = $2 where name = $1", &[&name, &config]) - .await?; + .await + .errstr()?; Ok(()) }; self.update_fut = Some(Box::pin(fut)); diff --git a/archapp/src/archeng/datablock.rs b/archapp/src/archeng/datablock.rs index 346da35..c13ab43 100644 --- a/archapp/src/archeng/datablock.rs +++ b/archapp/src/archeng/datablock.rs @@ -109,24 +109,36 @@ impl DbrType { #[derive(Debug)] pub struct DatafileHeader { pos: DataheaderPos, + #[allow(unused)] dir_offset: u32, // Should be absolute file position of the next data header // together with `fname_next`. // But unfortunately not always set? 
+ #[allow(unused)] next_offset: u32, + #[allow(unused)] prev_offset: u32, + #[allow(unused)] curr_offset: u32, pub num_samples: u32, + #[allow(unused)] ctrl_info_offset: u32, buf_size: u32, + #[allow(unused)] buf_free: u32, dbr_type: DbrType, dbr_count: usize, + #[allow(unused)] period: f64, + #[allow(unused)] ts_beg: Nanos, + #[allow(unused)] ts_end: Nanos, + #[allow(unused)] ts_next_file: Nanos, + #[allow(unused)] fname_next: String, + #[allow(unused)] fname_prev: String, } diff --git a/archapp/src/archeng/indexfiles.rs b/archapp/src/archeng/indexfiles.rs index be72feb..7d28c7f 100644 --- a/archapp/src/archeng/indexfiles.rs +++ b/archapp/src/archeng/indexfiles.rs @@ -2,7 +2,7 @@ use crate::timed::Timed; use crate::wrap_task; use async_channel::Receiver; use commonio::{open_read, read, StatsChannel}; -use err::Error; +use err::{ErrStr, Error}; use futures_core::{Future, Stream}; use futures_util::stream::unfold; use netpod::log::*; @@ -35,19 +35,19 @@ pub fn list_index_files(node: &ChannelArchiver) -> Receiver) -> Result, Error> { pub async fn database_connect(db_config: &Database) -> Result { let d = db_config; let uri = format!("postgresql://{}:{}@{}:{}/{}", d.user, d.pass, d.host, 5432, d.name); - let (cl, conn) = tokio_postgres::connect(&uri, tokio_postgres::NoTls).await?; + let (cl, conn) = tokio_postgres::connect(&uri, tokio_postgres::NoTls).await.errstr()?; // TODO monitor connection drop. let _cjh = tokio::spawn(async move { if let Err(e) = conn.await { @@ -179,27 +179,29 @@ impl ScanIndexFiles { let ps = p.to_string_lossy(); let rows = dbc .query("select rowid from indexfiles where path = $1", &[&ps]) - .await?; + .await + .errstr()?; let rid: i64 = if rows.len() == 0 { let rows = dbc .query( "insert into indexfiles (path) values ($1) on conflict do nothing returning rowid", &[&ps], ) - .await?; + .await + .errstr()?; if rows.len() == 0 { error!("insert failed, maybe concurrent insert?"); // TODO try this channel again? or the other process handled it? 
err::todoval() } else if rows.len() == 1 { - let rid = rows[0].try_get(0)?; + let rid = rows[0].try_get(0).errstr()?; info!("insert done: {}", rid); rid } else { return Err(Error::with_msg("not unique")); } } else if rows.len() == 1 { - let rid = rows[0].try_get(0)?; + let rid = rows[0].try_get(0).errstr()?; rid } else { return Err(Error::with_msg("not unique")); @@ -313,7 +315,7 @@ impl ScanChannels { let dbc = database_connect(&self.conf.database).await?; let sql = "select path from indexfiles where ts_last_channel_search < now() - interval '1 hour' limit 1"; - let rows = dbc.query(sql, &[]).await?; + let rows = dbc.query(sql, &[]).await.errstr()?; let mut paths = vec![]; for row in rows { paths.push(row.get::<_, String>(0)); @@ -329,36 +331,38 @@ impl ScanChannels { if let Some(path) = paths.pop() { let rows = dbc .query("select rowid from indexfiles where path = $1", &[&path]) - .await?; + .await + .errstr()?; if rows.len() == 1 { - let indexfile_rid: i64 = rows[0].try_get(0)?; + let indexfile_rid: i64 = rows[0].try_get(0).errstr()?; let mut file = open_read(path.clone().into(), stats).await?; let mut basics = super::indextree::IndexFileBasics::from_file(path, &mut file, stats).await?; let entries = basics.all_channel_entries(&mut file, stats).await?; for entry in entries { let rows = dbc .query("select rowid from channels where name = $1", &[&entry.channel_name()]) - .await?; + .await + .errstr()?; let rid: i64 = if rows.len() == 0 { let rows = dbc .query( "insert into channels (name) values ($1) on conflict do nothing returning rowid", &[&entry.channel_name()], ) - .await?; + .await.errstr()?; if rows.len() == 0 { error!("insert failed, maybe concurrent insert?"); // TODO try this channel again? or the other process handled it? err::todoval() } else if rows.len() == 1 { - let rid = rows[0].try_get(0)?; + let rid = rows[0].try_get(0).errstr()?; info!("insert done: {}", rid); rid } else { return Err(Error::with_msg("not unique")); } } else if rows.len() == 1 { - let rid = rows[0].try_get(0)?; + let rid = rows[0].try_get(0).errstr()?; rid } else { return Err(Error::with_msg("not unique")); @@ -367,13 +371,15 @@ impl ScanChannels { "insert into channel_index_map (channel, index) values ($1, $2) on conflict do nothing", &[&rid, &indexfile_rid], ) - .await?; + .await + .errstr()?; } dbc.query( "update indexfiles set ts_last_channel_search = now() where rowid = $1", &[&indexfile_rid], ) - .await?; + .await + .errstr()?; } } self.steps = Done; @@ -410,8 +416,14 @@ enum RetClass { #[derive(Debug)] enum IndexCat { - Machine { rc: RetClass }, - Beamline { rc: RetClass, name: String }, + Machine { + rc: RetClass, + }, + #[allow(unused)] + Beamline { + rc: RetClass, + name: String, + }, } #[derive(Debug)] @@ -532,10 +544,10 @@ fn categorize_index_files(list: &Vec) -> Result, Error> { pub async fn index_file_path_list(channel: Channel, dbconf: Database) -> Result, Error> { let dbc = database_connect(&dbconf).await?; let sql = "select i.path from indexfiles i, channels c, channel_index_map m where c.name = $1 and m.channel = c.rowid and i.rowid = m.index"; - let rows = dbc.query(sql, &[&channel.name()]).await?; + let rows = dbc.query(sql, &[&channel.name()]).await.errstr()?; let mut index_paths = vec![]; for row in rows { - index_paths.push(row.try_get(0)?); + index_paths.push(row.try_get(0).errstr()?); } let list = categorize_index_files(&index_paths)?; let ret = list.into_iter().map(|k| k.path).collect(); diff --git a/archapp/src/archeng/indextree.rs b/archapp/src/archeng/indextree.rs index 
c718112..b3b6e89 100644 --- a/archapp/src/archeng/indextree.rs +++ b/archapp/src/archeng/indextree.rs @@ -1,17 +1,17 @@ +use crate::archeng::backreadbuf::BackReadBuf; use crate::archeng::{format_hex_block, name_hash, readu16, readu32, readu64, StatsChannel, EPICS_EPOCH_OFFSET}; use commonio::open_read; use commonio::ringbuf::RingBuf; use err::Error; -use netpod::{log::*, NanoRange}; -use netpod::{timeunits::SEC, FilePos, Nanos}; +use netpod::log::*; +use netpod::timeunits::SEC; +use netpod::{FilePos, NanoRange, Nanos}; use std::collections::VecDeque; use std::fmt; use std::path::{Path, PathBuf}; use std::time::{Duration, Instant}; use tokio::fs::File; -use super::backreadbuf::BackReadBuf; - pub trait HeaderVersion: Send + Sync + fmt::Debug { fn version(&self) -> u8; fn read_offset(&self, buf: &[u8], pos: usize) -> u64; @@ -71,6 +71,7 @@ pub struct NamedHashChannelEntry { next: u64, id_rtree_pos: u64, channel_name: String, + #[allow(dead_code)] id_txt: String, } @@ -80,6 +81,7 @@ impl NamedHashChannelEntry { } } +#[allow(dead_code)] #[derive(Debug)] pub struct IndexFileBasics { path: PathBuf, @@ -778,7 +780,9 @@ pub async fn read_rtree_entrypoint( #[derive(Debug)] pub struct TreeSearchStats { + #[allow(dead_code)] duration: Duration, + #[allow(dead_code)] node_reads: usize, } diff --git a/archapp/src/err.rs new file mode 100644 index 0000000..266c701 --- /dev/null +++ b/archapp/src/err.rs @@ -0,0 +1,49 @@ +use std::fmt; + +pub struct Error(err::Error); + +impl Error { + pub fn with_msg<S: Into<String>>(s: S) -> Self { + Self(err::Error::with_msg(s)) + } + + pub fn with_msg_no_trace<S: Into<String>>(s: S) -> Self { + Self(err::Error::with_msg_no_trace(s)) + } +} + +impl fmt::Debug for Error { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + self.0.fmt(fmt) + } +} + +impl From<Error> for err::Error { + fn from(x: Error) -> Self { + x.0 + } +} + +impl From<std::string::FromUtf8Error> for Error { + fn from(k: std::string::FromUtf8Error) -> Self { + Self::with_msg(k.to_string()) + } +} + +impl From<std::io::Error> for Error { + fn from(k: std::io::Error) -> Self { + Self::with_msg(k.to_string()) + } +} + +impl<T> From<async_channel::SendError<T>> for Error { + fn from(k: async_channel::SendError<T>) -> Self { + Self::with_msg(k.to_string()) + } +} + +impl From<serde_json::Error> for Error { + fn from(k: serde_json::Error) -> Self { + Self::with_msg(k.to_string()) + } +} diff --git a/archapp/src/events.rs index b4e9d09..22f3c06 100644 --- a/archapp/src/events.rs +++ b/archapp/src/events.rs @@ -3,7 +3,7 @@ use crate::parse::multi::parse_all_ts; use crate::parse::PbFileReader; use crate::storagemerge::StorageMerge; use chrono::{TimeZone, Utc}; -use err::Error; +use err::{ErrStr, Error}; use futures_core::Stream; use futures_util::StreamExt; use items::binnedevents::{MultiBinWaveEvents, SingleBinWaveEvents, XBinnedEvents}; @@ -393,7 +393,7 @@ pub async fn make_single_event_pipe( } let ei2 = ei.x_aggregate(&evq.agg_kind); let g = Ok(StreamItem::DataItem(RangeCompletableItem::Data(ei2))); - tx.send(g).await?; + tx.send(g).await.errstr()?; if let Some(t) = tslast { if t >= evq.range.end { info!("after requested range, break"); diff --git a/archapp/src/lib.rs index 4db8bec..769a85c 100644 --- a/archapp/src/lib.rs +++ b/archapp/src/lib.rs @@ -3,6 +3,7 @@ pub mod generated; #[cfg(not(feature = "devread"))] pub mod generated {} pub mod archeng; +pub mod err; pub mod events; #[cfg(feature = "devread")] pub mod parse; @@ -14,14 +15,13 @@ pub mod storagemerge; pub mod test; pub mod timed; -use std::sync::atomic::{AtomicUsize, Ordering}; - +use ::err::Error; use 
async_channel::Sender; -use err::Error; use futures_core::Future; use netpod::log::*; #[cfg(not(feature = "devread"))] pub use parsestub as parse; +use std::sync::atomic::{AtomicUsize, Ordering}; fn unescape_archapp_msg(inp: &[u8], mut ret: Vec) -> Result, Error> { ret.clear(); diff --git a/archapp/src/parse.rs b/archapp/src/parse.rs index 6e652a5..a6f5269 100644 --- a/archapp/src/parse.rs +++ b/archapp/src/parse.rs @@ -6,7 +6,7 @@ use crate::unescape_archapp_msg; use archapp_xc::*; use async_channel::{bounded, Receiver}; use chrono::{TimeZone, Utc}; -use err::Error; +use err::{ErrStr, Error}; use items::eventsitem::EventsItem; use items::eventvalues::EventValues; use items::plainevents::{PlainEvents, ScalarPlainEvents, WavePlainEvents}; @@ -445,7 +445,7 @@ pub async fn scan_files_inner( } }, Err(e) => { - tx.send(Err(e.into())).await?; + tx.send(Err(e.into())).await.errstr()?; } } } @@ -465,12 +465,15 @@ pub async fn scan_files_inner( if channel_path != normalized_channel_name { { let s = format!("{} - {}", channel_path, normalized_channel_name); - tx.send(Ok(Box::new(serde_json::to_value(&s)?) as ItemSerBox)).await?; + tx.send(Ok(Box::new(serde_json::to_value(&s)?) as ItemSerBox)) + .await + .errstr()?; } tx.send(Ok( Box::new(JsonValue::String(format!("MISMATCH --------------------"))) as ItemSerBox, )) - .await?; + .await + .errstr()?; } else { if false { dbconn::insert_channel(channel_path.into(), ndi.facility, &dbc).await?; @@ -484,7 +487,8 @@ pub async fn scan_files_inner( pbr.channel_name(), msg.variant_name() ))?) as ItemSerBox)) - .await?; + .await + .errstr()?; } } } diff --git a/archapp/src/timed.rs b/archapp/src/timed.rs index b4fc284..98aa69e 100644 --- a/archapp/src/timed.rs +++ b/archapp/src/timed.rs @@ -1,6 +1,5 @@ -use std::time::Instant; - use netpod::log::*; +use std::time::Instant; pub struct Timed { name: String, diff --git a/commonio/src/commonio.rs b/commonio/src/commonio.rs index 8a53a9a..fa16397 100644 --- a/commonio/src/commonio.rs +++ b/commonio/src/commonio.rs @@ -1,7 +1,7 @@ pub mod ringbuf; use async_channel::Sender; -use err::Error; +use err::{ErrStr, Error}; use futures_util::StreamExt; use items::eventsitem::EventsItem; use items::{Sitemty, StatsItem, StreamItem}; @@ -43,7 +43,7 @@ impl StatsChannel { } pub async fn send(&self, item: StatsItem) -> Result<(), Error> { - Ok(self.chn.send(Ok(StreamItem::Stats(item))).await?) + Ok(self.chn.send(Ok(StreamItem::Stats(item))).await.errstr()?) 
} } diff --git a/daqbuffer/Cargo.toml index b485b3c..03a0b35 100644 --- a/daqbuffer/Cargo.toml +++ b/daqbuffer/Cargo.toml @@ -22,7 +22,7 @@ serde_derive = "1.0" serde_json = "1.0" chrono = "0.4" url = "2.2.2" -clap = "3.0.0-beta.2" +clap = "3.0.0-beta.5" lazy_static = "1.4.0" err = { path = "../err" } taskrun = { path = "../taskrun" } diff --git a/daqbuffer/src/bin/daqbuffer.rs index 459acc6..7b68a97 100644 --- a/daqbuffer/src/bin/daqbuffer.rs +++ b/daqbuffer/src/bin/daqbuffer.rs @@ -1,5 +1,5 @@ use chrono::{DateTime, Duration, Utc}; -use clap::Clap; +use clap::Parser; use daqbuffer::cli::{ClientType, Opts, SubCmd}; use err::Error; use netpod::log::*; @@ -101,7 +101,7 @@ async fn go() -> Result<(), Error> { let jh = tokio::task::spawn_blocking(move || { taskrun::append::append(&k.dir, std::io::stdin(), std::io::stderr()).unwrap(); }); - jh.await?; + jh.await.map_err(Error::from_string)?; } SubCmd::Test => (), } @@ -110,6 +110,7 @@ async fn go() -> Result<(), Error> { #[test] fn simple_fetch() { + use daqbuffer::err::ErrConv; use netpod::Nanos; use netpod::{timeunits::*, ByteOrder, Channel, ChannelConfig, ScalarType, Shape}; taskrun::run(async { @@ -137,9 +138,10 @@ fn simple_fetch() { let req = hyper::Request::builder() .method(http::Method::POST) .uri("http://localhost:8360/api/4/parsed_raw") - .body(query_string.into())?; + .body(query_string.into()) + .ec()?; let client = hyper::Client::new(); - let res = client.request(req).await?; + let res = client.request(req).await.ec()?; info!("client response {:?}", res); let mut res_body = res.into_body(); use hyper::body::HttpBody; diff --git a/daqbuffer/src/cli.rs index bce16b3..da1c77e 100644 --- a/daqbuffer/src/cli.rs +++ b/daqbuffer/src/cli.rs @@ -1,6 +1,6 @@ -use clap::{crate_version, Clap}; +use clap::{crate_version, Parser}; -#[derive(Debug, Clap)] +#[derive(Debug, Parser)] #[clap(name="daqbuffer", author="Dominik Werder ", version=crate_version!())] pub struct Opts { #[clap(short, long, parse(from_occurrences))] @@ -9,7 +9,7 @@ pub struct Opts { pub subcmd: SubCmd, } -#[derive(Debug, Clap)] +#[derive(Debug, Parser)] pub enum SubCmd { Retrieval(Retrieval), Proxy(Proxy), @@ -20,31 +20,31 @@ pub enum SubCmd { Test, } -#[derive(Debug, Clap)] +#[derive(Debug, Parser)] pub struct Retrieval { #[clap(long)] pub config: String, } -#[derive(Debug, Clap)] +#[derive(Debug, Parser)] pub struct Proxy { #[clap(long)] pub config: String, } -#[derive(Debug, Clap)] +#[derive(Debug, Parser)] pub struct Client { #[clap(subcommand)] pub client_type: ClientType, } -#[derive(Debug, Clap)] +#[derive(Debug, Parser)] pub enum ClientType { Binned(BinnedClient), Status(StatusClient), } -#[derive(Debug, Clap)] +#[derive(Debug, Parser)] pub struct StatusClient { #[clap(long)] pub host: String, @@ -52,7 +52,7 @@ pub struct StatusClient { pub port: u16, } -#[derive(Debug, Clap)] +#[derive(Debug, Parser)] pub struct BinnedClient { #[clap(long)] pub host: String, @@ -74,13 +74,13 @@ pub struct BinnedClient { pub disk_stats_every_kb: u32, } -#[derive(Debug, Clap)] +#[derive(Debug, Parser)] pub struct Zmtp { #[clap(long)] pub addr: String, } -#[derive(Debug, Clap)] +#[derive(Debug, Parser)] pub struct Logappend { #[clap(long)] pub dir: String, diff --git a/daqbuffer/src/err.rs new file mode 100644 index 0000000..98c6a81 --- /dev/null +++ b/daqbuffer/src/err.rs @@ -0,0 +1,17 @@ +pub trait ErrConv<T> { + fn ec(self) -> Result<T, ::err::Error>; +} + +pub trait Convable: ToString {} + +impl<T, E: Convable> ErrConv<T> for Result<T, E> { + fn ec(self) -> Result<T, ::err::Error> { + match self { + Ok(x) => Ok(x), + Err(e) => Err(::err::Error::from_string(e.to_string())), + } + } +} + +impl Convable for http::Error {} +impl Convable for hyper::Error {} diff --git a/daqbuffer/src/lib.rs index 4f77372..bdf0a8d 100644 --- a/daqbuffer/src/lib.rs +++ b/daqbuffer/src/lib.rs @@ -1 +1,2 @@ pub mod cli; +pub mod err; diff --git a/daqbufp2/src/client.rs index 32c4732..109527c 100644 --- a/daqbufp2/src/client.rs +++ b/daqbufp2/src/client.rs @@ -1,3 +1,4 @@ +use crate::err::ErrConv; use chrono::{DateTime, Utc}; use disk::frame::inmem::InMemoryFrameAsyncReadStream; use disk::streamlog::Streamlog; @@ -18,14 +19,15 @@ pub async fn status(host: String, port: u16) -> Result<(), Error> { let req = hyper::Request::builder() .method(http::Method::GET) .uri(uri) - .body(Body::empty())?; + .body(Body::empty()) + .ec()?; let client = hyper::Client::new(); - let res = client.request(req).await?; + let res = client.request(req).await.ec()?; if res.status() != StatusCode::OK { error!("Server error {:?}", res); return Err(Error::with_msg(format!("Server error {:?}", res))); } - let body = hyper::body::to_bytes(res.into_body()).await?; + let body = hyper::body::to_bytes(res.into_body()).await.ec()?; let res = String::from_utf8(body.to_vec())?; let t2 = chrono::Utc::now(); let ms = t2.signed_duration_since(t1).num_milliseconds() as u64; @@ -68,13 +70,14 @@ pub async fn get_binned( .method(http::Method::GET) .uri(url.to_string()) .header(http::header::ACCEPT, APP_OCTET) - .body(Body::empty())?; + .body(Body::empty()) + .ec()?; let client = hyper::Client::new(); - let res = client.request(req).await?; + let res = client.request(req).await.ec()?; if res.status() != StatusCode::OK { error!("Server error {:?}", res); let (head, body) = res.into_parts(); - let buf = hyper::body::to_bytes(body).await?; + let buf = hyper::body::to_bytes(body).await.ec()?; let s = String::from_utf8_lossy(&buf); return Err(Error::with_msg(format!( concat!( diff --git a/daqbufp2/src/err.rs new file mode 100644 index 0000000..f13c35d --- /dev/null +++ b/daqbufp2/src/err.rs @@ -0,0 +1,18 @@ +pub trait ErrConv<T> { + fn ec(self) -> Result<T, ::err::Error>; +} + +pub trait Convable: ToString {} + +impl<T, E: Convable> ErrConv<T> for Result<T, E> { + fn ec(self) -> Result<T, ::err::Error> { + match self { + Ok(x) => Ok(x), + Err(e) => Err(::err::Error::from_string(e.to_string())), + } + } +} + +impl Convable for http::Error {} +impl Convable for hyper::Error {} +impl Convable for tokio::task::JoinError {} diff --git a/daqbufp2/src/lib.rs index 5671f9f..6a445e9 100644 --- a/daqbufp2/src/lib.rs +++ b/daqbufp2/src/lib.rs @@ -1,9 +1,11 @@ pub mod client; +pub mod err; pub mod nodes; #[cfg(test)] pub mod test; -use err::Error; +use ::err::Error; +use futures_util::TryFutureExt; use netpod::{Cluster, NodeConfig, NodeConfigCached, ProxyConfig}; use tokio::task::JoinHandle; @@ -16,7 +18,7 @@ pub fn spawn_test_hosts(cluster: Cluster) -> Vec<JoinHandle<Result<(), Error>>> }; let node_config: Result<NodeConfigCached, Error> = node_config.into(); let node_config = node_config.unwrap(); - let h = tokio::spawn(httpret::host(node_config)); + let h = tokio::spawn(httpret::host(node_config).map_err(Error::from)); ret.push(h); } ret diff --git a/daqbufp2/src/nodes.rs index 3b81707..fd439df 100644 --- a/daqbufp2/src/nodes.rs +++ b/daqbufp2/src/nodes.rs @@ -36,7 +36,7 @@ pub fn require_test_hosts_running() -> Result<Arc<RunningHosts>, Error> { match g.as_ref() { None => { netpod::log::info!("\n\n+++++++++++++++++++ 
MAKE NEW RunningHosts\n\n"); - let cluster = taskrun::test_cluster(); + let cluster = netpod::test_cluster(); let jhs = spawn_test_hosts(cluster.clone()); let ret = RunningHosts { cluster: cluster.clone(), @@ -58,7 +58,7 @@ pub fn require_sls_test_host_running() -> Result, Error> { match g.as_ref() { None => { netpod::log::info!("\n\n+++++++++++++++++++ MAKE NEW RunningSlsHost\n\n"); - let cluster = taskrun::sls_test_cluster(); + let cluster = netpod::sls_test_cluster(); let jhs = spawn_test_hosts(cluster.clone()); let ret = RunningSlsHost { cluster: cluster.clone(), diff --git a/daqbufp2/src/test/binnedbinary.rs b/daqbufp2/src/test/binnedbinary.rs index 41253dc..3abe68a 100644 --- a/daqbufp2/src/test/binnedbinary.rs +++ b/daqbufp2/src/test/binnedbinary.rs @@ -1,3 +1,4 @@ +use crate::err::ErrConv; use crate::nodes::require_test_hosts_running; use chrono::{DateTime, Utc}; use disk::frame::inmem::InMemoryFrameAsyncReadStream; @@ -115,9 +116,10 @@ where .method(http::Method::GET) .uri(url.to_string()) .header(http::header::ACCEPT, APP_OCTET) - .body(Body::empty())?; + .body(Body::empty()) + .ec()?; let client = hyper::Client::new(); - let res = client.request(req).await?; + let res = client.request(req).await.ec()?; if res.status() != StatusCode::OK { error!("client response {:?}", res); } @@ -146,6 +148,7 @@ pub struct BinnedResponse { bytes_read: u64, range_complete_count: u64, log_item_count: u64, + #[allow(unused)] stats_item_count: u64, } diff --git a/daqbufp2/src/test/binnedjson.rs b/daqbufp2/src/test/binnedjson.rs index 9542d90..56c384c 100644 --- a/daqbufp2/src/test/binnedjson.rs +++ b/daqbufp2/src/test/binnedjson.rs @@ -1,3 +1,4 @@ +use crate::err::ErrConv; use crate::nodes::{require_sls_test_host_running, require_test_hosts_running}; use chrono::{DateTime, Utc}; use err::Error; @@ -224,13 +225,14 @@ async fn get_binned_json_common( .method(http::Method::GET) .uri(url.to_string()) .header(http::header::ACCEPT, APP_JSON) - .body(Body::empty())?; + .body(Body::empty()) + .ec()?; let client = hyper::Client::new(); - let res = client.request(req).await?; + let res = client.request(req).await.ec()?; if res.status() != StatusCode::OK { error!("get_binned_json_common client response {:?}", res); } - let res = hyper::body::to_bytes(res.into_body()).await?; + let res = hyper::body::to_bytes(res.into_body()).await.ec()?; let t2 = chrono::Utc::now(); let ms = t2.signed_duration_since(t1).num_milliseconds() as u64; debug!("get_binned_json_common DONE time {} ms", ms); @@ -353,13 +355,14 @@ async fn get_binned_json_common_res( .method(http::Method::GET) .uri(url.to_string()) .header(http::header::ACCEPT, APP_JSON) - .body(Body::empty())?; + .body(Body::empty()) + .ec()?; let client = hyper::Client::new(); - let res = client.request(req).await?; + let res = client.request(req).await.ec()?; if res.status() != StatusCode::OK { error!("get_binned_json_common client response {:?}", res); } - let res = hyper::body::to_bytes(res.into_body()).await?; + let res = hyper::body::to_bytes(res.into_body()).await.ec()?; let t2 = chrono::Utc::now(); let _ms = t2.signed_duration_since(t1).num_milliseconds() as u64; let res = String::from_utf8_lossy(&res).to_string(); diff --git a/daqbufp2/src/test/events.rs b/daqbufp2/src/test/events.rs index 258eabe..d60589a 100644 --- a/daqbufp2/src/test/events.rs +++ b/daqbufp2/src/test/events.rs @@ -1,3 +1,4 @@ +use crate::err::ErrConv; use crate::nodes::require_test_hosts_running; use chrono::{DateTime, Utc}; use disk::events::{PlainEventsBinaryQuery, PlainEventsJsonQuery}; 
@@ -72,9 +73,10 @@ where .method(http::Method::GET) .uri(url.to_string()) .header(http::header::ACCEPT, APP_OCTET) - .body(Body::empty())?; + .body(Body::empty()) + .ec()?; let client = hyper::Client::new(); - let res = client.request(req).await?; + let res = client.request(req).await.ec()?; if res.status() != StatusCode::OK { error!("client response {:?}", res); } @@ -100,6 +102,7 @@ pub struct EventsResponse { bytes_read: u64, range_complete_count: u64, log_item_count: u64, + #[allow(unused)] stats_item_count: u64, } @@ -278,13 +281,14 @@ async fn get_plain_events_json( .method(http::Method::GET) .uri(url.to_string()) .header(http::header::ACCEPT, APP_JSON) - .body(Body::empty())?; + .body(Body::empty()) + .ec()?; let client = hyper::Client::new(); - let res = client.request(req).await?; + let res = client.request(req).await.ec()?; if res.status() != StatusCode::OK { error!("client response {:?}", res); } - let buf = hyper::body::to_bytes(res.into_body()).await?; + let buf = hyper::body::to_bytes(res.into_body()).await.ec()?; let s = String::from_utf8_lossy(&buf); let _res: JsonValue = serde_json::from_str(&s)?; // TODO assert more diff --git a/daqbufp2/src/test/timeweightedjson.rs b/daqbufp2/src/test/timeweightedjson.rs index 84ac7dd..64fc422 100644 --- a/daqbufp2/src/test/timeweightedjson.rs +++ b/daqbufp2/src/test/timeweightedjson.rs @@ -1,3 +1,4 @@ +use crate::err::ErrConv; use crate::nodes::require_test_hosts_running; use chrono::{DateTime, Utc}; use err::Error; @@ -180,13 +181,14 @@ async fn get_json_common( .method(http::Method::GET) .uri(url.to_string()) .header(http::header::ACCEPT, APP_JSON) - .body(Body::empty())?; + .body(Body::empty()) + .ec()?; let client = hyper::Client::new(); - let res = client.request(req).await?; + let res = client.request(req).await.ec()?; if res.status() != StatusCode::OK { error!("get_json_common client response {:?}", res); } - let res = hyper::body::to_bytes(res.into_body()).await?; + let res = hyper::body::to_bytes(res.into_body()).await.ec()?; let t2 = chrono::Utc::now(); let ms = t2.signed_duration_since(t1).num_milliseconds() as u64; // TODO add timeout diff --git a/dbconn/src/lib.rs b/dbconn/src/lib.rs index dd0a204..5b4d7fa 100644 --- a/dbconn/src/lib.rs +++ b/dbconn/src/lib.rs @@ -1,14 +1,36 @@ +pub mod scan; +pub mod search; + +pub mod pg { + pub use tokio_postgres::{Client, Error}; +} + use err::Error; use netpod::log::*; use netpod::{Channel, Database, NodeConfigCached}; use std::time::Duration; use tokio_postgres::{Client, NoTls}; -pub mod scan; -pub mod search; +trait ErrConv { + fn errconv(self) -> Result; +} -pub mod pg { - pub use tokio_postgres::Client; +impl ErrConv for Result { + fn errconv(self) -> Result { + match self { + Ok(k) => Ok(k), + Err(e) => Err(Error::with_msg(e.to_string())), + } + } +} + +impl ErrConv for Result> { + fn errconv(self) -> Result { + match self { + Ok(k) => Ok(k), + Err(e) => Err(Error::with_msg(e.to_string())), + } + } } pub async fn delay_us(mu: u64) { @@ -26,7 +48,7 @@ pub async fn delay_io_medium() { pub async fn create_connection(db_config: &Database) -> Result { let d = db_config; let uri = format!("postgresql://{}:{}@{}:{}/{}", d.user, d.pass, d.host, 5432, d.name); - let (cl, conn) = tokio_postgres::connect(&uri, NoTls).await?; + let (cl, conn) = tokio_postgres::connect(&uri, NoTls).await.errconv()?; // TODO monitor connection drop. 
let _cjh = tokio::spawn(async move { if let Err(e) = conn.await { @@ -41,7 +63,8 @@ pub async fn channel_exists(channel: &Channel, node_config: &NodeConfigCached) - let cl = create_connection(&node_config.node_config.cluster.database).await?; let rows = cl .query("select rowid from channels where name = $1::text", &[&channel.name]) - .await?; + .await + .errconv()?; debug!("channel_exists {} rows", rows.len()); for row in rows { debug!( @@ -61,7 +84,8 @@ pub async fn database_size(node_config: &NodeConfigCached) -> Result "select pg_database_size($1::text)", &[&node_config.node_config.cluster.database.name], ) - .await?; + .await + .errconv()?; if rows.len() == 0 { Err(Error::with_msg("could not get database size"))?; } @@ -87,7 +111,7 @@ pub async fn table_sizes(node_config: &NodeConfigCached) -> Result Result Result { let sql = "select name from channels order by rowid limit 1 offset (random() * (select count(rowid) from channels))::bigint"; let cl = create_connection(&node_config.node_config.cluster.database).await?; - let rows = cl.query(sql, &[]).await?; + let rows = cl.query(sql, &[]).await.errconv()?; if rows.len() == 0 { Err(Error::with_msg("can not get random channel"))?; } @@ -112,11 +136,12 @@ pub async fn insert_channel(name: String, facility: i64, dbc: &Client) -> Result "select count(rowid) from channels where facility = $1 and name = $2", &[&facility, &name], ) - .await?; + .await + .errconv()?; if rows[0].get::<_, i64>(0) == 0 { let sql = concat!("insert into channels (facility, name) values ($1, $2) on conflict (facility, name) do nothing"); - dbc.query(sql, &[&facility, &name]).await?; + dbc.query(sql, &[&facility, &name]).await.errconv()?; } Ok(()) } diff --git a/dbconn/src/scan.rs b/dbconn/src/scan.rs index c265b9e..243dfa7 100644 --- a/dbconn/src/scan.rs +++ b/dbconn/src/scan.rs @@ -6,6 +6,7 @@ use futures_core::Stream; use futures_util::{pin_mut, FutureExt, StreamExt}; use netpod::log::*; use netpod::{Database, NodeConfigCached}; +use parse::channelconfig::NErr; use pin_project::pin_project; use serde::{Deserialize, Serialize}; use std::future::Future; @@ -19,6 +20,8 @@ use std::task::{Context, Poll}; use tokio::fs::{DirEntry, ReadDir}; use tokio_postgres::Client; +use crate::ErrConv; + #[derive(Debug, Serialize, Deserialize)] pub struct NodeDiskIdent { pub rowid: i64, @@ -45,7 +48,8 @@ pub async fn get_node_disk_ident(node_config: &NodeConfigCached, dbc: &Client) - let sql = "select nodes.rowid, facility, split, hostname from nodes, facilities where facilities.name = $1 and facility = facilities.rowid and hostname = $2"; let rows = dbc .query(sql, &[&node_config.node.backend, &node_config.node.host]) - .await?; + .await + .errconv()?; if rows.len() != 1 { return Err(Error::with_msg(format!( "get_node can't find unique entry for {} {}", @@ -68,7 +72,8 @@ pub async fn get_node_disk_ident_2( let sql = "select nodes.rowid, facility, split, hostname from nodes, facilities where facilities.name = $1 and facility = facilities.rowid and hostname = $2"; let rows = dbc .query(sql, &[&node_config.node.backend, &node_config.node.host]) - .await?; + .await + .errconv()?; if rows.len() != 1 { return Err(Error::with_msg(format!( "get_node can't find unique entry for {} {}", @@ -315,15 +320,16 @@ impl Stream for UpdatedDbWithChannelNamesStream { async fn update_db_with_channel_name_list(list: Vec, backend: i64, dbc: &Client) -> Result<(), Error> { crate::delay_io_short().await; - dbc.query("begin", &[]).await?; + dbc.query("begin", &[]).await.errconv()?; for ch in list { dbc.query( 
"insert into channels (facility, name) values ($1, $2) on conflict do nothing", &[&backend, &ch], ) - .await?; + .await + .errconv()?; } - dbc.query("commit", &[]).await?; + dbc.query("commit", &[]).await.errconv()?; Ok(()) } @@ -338,7 +344,7 @@ pub async fn update_db_with_channel_names( let dbc = crate::create_connection(&db_config).await?; let node_disk_ident = get_node_disk_ident(&node_config, &dbc).await?; let c1 = Arc::new(RwLock::new(0u32)); - dbc.query("begin", &[]).await?; + dbc.query("begin", &[]).await.errconv()?; let dbc = Arc::new(dbc); let tx = Arc::new(tx); find_channel_names_from_config(&node_config.node.data_base_path, |ch| { @@ -353,33 +359,34 @@ pub async fn update_db_with_channel_names( "insert into channels (facility, name) values ($1, $2) on conflict do nothing", &[&fac, &ch], ) - .await?; + .await + .errconv()?; let c2 = { let mut g = c1.write()?; *g += 1; *g }; if c2 % 200 == 0 { - dbc.query("commit", &[]).await?; + dbc.query("commit", &[]).await.errconv()?; let ret = UpdatedDbWithChannelNames { msg: format!("current {}", ch), count: c2, }; - tx.send(Ok(ret)).await?; + tx.send(Ok(ret)).await.errconv()?; crate::delay_io_medium().await; - dbc.query("begin", &[]).await?; + dbc.query("begin", &[]).await.errconv()?; } Ok(()) } }) .await?; - dbc.query("commit", &[]).await?; + dbc.query("commit", &[]).await.errconv()?; let c2 = *c1.read()?; let ret = UpdatedDbWithChannelNames { msg: format!("all done"), count: c2, }; - tx.send(Ok(ret)).await?; + tx.send(Ok(ret)).await.errconv()?; Ok::<_, Error>(()) }; let block2 = async move { @@ -440,15 +447,16 @@ pub async fn update_db_with_all_channel_configs( "select rowid, facility, name from channels where facility = $1 order by facility, name", &[&node_disk_ident.facility], ) - .await?; + .await + .errconv()?; let mut c1 = 0; - dbc.query("begin", &[]).await?; + dbc.query("begin", &[]).await.errconv()?; let mut count_inserted = 0; let mut count_updated = 0; for row in rows { - let rowid: i64 = row.try_get(0)?; - let _facility: i64 = row.try_get(1)?; - let channel: String = row.try_get(2)?; + let rowid: i64 = row.try_get(0).errconv()?; + let _facility: i64 = row.try_get(1).errconv()?; + let channel: String = row.try_get(2).errconv()?; match update_db_with_channel_config( node_config, node_disk_ident, @@ -471,26 +479,26 @@ pub async fn update_db_with_all_channel_configs( Ok(UpdateChannelConfigResult::Done) => { c1 += 1; if c1 % 200 == 0 { - dbc.query("commit", &[]).await?; + dbc.query("commit", &[]).await.errconv()?; let msg = format!( "channel no {:6} inserted {:6} updated {:6}", c1, count_inserted, count_updated ); let ret = UpdatedDbWithAllChannelConfigs { msg, count: c1 }; - tx.send(Ok(ret)).await?; - dbc.query("begin", &[]).await?; + tx.send(Ok(ret)).await.errconv()?; + dbc.query("begin", &[]).await.errconv()?; } crate::delay_io_short().await; } } } - dbc.query("commit", &[]).await?; + dbc.query("commit", &[]).await.errconv()?; let msg = format!( "ALL DONE channel no {:6} inserted {:6} updated {:6}", c1, count_inserted, count_updated ); let ret = UpdatedDbWithAllChannelConfigs { msg, count: c1 }; - tx.send(Ok(ret)).await?; + tx.send(Ok(ret)).await.errconv()?; Ok::<_, Error>(()) } .then({ @@ -500,7 +508,7 @@ pub async fn update_db_with_all_channel_configs( Err(e) => { let msg = format!("Seeing error: {:?}", e); let ret = UpdatedDbWithAllChannelConfigs { msg, count: 0 }; - tx2.send(Ok(ret)).await?; + tx2.send(Ok(ret)).await.errconv()?; } } Ok::<_, Error>(()) @@ -523,7 +531,7 @@ pub async fn update_db_with_all_channel_configs( pub 
async fn update_search_cache(node_config: &NodeConfigCached) -> Result<(), Error> { let dbc = crate::create_connection(&node_config.node_config.cluster.database).await?; - dbc.query("select update_cache()", &[]).await?; + dbc.query("select update_cache()", &[]).await.errconv()?; Ok(()) } @@ -564,7 +572,8 @@ pub async fn update_db_with_channel_config( "select rowid, fileSize, parsedUntil, channel from configs where node = $1 and channel = $2", &[&node_disk_ident.rowid(), &channel_id], ) - .await?; + .await + .errconv()?; if rows.len() > 1 { return Err(Error::with_msg("more than one row")); } @@ -579,7 +588,7 @@ pub async fn update_db_with_channel_config( "insert into configs_history (rowid_original, node, channel, fileSize, parsedUntil, config, tsinsert) ", "select rowid as rowid_original, node, channel, fileSize, parsedUntil, config, now() from configs where rowid = $1" ); - dbc.query(sql, &[&rowid]).await?; + dbc.query(sql, &[&rowid]).await.errconv()?; } //ensure!(meta.len() >= parsed_until as u64, ConfigFileOnDiskShrunk{path}); (Some(rowid), true) @@ -588,7 +597,7 @@ pub async fn update_db_with_channel_config( }; if do_parse { let buf = tokio::fs::read(&path).await?; - let config = parse::channelconfig::parse_config(&buf)?.1; + let config = parse::channelconfig::parse_config(&buf).map_err(NErr::from)?.1; match config_id { None => { dbc.query( @@ -601,14 +610,15 @@ pub async fn update_db_with_channel_config( &serde_json::to_value(config)?, ], ) - .await?; + .await + .errconv()?; *count_inserted += 1; } Some(_config_id_2) => { dbc.query( "insert into configs (node, channel, fileSize, parsedUntil, config) values ($1, $2, $3, $4, $5) on conflict (node, channel) do update set fileSize = $3, parsedUntil = $4, config = $5", &[&node_disk_ident.rowid(), &channel_id, &(meta.len() as i64), &(buf.len() as i64), &serde_json::to_value(config)?], - ).await?; + ).await.errconv()?; *count_updated += 1; } } @@ -627,25 +637,26 @@ pub async fn update_db_with_all_channel_datafiles( "select rowid, facility, name from channels where facility = $1 order by facility, name", &[&node_disk_ident.facility()], ) - .await?; + .await + .errconv()?; let mut c1 = 0; - dbc.query("begin", &[]).await?; + dbc.query("begin", &[]).await.errconv()?; for row in rows { - let rowid: i64 = row.try_get(0)?; - let _facility: i64 = row.try_get(1)?; - let channel: String = row.try_get(2)?; + let rowid: i64 = row.try_get(0).errconv()?; + let _facility: i64 = row.try_get(1).errconv()?; + let channel: String = row.try_get(2).errconv()?; update_db_with_channel_datafiles(node_config, node_disk_ident, ks_prefix, rowid, &channel, dbc.clone()).await?; c1 += 1; if c1 % 40 == 0 { trace!("import datafiles {} {}", c1, channel); - dbc.query("commit", &[]).await?; - dbc.query("begin", &[]).await?; + dbc.query("commit", &[]).await.errconv()?; + dbc.query("begin", &[]).await.errconv()?; } if false && c1 >= 30 { break; } } - dbc.query("commit", &[]).await?; + dbc.query("commit", &[]).await.errconv()?; Ok(()) } @@ -709,7 +720,7 @@ impl ChannelDatafileDescSink for DatafileDbWriter { &((k.timebin() + 1) as i64 * k.binsize() as i64), &serde_json::to_value(k)?, ] - ).await?; + ).await.errconv()?; *c1.write()? 
+= 1; Ok(()) }) diff --git a/dbconn/src/search.rs b/dbconn/src/search.rs index 7d5764b..798e3c4 100644 --- a/dbconn/src/search.rs +++ b/dbconn/src/search.rs @@ -1,4 +1,4 @@ -use crate::create_connection; +use crate::{create_connection, ErrConv}; use err::Error; use netpod::{ChannelArchiver, ChannelSearchQuery, ChannelSearchResult, ChannelSearchSingleResult, NodeConfigCached}; use serde_json::Value as JsVal; @@ -18,7 +18,8 @@ pub async fn search_channel_databuffer( sql.as_str(), &[&query.name_regex, &query.source_regex, &query.description_regex, &"asc"], ) - .await?; + .await + .errconv()?; let mut res = vec![]; for row in rows { let shapedb: Option = row.get(4); @@ -74,7 +75,7 @@ pub async fn search_channel_archeng( " limit 100" )); let cl = create_connection(&conf.database).await?; - let rows = cl.query(sql.as_str(), &[&query.name_regex]).await?; + let rows = cl.query(sql.as_str(), &[&query.name_regex]).await.errconv()?; let mut res = vec![]; for row in rows { let name: String = row.get(0); @@ -105,7 +106,10 @@ pub async fn search_channel_archeng( if k == "Scalar" { vec![] } else { - return Err(Error::with_msg_no_trace(format!("search_channel_archeng can not understand {:?}", config))); + return Err(Error::with_msg_no_trace(format!( + "search_channel_archeng can not understand {:?}", + config + ))); } } JsVal::Object(k) => match k.get("Wave") { @@ -114,15 +118,24 @@ pub async fn search_channel_archeng( vec![k.as_i64().unwrap_or(u32::MAX as i64) as u32] } _ => { - return Err(Error::with_msg_no_trace(format!("search_channel_archeng can not understand {:?}", config))); + return Err(Error::with_msg_no_trace(format!( + "search_channel_archeng can not understand {:?}", + config + ))); } }, None => { - return Err(Error::with_msg_no_trace(format!("search_channel_archeng can not understand {:?}", config))); + return Err(Error::with_msg_no_trace(format!( + "search_channel_archeng can not understand {:?}", + config + ))); } }, _ => { - return Err(Error::with_msg_no_trace(format!("search_channel_archeng can not understand {:?}", config))); + return Err(Error::with_msg_no_trace(format!( + "search_channel_archeng can not understand {:?}", + config + ))); } }, None => vec![], diff --git a/disk/src/binned/binnedfrompbv.rs b/disk/src/binned/binnedfrompbv.rs index 3c7644a..0ee4573 100644 --- a/disk/src/binned/binnedfrompbv.rs +++ b/disk/src/binned/binnedfrompbv.rs @@ -38,7 +38,7 @@ impl FetchedPreBinned { let mut url = Url::parse(&format!("http://{}:{}/api/4/prebinned", node.host, node.port))?; query.append_to_url(&mut url); let ret = Self { - uri: Uri::from_str(&url.to_string())?, + uri: Uri::from_str(&url.to_string()).map_err(Error::from_string)?, resfut: None, res: None, errored: false, @@ -115,7 +115,7 @@ where Err(e) => { error!("PreBinnedValueStream error in stream {:?}", e); self.errored = true; - Ready(Some(Err(e.into()))) + Ready(Some(Err(Error::from_string(e)))) } }, Pending => Pending, @@ -133,7 +133,7 @@ where } Err(e) => { self.errored = true; - Ready(Some(Err(e.into()))) + Ready(Some(Err(Error::from_string(e)))) } } }; diff --git a/disk/src/cache.rs b/disk/src/cache.rs index 4b78d9e..357725b 100644 --- a/disk/src/cache.rs +++ b/disk/src/cache.rs @@ -209,7 +209,8 @@ where Ok::<_, Error>(enc.len()) } }) - .await??; + .await + .map_err(Error::from_string)??; tokio::fs::rename(&tmp_path, &path).await?; let ts2 = Instant::now(); let ret = WrittenPbCache { diff --git a/disk/src/dataopen.rs b/disk/src/dataopen.rs index 976ff24..80100ea 100644 --- a/disk/src/dataopen.rs +++ b/disk/src/dataopen.rs @@ 
-1,6 +1,6 @@ use super::paths; use bytes::BytesMut; -use err::Error; +use err::{ErrStr, Error}; use futures_util::StreamExt; use netpod::log::*; use netpod::{ChannelConfig, NanoRange, Nanos, Node}; @@ -251,7 +251,7 @@ async fn open_files_inner( "----- open_files_inner giving OpenedFileSet with {} files", h.files.len() ); - chtx.send(Ok(h)).await?; + chtx.send(Ok(h)).await.errstr()?; } Ok(()) } @@ -361,7 +361,7 @@ async fn open_expanded_files_inner( "----- open_expanded_files_inner giving OpenedFileSet with {} files", h.files.len() ); - chtx.send(Ok(h)).await?; + chtx.send(Ok(h)).await.errstr()?; if found_pre { p1 += 1; break; @@ -383,7 +383,7 @@ async fn open_expanded_files_inner( } } let h = OpenedFileSet { timebin: tb, files: a }; - chtx.send(Ok(h)).await?; + chtx.send(Ok(h)).await.errstr()?; p1 += 1; } } else { @@ -417,7 +417,7 @@ fn expanded_file_list() { array: false, compression: false, }; - let cluster = taskrun::test_cluster(); + let cluster = netpod::test_cluster(); let task = async move { let mut paths = vec![]; let mut files = open_expanded_files(&range, &channel_config, cluster.nodes[0].clone()); diff --git a/disk/src/eventblobs.rs b/disk/src/eventblobs.rs index 3ebd019..b41a5e1 100644 --- a/disk/src/eventblobs.rs +++ b/disk/src/eventblobs.rs @@ -229,7 +229,7 @@ mod test { array: false, compression: false, }; - let cluster = taskrun::test_cluster(); + let cluster = netpod::test_cluster(); let node = cluster.nodes[nodeix].clone(); let buffer_size = 512; let event_chunker_conf = EventChunkerConf { diff --git a/dq/Cargo.toml b/dq/Cargo.toml new file mode 100644 index 0000000..4c1b202 --- /dev/null +++ b/dq/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "dq" +version = "0.1.0" +authors = ["Dominik Werder "] +edition = "2021" + +[lib] +path = "src/dq.rs" + +[dependencies] +#serde = { version = "1.0", features = ["derive"] } +#serde_json = "1.0" +#tokio = { version = "1.14.77", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] } +err = { path = "../err" } +#taskrun = { path = "../taskrun" } +clap = "3.0.0-beta.5" diff --git a/dq/src/bin/dq.rs b/dq/src/bin/dq.rs new file mode 100644 index 0000000..a010985 --- /dev/null +++ b/dq/src/bin/dq.rs @@ -0,0 +1,42 @@ +// TODO crate `err` pulls in all other dependencies in order to implement From<...> for Error. +// Refactor that... +// Crate `taskrun` also depends on `err`... 
+ +use std::path::PathBuf; + +use err::Error; + +#[derive(Debug)] +pub struct Error2; + +use clap::{crate_version, Parser}; + +#[derive(Debug, Parser)] +#[clap(name="DAQ tools", author="Dominik Werder ", version=crate_version!())] +pub struct Opts { + #[clap(short, long, parse(from_occurrences))] + pub verbose: i32, + #[clap(subcommand)] + pub subcmd: SubCmd, +} + +#[derive(Debug, Parser)] +pub enum SubCmd { + ConvertArchiverApplianceChannel(ConvertArchiverApplianceChannel), +} + +#[derive(Debug, Parser)] +pub struct ConvertArchiverApplianceChannel { + name: String, + #[clap(about = "Look for archiver appliance data at given path")] + input_dir: PathBuf, + #[clap(about = "Generate Databuffer format at given path")] + output_dir: PathBuf, +} + +pub fn main() -> Result<(), Error> { + //taskrun::run(async { Ok(()) }) + let opts = Opts::parse(); + eprintln!("Opts: {:?}", opts); + Err(Error::with_msg_no_trace(format!("123"))) +} diff --git a/dq/src/dq.rs b/dq/src/dq.rs new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/dq/src/dq.rs @@ -0,0 +1 @@ + diff --git a/err/Cargo.toml b/err/Cargo.toml index 6afa0f8..82e97ef 100644 --- a/err/Cargo.toml +++ b/err/Cargo.toml @@ -1,21 +1,21 @@ [package] name = "err" -version = "0.0.1-a.0" +version = "0.0.2" authors = ["Dominik Werder "] -edition = "2018" +edition = "2021" [dependencies] -hyper = { version = "0.14", features = ["http1", "http2", "client", "server", "tcp"] } -http = "0.2" -tokio = { version = "1.7.1", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] } +#hyper = { version = "0.14", features = ["http1", "http2", "client", "server", "tcp"] } +#http = "0.2" backtrace = "0.3.56" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" -bincode = "1.3.3" +serde_cbor = "0.11.1" async-channel = "1.6" chrono = { version = "0.4.19", features = ["serde"] } -nom = "6.1.2" -tokio-postgres = { version = "0.7", features = ["runtime", "with-chrono-0_4", "with-serde_json-1"] } -serde_cbor = "0.11.1" -regex = "1.5.4" url = "2.2" +regex = "1.5.4" +bincode = "1.3.3" +#tokio = { version = "1.14.77", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] } +#tokio-postgres = { version = "0.7", features = ["runtime", "with-chrono-0_4", "with-serde_json-1"] } +#nom = "6.1.2" diff --git a/err/src/lib.rs b/err/src/lib.rs index 86f0b9f..41e53a4 100644 --- a/err/src/lib.rs +++ b/err/src/lib.rs @@ -2,9 +2,6 @@ Error handling and reporting. 
*/ -use http::header::InvalidHeaderValue; -use http::uri::InvalidUri; -use nom::error::ErrorKind; use serde::{Deserialize, Serialize}; use std::array::TryFromSliceError; use std::fmt; @@ -12,8 +9,6 @@ use std::net::AddrParseError; use std::num::{ParseFloatError, ParseIntError}; use std::string::FromUtf8Error; use std::sync::PoisonError; -use tokio::task::JoinError; -use tokio::time::error::Elapsed; #[derive(Clone, Serialize, Deserialize)] pub enum Reason { @@ -56,6 +51,13 @@ impl Error { } } + pub fn from_string<E>(e: E) -> Self + where + E: ToString, + { + Self::with_msg(e.to_string()) + } + pub fn mark_bad_request(mut self) -> Self { self.reason = Some(Reason::BadRequest); self @@ -143,6 +145,38 @@ impl fmt::Display for Error { impl std::error::Error for Error {} +pub trait ErrConv<T> { + fn err_conv(self) -> Result<T, Error>; +} + +impl<T, E> ErrConv<T> for Result<T, E> +where + E: Into<Error>, +{ + fn err_conv(self) -> Result<T, Error> { + match self { + Ok(k) => Ok(k), + Err(e) => Err(e.into()), + } + } +} + +pub trait ErrStr<T> { + fn errstr(self) -> Result<T, Error>; +} + +impl<T, E> ErrStr<T> for Result<T, E> +where + E: ToString, +{ + fn errstr(self) -> Result<T, Error> { + match self { + Ok(k) => Ok(k), + Err(e) => Err(Error::with_msg_no_trace(e.to_string())), + } + } +} + impl From<String> for Error { fn from(k: String) -> Self { Self::with_msg(k) @@ -167,18 +201,6 @@ impl From for Error { } } -impl From<http::Error> for Error { - fn from(k: http::Error) -> Self { - Self::with_msg(k.to_string()) - } -} - -impl From<hyper::Error> for Error { - fn from(k: hyper::Error) -> Self { - Self::with_msg(k.to_string()) - } -} - impl From<serde_json::Error> for Error { fn from(k: serde_json::Error) -> Self { Self::with_msg(k.to_string()) @@ -215,6 +237,7 @@ impl From for Error { } } +/* impl<I: fmt::Debug> From<nom::Err<I>> for Error { fn from(k: nom::Err<I>) -> Self { Self::with_msg(format!("nom::Err {:?}", k)) @@ -222,20 +245,23 @@ impl<I: fmt::Debug> From<nom::Err<I>> for Error { } impl<I> nom::error::ParseError<I> for Error { - fn from_error_kind(_input: I, kind: ErrorKind) -> Self { + fn from_error_kind(_input: I, kind: nom::error::ErrorKind) -> Self { Self::with_msg(format!("ParseError {:?}", kind)) } - fn append(_input: I, kind: ErrorKind, other: Self) -> Self { + fn append(_input: I, kind: nom::error::ErrorKind, other: Self) -> Self { Self::with_msg(format!("ParseError kind {:?} other {:?}", kind, other)) } } +*/ +/* impl From<JoinError> for Error { fn from(k: JoinError) -> Self { Self::with_msg(format!("JoinError {:?}", k)) } } +*/ impl From<Box<dyn std::error::Error>> for Error { fn from(k: Box<dyn std::error::Error>) -> Self { @@ -243,24 +269,6 @@ impl From<Box<dyn std::error::Error>> for Error { } -impl From<tokio_postgres::Error> for Error { - fn from(k: tokio_postgres::Error) -> Self { - Self::with_msg(k.to_string()) - } -} - -impl<T> From<async_channel::SendError<T>> for Error { - fn from(k: async_channel::SendError<T>) -> Self { - Self::with_msg(k.to_string()) - } -} - -impl From<InvalidUri> for Error { - fn from(k: InvalidUri) -> Self { - Self::with_msg(k.to_string()) - } -} - impl From<serde_cbor::Error> for Error { fn from(k: serde_cbor::Error) -> Self { Self::with_msg(k.to_string()) @@ -285,18 +293,6 @@ impl<T> From<PoisonError<T>> for Error { } } -impl From<InvalidHeaderValue> for Error { - fn from(k: InvalidHeaderValue) -> Self { - Self::with_msg(format!("{:?}", k)) - } -} - -impl From<Elapsed> for Error { - fn from(k: Elapsed) -> Self { - Self::with_msg(format!("{:?}", k)) - } -} - impl From<url::ParseError> for Error { fn from(k: url::ParseError) -> Self { Self::with_msg(format!("{:?}", k)) } } diff --git a/fsio/src/fsio.rs index 6331c73..07c34b2 100644 --- a/fsio/src/fsio.rs +++ b/fsio/src/fsio.rs @@ -99,7 +99,8 @@ async fn lock_1() -> Result<(), Error> { t2.join().map_err(|_| Error::with_msg_no_trace("join error"))?; Ok::<_, Error>(()) }) - .await??; + .await + 
.map_err(Error::from_string)??; Ok(()) } diff --git a/httpclient/src/lib.rs b/httpclient/src/lib.rs index 379767e..e365180 100644 --- a/httpclient/src/lib.rs +++ b/httpclient/src/lib.rs @@ -15,13 +15,16 @@ pub async fn get_channel_config( let req = hyper::Request::builder() .method(Method::GET) .uri(url.as_str()) - .body(Body::empty())?; + .body(Body::empty()) + .map_err(Error::from_string)?; let client = hyper::Client::new(); - let res = client.request(req).await?; + let res = client.request(req).await.map_err(Error::from_string)?; if !res.status().is_success() { return Err(Error::with_msg("http client error")); } - let buf = hyper::body::to_bytes(res.into_body()).await?; + let buf = hyper::body::to_bytes(res.into_body()) + .await + .map_err(Error::from_string)?; let ret: ChannelConfigResponse = serde_json::from_slice(&buf)?; Ok(ret) } diff --git a/httpret/src/api1.rs b/httpret/src/api1.rs index 794ff86..8d27df8 100644 --- a/httpret/src/api1.rs +++ b/httpret/src/api1.rs @@ -1,10 +1,10 @@ +use crate::err::Error; use crate::gather::{gather_get_json_generic, SubRes}; use crate::{response, BodyStream}; use bytes::{BufMut, BytesMut}; use disk::eventchunker::{EventChunkerConf, EventFull}; -use err::Error; use futures_core::Stream; -use futures_util::{FutureExt, StreamExt}; +use futures_util::{FutureExt, StreamExt, TryFutureExt, TryStreamExt}; use http::{Method, StatusCode}; use hyper::{Body, Client, Request, Response}; use items::{RangeCompletableItem, Sitemty, StreamItem}; @@ -736,7 +736,7 @@ impl Stream for DataApiPython3DataStream { } }); //let _ = Box::new(s) as Box> + Unpin>; - self.chan_stream = Some(Box::pin(s)); + self.chan_stream = Some(Box::pin(s.map_err(Error::from))); continue; } Ready(Err(e)) => { @@ -754,10 +754,9 @@ impl Stream for DataApiPython3DataStream { } else { let channel = self.channels[self.chan_ix].clone(); self.chan_ix += 1; - self.config_fut = Some(Box::pin(read_local_config( - channel.clone(), - self.node_config.node.clone(), - ))); + self.config_fut = Some(Box::pin( + read_local_config(channel.clone(), self.node_config.node.clone()).map_err(Error::from), + )); continue; } } diff --git a/httpret/src/channelarchiver.rs b/httpret/src/channelarchiver.rs index 6c15268..dfb867c 100644 --- a/httpret/src/channelarchiver.rs +++ b/httpret/src/channelarchiver.rs @@ -1,7 +1,7 @@ +use crate::err::Error; use crate::response; -use err::Error; use futures_core::Stream; -use futures_util::StreamExt; +use futures_util::{StreamExt, TryStreamExt}; use http::{header, Method, Request, Response, StatusCode}; use hyper::Body; use netpod::log::*; @@ -112,6 +112,7 @@ impl ScanIndexFiles { "this node is not configured as channel archiver", ))?; let s = archapp_wrap::archapp::archeng::indexfiles::scan_index_files(conf.clone()); + let s = s.map_err(Error::from); let s = json_lines_stream(s); Ok(response(StatusCode::OK) .header(header::CONTENT_TYPE, APP_JSON_LINES) @@ -151,6 +152,7 @@ impl ScanChannels { "this node is not configured as channel archiver", ))?; let s = archapp_wrap::archapp::archeng::indexfiles::scan_channels(conf.clone()); + let s = s.map_err(Error::from); let s = json_lines_stream(s); Ok(response(StatusCode::OK) .header(header::CONTENT_TYPE, APP_JSON_LINES) @@ -191,6 +193,7 @@ impl ChannelNames { ))?; use archapp_wrap::archapp::archeng; let stream = archeng::configs::ChannelNameStream::new(conf.database.clone()); + let stream = stream.map_err(Error::from); let stream = json_lines_stream(stream); Ok(response(StatusCode::OK) .header(header::CONTENT_TYPE, APP_JSON_LINES) @@ -232,6 
+235,7 @@ impl ScanConfigs {
         use archapp_wrap::archapp::archeng;
         let stream = archeng::configs::ChannelNameStream::new(conf.database.clone());
         let stream = archeng::configs::ConfigStream::new(stream, conf.clone());
+        let stream = stream.map_err(Error::from);
         let stream = json_lines_stream(stream);
         Ok(response(StatusCode::OK)
             .header(header::CONTENT_TYPE, APP_JSON_LINES)
@@ -294,6 +298,7 @@ impl BlockRefStream {
             }
             Err(e) => Err(e),
         });
+        let s = s.map_err(Error::from);
         let s = json_lines_stream(s);
         let s = s.map(|item| match item {
             Ok(k) => Ok(k),
@@ -365,6 +370,7 @@ impl BlockStream {
             }
             Err(e) => Err(e),
         });
+        let s = s.map_err(Error::from);
         let s = json_lines_stream(s);
         let s = s.map(|item| match item {
             Ok(k) => Ok(k),
diff --git a/httpret/src/err.rs b/httpret/src/err.rs
new file mode 100644
index 0000000..1239b10
--- /dev/null
+++ b/httpret/src/err.rs
@@ -0,0 +1,72 @@
+use std::fmt;
+
+#[derive(Debug)]
+pub struct Error(::err::Error);
+
+impl Error {
+    pub fn with_msg<S: Into<String>>(s: S) -> Self {
+        Self(::err::Error::with_msg(s))
+    }
+
+    pub fn with_msg_no_trace<S: Into<String>>(s: S) -> Self {
+        Self(::err::Error::with_msg_no_trace(s))
+    }
+
+    pub fn msg(&self) -> &str {
+        self.0.msg()
+    }
+
+    pub fn reason(&self) -> Option<::err::Reason> {
+        self.0.reason()
+    }
+
+    pub fn public_msg(&self) -> Option<&Vec<String>> {
+        self.0.public_msg()
+    }
+}
+
+impl fmt::Display for Error {
+    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+        fmt::Debug::fmt(self, fmt)
+    }
+}
+
+impl std::error::Error for Error {}
+
+impl From<::err::Error> for Error {
+    fn from(x: ::err::Error) -> Self {
+        Self(x)
+    }
+}
+
+impl From<Error> for ::err::Error {
+    fn from(x: Error) -> Self {
+        x.0
+    }
+}
+
+pub trait Convable {}
+
+impl<T: Convable> From<T> for Error
+where
+    T: ToString,
+{
+    fn from(x: T) -> Self {
+        Self(::err::Error::from_string(x))
+    }
+}
+
+impl Convable for std::net::AddrParseError {}
+impl Convable for std::string::FromUtf8Error {}
+impl Convable for fmt::Error {}
+impl Convable for std::io::Error {}
+impl Convable for std::num::ParseIntError {}
+impl Convable for dbconn::pg::Error {}
+impl Convable for tokio::task::JoinError {}
+impl Convable for tokio::time::error::Elapsed {}
+impl Convable for serde_json::Error {}
+impl Convable for chrono::ParseError {}
+impl Convable for url::ParseError {}
+impl Convable for http::uri::InvalidUri {}
+impl Convable for http::Error {}
+impl Convable for hyper::Error {}
diff --git a/httpret/src/gather.rs b/httpret/src/gather.rs
index 765a02d..09e927f 100644
--- a/httpret/src/gather.rs
+++ b/httpret/src/gather.rs
@@ -1,5 +1,5 @@
+use crate::err::Error;
 use crate::response;
-use err::Error;
 use futures_util::{select, FutureExt};
 use http::{Method, StatusCode};
 use hyper::{Body, Client, Request, Response};
diff --git a/httpret/src/httpret.rs b/httpret/src/httpret.rs
index 540da17..052fd82 100644
--- a/httpret/src/httpret.rs
+++ b/httpret/src/httpret.rs
@@ -1,19 +1,20 @@
 pub mod api1;
 pub mod channelarchiver;
+pub mod err;
 pub mod gather;
 pub mod proxy;
 pub mod pulsemap;
 pub mod search;
 
+use crate::err::Error;
 use crate::gather::gather_get_json;
 use crate::pulsemap::UpdateTask;
 use bytes::Bytes;
 use disk::binned::query::PreBinnedQuery;
 use disk::events::{PlainEventsBinaryQuery, PlainEventsJsonQuery};
-use err::Error;
 use future::Future;
 use futures_core::Stream;
-use futures_util::{FutureExt, StreamExt};
+use futures_util::{FutureExt, StreamExt, TryStreamExt};
 use http::{HeaderMap, Method, StatusCode};
 use hyper::server::conn::AddrStream;
 use hyper::service::{make_service_fn, service_fn};
@@ -89,7 +90,7 @@ where
 {
     type Output = <F as Future>::Output;
 
-    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
         let h = std::panic::catch_unwind(AssertUnwindSafe(|| self.f.poll_unpin(cx)));
         match h {
             Ok(k) => k,
@@ -101,7 +102,7 @@ where
                 }
                 None => {}
             }
-            Poll::Ready(Err(Error::from(format!("{:?}", e))))
+            Poll::Ready(Err(Error::with_msg(format!("{:?}", e))))
         }
     }
 }
@@ -369,7 +370,7 @@ struct BodyStreamWrap(netpod::BodyStream);
 
 impl hyper::body::HttpBody for BodyStreamWrap {
     type Data = bytes::Bytes;
-    type Error = Error;
+    type Error = ::err::Error;
 
     fn poll_data(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Result<Self::Data, Self::Error>>> {
         self.0.inner.poll_next_unpin(cx)
@@ -417,7 +418,7 @@ where
             Ready(Some(Ok(k))) => Ready(Some(Ok(k))),
             Ready(Some(Err(e))) => {
                 error!("body stream error: {:?}", e);
-                Ready(Some(Err(e.into())))
+                Ready(Some(Err(Error::from(e))))
             }
             Ready(None) => Ready(None),
             Pending => Pending,
@@ -439,8 +440,23 @@ trait ToPublicResponse {
 impl ToPublicResponse for Error {
     fn to_public_response(&self) -> Response<Body> {
         let status = match self.reason() {
-            Some(err::Reason::BadRequest) => StatusCode::BAD_REQUEST,
-            Some(err::Reason::InternalError) => StatusCode::INTERNAL_SERVER_ERROR,
+            Some(::err::Reason::BadRequest) => StatusCode::BAD_REQUEST,
+            Some(::err::Reason::InternalError) => StatusCode::INTERNAL_SERVER_ERROR,
             _ => StatusCode::INTERNAL_SERVER_ERROR,
         };
         let msg = match self.public_msg() {
             Some(v) => v.join("\n"),
             _ => String::new(),
         };
         response(status).body(Body::from(msg)).unwrap()
     }
 }
+
+impl ToPublicResponse for ::err::Error {
+    fn to_public_response(&self) -> Response<Body> {
+        let status = match self.reason() {
+            Some(::err::Reason::BadRequest) => StatusCode::BAD_REQUEST,
+            Some(::err::Reason::InternalError) => StatusCode::INTERNAL_SERVER_ERROR,
+            _ => StatusCode::INTERNAL_SERVER_ERROR,
+        };
+        let msg = match self.public_msg() {
+            Some(v) => v.join("\n"),
+            _ => String::new(),
+        };
+        response(status).body(Body::from(msg)).unwrap()
+    }
+}
+
@@ -476,7 +492,9 @@ async fn binned_inner(req: Request<Body>, node_config: &NodeConfigCached) -> Res
 
 async fn binned_binary(query: BinnedQuery, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
     let ret = match disk::binned::binned_bytes_for_http(&query, node_config).await {
-        Ok(s) => response(StatusCode::OK).body(BodyStream::wrapped(s, format!("binned_binary")))?,
+        Ok(s) => {
+            response(StatusCode::OK).body(BodyStream::wrapped(s.map_err(Error::from), format!("binned_binary")))?
+        }
         Err(e) => {
             if query.report_error() {
                 response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::from(format!("{:?}", e)))?
@@ -491,7 +509,7 @@ async fn binned_binary(query: BinnedQuery, node_config: &NodeConfigCached) -> Re
 
 async fn binned_json(query: BinnedQuery, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
     let ret = match disk::binned::binned_json(&query, node_config).await {
-        Ok(s) => response(StatusCode::OK).body(BodyStream::wrapped(s, format!("binned_json")))?,
+        Ok(s) => response(StatusCode::OK).body(BodyStream::wrapped(s.map_err(Error::from), format!("binned_json")))?,
         Err(e) => {
             if query.report_error() {
                 response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::from(format!("{:?}", e)))?
@@ -525,7 +543,7 @@ async fn prebinned_inner(req: Request<Body>, node_config: &NodeConfigCached) ->
     });
     let fut = disk::binned::prebinned::pre_binned_bytes_for_http(node_config, &query).instrument(span1);
     let ret = match fut.await {
-        Ok(s) => response(StatusCode::OK).body(BodyStream::wrapped(s, desc))?,
+        Ok(s) => response(StatusCode::OK).body(BodyStream::wrapped(s.map_err(Error::from), desc))?,
         Err(e) => {
             if query.report_error() {
                 response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::from(format!("{:?}", e)))?
@@ -574,7 +592,10 @@ async fn plain_events_binary(req: Request<Body>, node_config: &NodeConfigCached)
     );
     let s = disk::channelexec::channel_exec(op, query.channel(), query.range(), AggKind::Plain, node_config).await?;
     let s = s.map(|item| item.make_frame());
-    let ret = response(StatusCode::OK).body(BodyStream::wrapped(s, format!("plain_events_binary")))?;
+    let ret = response(StatusCode::OK).body(BodyStream::wrapped(
+        s.map_err(Error::from),
+        format!("plain_events_binary"),
+    ))?;
     Ok(ret)
 }
 
@@ -591,7 +612,10 @@ async fn plain_events_json(req: Request<Body>, node_config: &NodeConfigCached) -
         query.do_log(),
     );
     let s = disk::channelexec::channel_exec(op, query.channel(), query.range(), AggKind::Plain, node_config).await?;
-    let ret = response(StatusCode::OK).body(BodyStream::wrapped(s, format!("plain_events_json")))?;
+    let ret = response(StatusCode::OK).body(BodyStream::wrapped(
+        s.map_err(Error::from),
+        format!("plain_events_json"),
+    ))?;
     Ok(ret)
 }
 
diff --git a/httpret/src/proxy.rs b/httpret/src/proxy.rs
index e77ee82..0ba67bb 100644
--- a/httpret/src/proxy.rs
+++ b/httpret/src/proxy.rs
@@ -1,10 +1,10 @@
 pub mod api4;
 
 use crate::api1::{channel_search_configs_v1, channel_search_list_v1, gather_json_2_v1, proxy_distribute_v1};
+use crate::err::Error;
 use crate::gather::{gather_get_json_generic, SubRes};
 use crate::{api_1_docs, api_4_docs, response, Cont};
 use disk::events::PlainEventsJsonQuery;
-use err::Error;
 use futures_core::Stream;
 use futures_util::pin_mut;
 use http::{Method, StatusCode};
diff --git a/httpret/src/proxy/api4.rs b/httpret/src/proxy/api4.rs
index b396001..89251ba 100644
--- a/httpret/src/proxy/api4.rs
+++ b/httpret/src/proxy/api4.rs
@@ -1,6 +1,6 @@
+use crate::err::Error;
 use crate::gather::{gather_get_json_generic, SubRes};
 use crate::response;
-use err::Error;
 use futures_core::Future;
 use http::{header, Request, Response, StatusCode};
 use hyper::Body;
@@ -68,7 +68,8 @@ pub async fn channel_search(req: Request<Body>, proxy_config: &ProxyConfig) -> R
         let res = ChannelSearchResult { channels: res };
         let res = response(StatusCode::OK)
             .header(http::header::CONTENT_TYPE, APP_JSON)
-            .body(Body::from(serde_json::to_string(&res)?))?;
+            .body(Body::from(serde_json::to_string(&res)?))
+            .map_err(Error::from)?;
         Ok(res)
     };
     let ret = gather_get_json_generic(
@@ -83,6 +84,8 @@
         .await?;
         Ok(ret)
     } else {
-        Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::from(format!("{:?}", proxy_config.name)))?)
+        Ok(response(StatusCode::NOT_ACCEPTABLE)
+            .body(Body::from(format!("{:?}", proxy_config.name)))
+            .map_err(Error::from)?)
} } diff --git a/httpret/src/pulsemap.rs b/httpret/src/pulsemap.rs index a46eca1..62344a1 100644 --- a/httpret/src/pulsemap.rs +++ b/httpret/src/pulsemap.rs @@ -1,11 +1,9 @@ +use crate::err::Error; use crate::response; -use bytes::Buf; -use bytes::BytesMut; -use err::Error; +use bytes::{Buf, BytesMut}; use futures_util::stream::FuturesOrdered; use futures_util::FutureExt; -use http::Uri; -use http::{Method, StatusCode}; +use http::{Method, StatusCode, Uri}; use hyper::{Body, Request, Response}; use netpod::log::*; use netpod::NodeConfigCached; @@ -17,14 +15,11 @@ use std::pin::Pin; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use std::task::{Context, Poll}; -use std::time::Duration; -use std::time::Instant; +use std::time::{Duration, Instant}; use std::{io::SeekFrom, path::PathBuf}; +use tokio::fs::File; +use tokio::io::{AsyncReadExt, AsyncSeekExt}; use tokio::task::JoinHandle; -use tokio::{ - fs::File, - io::{AsyncReadExt, AsyncSeekExt}, -}; pub struct MapPulseHisto { _pulse: u64, diff --git a/httpret/src/search.rs b/httpret/src/search.rs index f719b72..5392dc8 100644 --- a/httpret/src/search.rs +++ b/httpret/src/search.rs @@ -1,5 +1,5 @@ +use crate::err::Error; use crate::response; -use err::Error; use http::header; use hyper::{Body, Request, Response, StatusCode}; use netpod::log::*; diff --git a/netfetch/src/ca.rs b/netfetch/src/ca.rs index 66c5bc4..44c073d 100644 --- a/netfetch/src/ca.rs +++ b/netfetch/src/ca.rs @@ -1,6 +1,6 @@ use async_channel::{bounded, Receiver}; use bytes::{BufMut, BytesMut}; -use err::Error; +use err::{ErrStr, Error}; use futures_util::FutureExt; use netpod::NodeConfigCached; use serde::{Deserialize, Serialize}; @@ -31,7 +31,7 @@ pub async fn ca_connect_1( async move { let mut conn = tokio::net::TcpStream::connect("S30CB06-CVME-LLRF2.psi.ch:5064").await?; let (mut inp, mut out) = conn.split(); - tx.send(Ok(FetchItem::Log(format!("connected")))).await?; + tx.send(Ok(FetchItem::Log(format!("connected")))).await.errstr()?; let mut buf = [0; 64]; let mut b2 = BytesMut::with_capacity(128); @@ -42,10 +42,11 @@ pub async fn ca_connect_1( b2.put_u32(0); b2.put_u32(0); out.write_all(&b2).await?; - tx.send(Ok(FetchItem::Log(format!("written")))).await?; + tx.send(Ok(FetchItem::Log(format!("written")))).await.errstr()?; let n1 = inp.read(&mut buf).await?; tx.send(Ok(FetchItem::Log(format!("received: {} {:?}", n1, buf)))) - .await?; + .await + .errstr()?; // Search to get cid: let chn = b"SATCB01-DBPM220:Y2"; @@ -58,10 +59,11 @@ pub async fn ca_connect_1( b2.put_u32(0x71803472); b2.put_slice(chn); out.write_all(&b2).await?; - tx.send(Ok(FetchItem::Log(format!("written")))).await?; + tx.send(Ok(FetchItem::Log(format!("written")))).await.errstr()?; let n1 = inp.read(&mut buf).await?; tx.send(Ok(FetchItem::Log(format!("received: {} {:?}", n1, buf)))) - .await?; + .await + .errstr()?; Ok::<_, Error>(()) } @@ -70,7 +72,9 @@ pub async fn ca_connect_1( match item { Ok(_) => {} Err(e) => { - tx2.send(Ok(FetchItem::Log(format!("Seeing error: {:?}", e)))).await?; + tx2.send(Ok(FetchItem::Log(format!("Seeing error: {:?}", e)))) + .await + .errstr()?; } } Ok::<_, Error>(()) diff --git a/netfetch/src/test.rs b/netfetch/src/test.rs index dd8f62b..3ff807a 100644 --- a/netfetch/src/test.rs +++ b/netfetch/src/test.rs @@ -49,7 +49,9 @@ fn ca_connect_1() { Ok::<_, Error>(()) }; let fut = async move { - let ret = tokio::time::timeout(Duration::from_millis(4000), fut).await??; + let ret = tokio::time::timeout(Duration::from_millis(4000), fut) + .await + 
.map_err(Error::from_string)??;
         Ok(ret)
     };
     taskrun::run(fut).unwrap();
diff --git a/netpod/src/api1.rs b/netpod/src/api1.rs
index 931064f..d1787b3 100644
--- a/netpod/src/api1.rs
+++ b/netpod/src/api1.rs
@@ -1,6 +1,6 @@
-use serde::Deserialize;
+use serde::{Deserialize, Serialize};
 
-#[derive(Debug, Deserialize)]
+#[derive(Debug, Serialize, Deserialize)]
 pub struct Range {
     #[serde(rename = "type")]
     ty: String,
@@ -12,10 +12,10 @@ pub struct Range {
 // TODO implement Deserialize such that I recognize the different possible formats...
 // I guess, when serializing, it's ok to use the fully qualified format throughout.
 
-#[derive(Debug, Deserialize)]
+#[derive(Debug, Serialize, Deserialize)]
 pub struct ChannelList {}
 
-#[derive(Debug, Deserialize)]
+#[derive(Debug, Serialize, Deserialize)]
 pub struct Query {
     range: Range,
     channels: ChannelList,
diff --git a/netpod/src/netpod.rs b/netpod/src/netpod.rs
index 04f0e55..52410ac 100644
--- a/netpod/src/netpod.rs
+++ b/netpod/src/netpod.rs
@@ -1446,3 +1446,73 @@ pub fn f64_close(a: f64, b: f64) -> bool {
         false
     }
 }
+
+pub fn test_cluster() -> Cluster {
+    let nodes = (0..3)
+        .into_iter()
+        .map(|id| Node {
+            host: "localhost".into(),
+            listen: "0.0.0.0".into(),
+            port: 6170 + id as u16,
+            port_raw: 6170 + id as u16 + 100,
+            data_base_path: format!("../tmpdata/node{:02}", id).into(),
+            cache_base_path: format!("../tmpdata/node{:02}", id).into(),
+            ksprefix: "ks".into(),
+            backend: "testbackend".into(),
+            splits: None,
+            archiver_appliance: None,
+            channel_archiver: None,
+        })
+        .collect();
+    Cluster {
+        nodes,
+        database: Database {
+            host: "localhost".into(),
+            name: "testingdaq".into(),
+            user: "testingdaq".into(),
+            pass: "testingdaq".into(),
+        },
+        run_map_pulse_task: false,
+        is_central_storage: false,
+        file_io_buffer_size: Default::default(),
+    }
+}
+
+pub fn sls_test_cluster() -> Cluster {
+    let nodes = (0..1)
+        .into_iter()
+        .map(|id| Node {
+            host: "localhost".into(),
+            listen: "0.0.0.0".into(),
+            port: 6190 + id as u16,
+            port_raw: 6190 + id as u16 + 100,
+            data_base_path: format!("NOdatapath{}", id).into(),
+            cache_base_path: format!("../tmpdata/node{:02}", id).into(),
+            ksprefix: "NOKS".into(),
+            backend: "sls-archive".into(),
+            splits: None,
+            archiver_appliance: None,
+            channel_archiver: Some(ChannelArchiver {
+                data_base_paths: vec![PathBuf::from("/data/daqbuffer-testdata/sls/gfa03")],
+                database: Database {
+                    host: "localhost".into(),
+                    name: "testingdaq".into(),
+                    user: "testingdaq".into(),
+                    pass: "testingdaq".into(),
+                },
+            }),
+        })
+        .collect();
+    Cluster {
+        nodes,
+        database: Database {
+            host: "localhost".into(),
+            name: "testingdaq".into(),
+            user: "testingdaq".into(),
+            pass: "testingdaq".into(),
+        },
+        run_map_pulse_task: false,
+        is_central_storage: false,
+        file_io_buffer_size: Default::default(),
+    }
+}
diff --git a/parse/src/channelconfig.rs b/parse/src/channelconfig.rs
index 6c349d4..a9d7605 100644
--- a/parse/src/channelconfig.rs
+++ b/parse/src/channelconfig.rs
@@ -12,15 +12,49 @@ use nom::Needed;
 use num_derive::{FromPrimitive, ToPrimitive};
 use num_traits::ToPrimitive;
 use serde::{Deserialize, Serialize};
+use std::fmt;
 use tokio::io::ErrorKind;
 
-type NRes<'a, O> = nom::IResult<&'a [u8], O, err::Error>;
+#[derive(Debug)]
+pub struct NErr {
+    msg: String,
+}
+
+impl From<nom::Err<NErr>> for NErr {
+    fn from(k: nom::Err<NErr>) -> Self {
+        Self {
+            msg: format!("nom::Err {:?}", k),
+        }
+    }
+}
+
+impl<I> nom::error::ParseError<I> for NErr {
+    fn from_error_kind(_input: I, kind: nom::error::ErrorKind) -> Self {
+        Self {
+            msg: format!("ParseError 
{:?}", kind), + } + } + + fn append(_input: I, kind: nom::error::ErrorKind, other: Self) -> Self { + Self { + msg: format!("ParseError kind {:?} other {:?}", kind, other), + } + } +} + +impl From for Error { + fn from(x: NErr) -> Self { + Self::with_msg_no_trace(x.msg) + } +} + +type NRes<'a, O> = nom::IResult<&'a [u8], O, NErr>; fn mkerr<'a, S, O>(msg: S) -> NRes<'a, O> where S: Into, { - let e = Error::with_msg(msg); + let e = NErr { msg: msg.into() }; Err(nom::Err::Error(e)) } @@ -291,7 +325,7 @@ pub async fn read_local_config(channel: Channel, node: Node) -> Result return Err(e.into()), }, }; - let config = parse_config(&buf)?; + let config = parse_config(&buf).map_err(NErr::from)?; Ok(config.1) } diff --git a/taskrun/Cargo.toml b/taskrun/Cargo.toml index f6ecc98..2b9bd17 100644 --- a/taskrun/Cargo.toml +++ b/taskrun/Cargo.toml @@ -8,11 +8,11 @@ edition = "2021" path = "src/taskrun.rs" [dependencies] -tokio = { version = "1.7.1", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] } +tokio = { version = "1.14.77", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] } tracing = "0.1.25" tracing-subscriber = { version = "0.2.17", features= ["chrono"] } backtrace = "0.3.56" lazy_static = "1.4.0" chrono = "0.4" err = { path = "../err" } -netpod = { path = "../netpod" } +#netpod = { path = "../netpod" } diff --git a/taskrun/src/taskrun.rs b/taskrun/src/taskrun.rs index b34b454..6c5316a 100644 --- a/taskrun/src/taskrun.rs +++ b/taskrun/src/taskrun.rs @@ -2,10 +2,8 @@ pub mod append; use crate::log::*; use err::Error; -use netpod::ChannelArchiver; use std::future::Future; use std::panic; -use std::path::PathBuf; use std::sync::{Arc, Mutex}; use tokio::runtime::Runtime; use tokio::task::JoinHandle; @@ -125,73 +123,3 @@ where { tokio::spawn(task) } - -pub fn test_cluster() -> netpod::Cluster { - let nodes = (0..3) - .into_iter() - .map(|id| netpod::Node { - host: "localhost".into(), - listen: "0.0.0.0".into(), - port: 6170 + id as u16, - port_raw: 6170 + id as u16 + 100, - data_base_path: format!("../tmpdata/node{:02}", id).into(), - cache_base_path: format!("../tmpdata/node{:02}", id).into(), - ksprefix: "ks".into(), - backend: "testbackend".into(), - splits: None, - archiver_appliance: None, - channel_archiver: None, - }) - .collect(); - netpod::Cluster { - nodes, - database: netpod::Database { - host: "localhost".into(), - name: "testingdaq".into(), - user: "testingdaq".into(), - pass: "testingdaq".into(), - }, - run_map_pulse_task: false, - is_central_storage: false, - file_io_buffer_size: Default::default(), - } -} - -pub fn sls_test_cluster() -> netpod::Cluster { - let nodes = (0..1) - .into_iter() - .map(|id| netpod::Node { - host: "localhost".into(), - listen: "0.0.0.0".into(), - port: 6190 + id as u16, - port_raw: 6190 + id as u16 + 100, - data_base_path: format!("NOdatapath{}", id).into(), - cache_base_path: format!("../tmpdata/node{:02}", id).into(), - ksprefix: "NOKS".into(), - backend: "sls-archive".into(), - splits: None, - archiver_appliance: None, - channel_archiver: Some(ChannelArchiver { - data_base_paths: vec![PathBuf::from("/data/daqbuffer-testdata/sls/gfa03")], - database: netpod::Database { - host: "localhost".into(), - name: "testingdaq".into(), - user: "testingdaq".into(), - pass: "testingdaq".into(), - }, - }), - }) - .collect(); - netpod::Cluster { - nodes, - database: netpod::Database { - host: "localhost".into(), - name: "testingdaq".into(), - user: "testingdaq".into(), - pass: "testingdaq".into(), - }, - run_map_pulse_task: 
false, - is_central_storage: false, - file_io_buffer_size: Default::default(), - } -}
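
Note on the error-handling pattern above: httpret/src/err.rs wraps the shared ::err::Error in a crate-local newtype and gates its blanket From impl behind the empty marker trait Convable. The marker is what keeps coherence happy: because the newtype implements Display (and therefore ToString), an unrestricted `impl<T: ToString> From<T> for Error` would overlap both std's reflexive `impl<T> From<T> for T` and the dedicated `From<::err::Error>` impl. A minimal, self-contained sketch of the same pattern follows; the `base` module stands in for the workspace `err` crate and `parse_port` is an invented demo function, so treat this as an illustration under those assumptions rather than code from the repository.

// Sketch only: mirrors the Convable pattern from httpret/src/err.rs.
// `base` is a stand-in for the workspace `err` crate.
use std::fmt;

mod base {
    #[derive(Debug)]
    pub struct Error(pub String);

    impl Error {
        pub fn from_string<E: ToString>(e: E) -> Self {
            Self(e.to_string())
        }
    }
}

#[derive(Debug)]
pub struct Error(base::Error);

impl fmt::Display for Error {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        fmt::Debug::fmt(self, f)
    }
}

// Dedicated conversion from the shared base error type.
impl From<base::Error> for Error {
    fn from(x: base::Error) -> Self {
        Self(x)
    }
}

// Marker trait: only types that opt in below take part in the blanket
// conversion. Without the `Convable` bound, the blanket impl would overlap
// with std's reflexive `impl<T> From<T> for T` (since `Error: ToString` via
// its Display impl) and with `From<base::Error>`, and coherence would
// reject it with E0119.
pub trait Convable {}

impl<T: Convable> From<T> for Error
where
    T: ToString,
{
    fn from(x: T) -> Self {
        Self(base::Error::from_string(x))
    }
}

impl Convable for std::num::ParseIntError {}
impl Convable for std::net::AddrParseError {}
impl Convable for fmt::Error {}

// `?` now converts any opted-in error into the crate-local Error.
fn parse_port(s: &str) -> Result<u16, Error> {
    Ok(s.parse::<u16>()?)
}

fn main() {
    assert!(parse_port("8080").is_ok());
    println!("{:?}", parse_port("not-a-port").unwrap_err());
}

This same blanket impl is what lets the httpret handlers in the diff convert hyper, http, and serde_json errors with a plain `?` or a single `map_err(Error::from)`, since those types are all listed among the Convable impls.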