From 66538ced79032a73de25343267f331b5372d2d2b Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Mon, 4 Nov 2024 11:05:01 +0100 Subject: [PATCH] Generic error for body stream --- crates/httpret/src/api4/databuffer_tools.rs | 1 - crates/items_2/Cargo.toml | 9 +++++++-- crates/items_2/src/items_2.rs | 10 +--------- crates/items_2/src/test.rs | 10 ++++++++++ crates/netpod/Cargo.toml | 5 ++++- crates/netpod/src/netpod.rs | 8 +++++--- 6 files changed, 27 insertions(+), 16 deletions(-) diff --git a/crates/httpret/src/api4/databuffer_tools.rs b/crates/httpret/src/api4/databuffer_tools.rs index b4eb31e..83b6641 100644 --- a/crates/httpret/src/api4/databuffer_tools.rs +++ b/crates/httpret/src/api4/databuffer_tools.rs @@ -91,7 +91,6 @@ impl FindActiveHandler { .map_or(accept_def, |k| k.to_str().unwrap_or(accept_def)); let _url = req_uri_to_url(req.uri()).map_err(|_| FindActiveError::HttpBadUrl)?; if accept.contains(APP_JSON) || accept.contains(ACCEPT_ALL) { - type _A = netpod::BodyStream; let stream = FindActiveStream::new(40, 2, ncc); let stream = stream.chain(FindActiveStream::new(40, 3, ncc)); let stream = stream.chain(FindActiveStream::new(40, 4, ncc)); diff --git a/crates/items_2/Cargo.toml b/crates/items_2/Cargo.toml index c801684..7a0bc14 100644 --- a/crates/items_2/Cargo.toml +++ b/crates/items_2/Cargo.toml @@ -11,7 +11,6 @@ doctest = false [dependencies] serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" -serde_cbor = "0.11.2" ciborium = "0.2.1" rmp-serde = "1.1.1" postcard = { version = "1.0.0", features = ["use-std"] } @@ -22,8 +21,8 @@ num-traits = "0.2.15" chrono = { version = "0.4.19", features = ["serde"] } crc32fast = "1.3.2" futures-util = "0.3.24" -tokio = { version = "1.20", features = ["rt-multi-thread", "sync", "time"] } humantime-serde = "1.1.1" +thiserror = "1" err = { path = "../err" } items_0 = { path = "../items_0" } items_proc = { path = "../items_proc" } @@ -31,3 +30,9 @@ netpod = { path = "../netpod" } taskrun = { path = "../taskrun" } parse = { path = "../parse" } bitshuffle = { path = "../bitshuffle" } + +[patch.crates-io] +thiserror = { git = "https://github.com/dominikwerder/thiserror.git", branch = "cstm" } + +[features] +heavy = [] diff --git a/crates/items_2/src/items_2.rs b/crates/items_2/src/items_2.rs index 00030c8..b407c79 100644 --- a/crates/items_2/src/items_2.rs +++ b/crates/items_2/src/items_2.rs @@ -14,6 +14,7 @@ pub mod frame; pub mod inmem; pub mod merger; pub mod streams; +#[cfg(feature = "heavy")] #[cfg(test)] pub mod test; pub mod testgen; @@ -174,12 +175,3 @@ impl Mergeable for Box { pub trait ChannelEventsInput: Stream> + EventTransform + Send {} impl ChannelEventsInput for T where T: Stream> + EventTransform + Send {} - -pub fn runfut(fut: F) -> Result -where - F: std::future::Future>, -{ - use futures_util::TryFutureExt; - let fut = fut.map_err(|e| e.into()); - taskrun::run(fut) -} diff --git a/crates/items_2/src/test.rs b/crates/items_2/src/test.rs index c60d5cb..15123b0 100644 --- a/crates/items_2/src/test.rs +++ b/crates/items_2/src/test.rs @@ -31,6 +31,16 @@ use netpod::BinnedRangeEnum; use std::time::Duration; use std::time::Instant; +#[cfg(test)] +pub fn runfut(fut: F) -> Result +where + F: std::future::Future>, +{ + use futures_util::TryFutureExt; + let fut = fut.map_err(|e| e.into()); + taskrun::run(fut) +} + #[test] fn items_move_events() { let evs = make_some_boxed_d0_f32(10, SEC, SEC, 0, 1846713782); diff --git a/crates/netpod/Cargo.toml b/crates/netpod/Cargo.toml index 41117e1..caf8745 100644 --- a/crates/netpod/Cargo.toml +++ b/crates/netpod/Cargo.toml @@ -13,7 +13,6 @@ serde_json = "1.0" http = "1.0.0" humantime = "2.1.0" humantime-serde = "1.1.1" -async-channel = "1.8.0" bytes = "1.4.0" chrono = { version = "0.4.19", features = ["serde"] } futures-util = "0.3.14" @@ -22,4 +21,8 @@ url = "2.5.0" num-traits = "0.2.16" hex = "0.4.3" rand = "0.8.5" +thiserror = "1" err = { path = "../err" } + +[patch.crates-io] +thiserror = { git = "https://github.com/dominikwerder/thiserror.git", branch = "cstm" } diff --git a/crates/netpod/src/netpod.rs b/crates/netpod/src/netpod.rs index ec7b208..5afebb1 100644 --- a/crates/netpod/src/netpod.rs +++ b/crates/netpod/src/netpod.rs @@ -188,9 +188,11 @@ pub enum AsyncChannelError { Recv, } -pub struct BodyStream { - //pub receiver: async_channel::Receiver>, - pub inner: Box> + Send + Unpin>, +pub struct BodyStream +where + E: std::error::Error, +{ + pub inner: Box> + Send + Unpin>, } #[derive(Debug, Clone, PartialEq, PartialOrd, Eq, Ord, Serialize, Deserialize)]