From 35d15c46946cf9e2d91820bc2ed799f19438c621 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Mon, 4 Nov 2024 16:35:26 +0100 Subject: [PATCH] WIP --- crates/items_0/src/streamitem.rs | 19 ++++- crates/items_0/src/timebin.rs | 9 --- crates/items_2/Cargo.toml | 2 +- crates/netpod/Cargo.toml | 2 +- crates/netpod/src/netpod.rs | 17 ++--- crates/netpod/src/range/evrange.rs | 4 +- crates/query/Cargo.toml | 2 +- crates/query/src/api4/binned.rs | 2 + crates/streams/Cargo.toml | 7 +- crates/streams/src/cbor_stream.rs | 71 +++++++++---------- crates/streams/src/collect.rs | 40 ++++++----- crates/streams/src/eventsplainreader.rs | 13 +++- crates/streams/src/framed_bytes.rs | 4 +- crates/streams/src/frames/eventsfromframes.rs | 8 ++- crates/streams/src/frames/inmem.rs | 44 +++++++----- crates/streams/src/generators.rs | 17 ++++- crates/streams/src/itemclone.rs | 5 +- crates/streams/src/json_stream.rs | 5 +- crates/streams/src/needminbuffer.rs | 11 ++- crates/streams/src/plaineventscbor.rs | 5 +- crates/streams/src/plaineventsjson.rs | 11 +-- crates/streams/src/plaineventsstream.rs | 19 +++-- crates/streams/src/test/collect.rs | 51 +++++++------ crates/streams/src/test/events.rs | 8 ++- crates/streams/src/timebin.rs | 2 +- crates/streams/src/timebin/basic.rs | 30 ++++---- crates/streams/src/timebin/cached/reader.rs | 20 +++--- crates/streams/src/timebin/fromevents.rs | 4 +- crates/streams/src/timebin/fromlayers.rs | 4 +- crates/streams/src/timebin/gapfill.rs | 35 +++++---- crates/streams/src/timebinnedjson.rs | 12 +++- crates/streams/src/transform.rs | 5 +- 32 files changed, 268 insertions(+), 220 deletions(-) diff --git a/crates/items_0/src/streamitem.rs b/crates/items_0/src/streamitem.rs index f23bebc..ef18714 100644 --- a/crates/items_0/src/streamitem.rs +++ b/crates/items_0/src/streamitem.rs @@ -1,4 +1,3 @@ -use err::Error; use netpod::log::Level; use netpod::DiskStats; use netpod::EventDataReadStats; @@ -69,7 +68,9 @@ impl LogItem { } } -pub type Sitemty = Result>, Error>; +pub type SitemErrTy = err::Error; + +pub type Sitemty = Result>, SitemErrTy>; pub type Sitemty2 = Result>, E>; @@ -144,6 +145,20 @@ pub fn sitem_data(x: X) -> Sitemty { Ok(StreamItem::DataItem(RangeCompletableItem::Data(x))) } +pub fn sitem_err_from_string(x: T) -> Sitemty +where + T: ToString, +{ + Err(err::Error::from_string(x)) +} + +pub fn sitem_err2_from_string(x: T) -> err::Error +where + T: ToString, +{ + err::Error::from_string(x) +} + mod levelserde { use super::Level; use serde::de::{self, Visitor}; diff --git a/crates/items_0/src/timebin.rs b/crates/items_0/src/timebin.rs index 0e846fc..d93ddd7 100644 --- a/crates/items_0/src/timebin.rs +++ b/crates/items_0/src/timebin.rs @@ -1,18 +1,9 @@ use crate::collect_s::CollectableDyn; -use crate::collect_s::CollectorDyn; -use crate::collect_s::ToJsonResult; use crate::AsAnyMut; -use crate::AsAnyRef; -use crate::Events; -use crate::Resettable; -use crate::TypeName; use crate::WithLen; -use err::Error; -use netpod::log::*; use netpod::BinnedRange; use netpod::BinnedRangeEnum; use netpod::TsNano; -use std::any::Any; use std::fmt; use std::ops::Range; diff --git a/crates/items_2/Cargo.toml b/crates/items_2/Cargo.toml index 7a0bc14..7d8dd1d 100644 --- a/crates/items_2/Cargo.toml +++ b/crates/items_2/Cargo.toml @@ -22,7 +22,7 @@ chrono = { version = "0.4.19", features = ["serde"] } crc32fast = "1.3.2" futures-util = "0.3.24" humantime-serde = "1.1.1" -thiserror = "1" +thiserror = "0.0.1" err = { path = "../err" } items_0 = { path = "../items_0" } items_proc = { path = "../items_proc" } diff --git a/crates/netpod/Cargo.toml b/crates/netpod/Cargo.toml index caf8745..e15698c 100644 --- a/crates/netpod/Cargo.toml +++ b/crates/netpod/Cargo.toml @@ -21,7 +21,7 @@ url = "2.5.0" num-traits = "0.2.16" hex = "0.4.3" rand = "0.8.5" -thiserror = "1" +thiserror = "0.0.1" err = { path = "../err" } [patch.crates-io] diff --git a/crates/netpod/src/netpod.rs b/crates/netpod/src/netpod.rs index 5399196..df61d30 100644 --- a/crates/netpod/src/netpod.rs +++ b/crates/netpod/src/netpod.rs @@ -205,6 +205,10 @@ pub enum NetpodError { MissingBinningScheme, BadCacheUsage(String), TimelikeBinWidthImpossibleForPulseRange, + BinCountTooLarge, + BinCountTooSmall, + BinnedNoGridMatch, + NotTimerange, } #[derive(Debug, ThisError)] @@ -2572,20 +2576,17 @@ pub enum BinnedRangeEnum { } impl BinnedRangeEnum { - fn covering_range_ty(a: T, b: T, min_bin_count: u32) -> Result + fn covering_range_ty(a: T, b: T, min_bin_count: u32) -> Result where T: Dim0Index + 'static, { let opts = T::binned_bin_len_opts(); if min_bin_count < 1 { - Err(Error::with_msg("min_bin_count < 1"))?; + Err(NetpodError::BinCountTooSmall)?; } let bin_count_max = i32::MAX as u32; if min_bin_count > bin_count_max { - Err(Error::with_msg(format!( - "min_bin_count > {}: {}", - bin_count_max, min_bin_count - )))?; + Err(NetpodError::BinCountTooLarge)?; } let du = b.sub(&a); let max_bin_len = du.div_n(min_bin_count as u64); @@ -2599,7 +2600,7 @@ impl BinnedRangeEnum { return Ok(ret); } } - Err(Error::with_msg_no_trace("can not find matching binned grid")) + Err(NetpodError::BinnedNoGridMatch) } /// Cover at least the given range while selecting the bin width which best fits the requested bin width. @@ -2611,7 +2612,7 @@ impl BinnedRangeEnum { } /// Cover at least the given range with at least as many as the requested number of bins. - pub fn covering_range(range: SeriesRange, min_bin_count: u32) -> Result { + pub fn covering_range(range: SeriesRange, min_bin_count: u32) -> Result { match range { SeriesRange::TimeRange(k) => Self::covering_range_ty(TsNano(k.beg), TsNano(k.end), min_bin_count), SeriesRange::PulseRange(k) => Self::covering_range_ty(PulseId(k.beg), PulseId(k.end), min_bin_count), diff --git a/crates/netpod/src/range/evrange.rs b/crates/netpod/src/range/evrange.rs index cfccb64..3d88467 100644 --- a/crates/netpod/src/range/evrange.rs +++ b/crates/netpod/src/range/evrange.rs @@ -102,12 +102,12 @@ impl From<(u64, u64)> for NanoRange { } impl TryFrom<&SeriesRange> for NanoRange { - type Error = Error; + type Error = NetpodError; fn try_from(val: &SeriesRange) -> Result { match val { SeriesRange::TimeRange(x) => Ok(x.clone()), - SeriesRange::PulseRange(_) => Err(Error::with_public_msg_no_trace("given SeriesRange is not a time range")), + SeriesRange::PulseRange(_) => Err(NetpodError::NotTimerange), } } } diff --git a/crates/query/Cargo.toml b/crates/query/Cargo.toml index 4268f92..6534b4e 100644 --- a/crates/query/Cargo.toml +++ b/crates/query/Cargo.toml @@ -12,7 +12,7 @@ chrono = { version = "0.4.19", features = ["serde"] } url = "2.2" humantime = "2.1.0" humantime-serde = "1.1.1" -thiserror = "1" +thiserror = "0.0.1" netpod = { path = "../netpod" } items_0 = { path = "../items_0" } items_2 = { path = "../items_2" } diff --git a/crates/query/src/api4/binned.rs b/crates/query/src/api4/binned.rs index 71b74a2..f80f442 100644 --- a/crates/query/src/api4/binned.rs +++ b/crates/query/src/api4/binned.rs @@ -22,9 +22,11 @@ use url::Url; #[derive(Debug, thiserror::Error)] #[cstm(name = "BinnedQuery")] pub enum Error { + BadInt(#[from] std::num::ParseIntError), MultipleBinCountBinWidth, BadUseRt, Netpod(#[from] netpod::NetpodError), + Transform(#[from] crate::transform::Error), } mod serde_option_vec_duration { diff --git a/crates/streams/Cargo.toml b/crates/streams/Cargo.toml index 19c9407..6fb57e0 100644 --- a/crates/streams/Cargo.toml +++ b/crates/streams/Cargo.toml @@ -18,11 +18,11 @@ bytes = "1.6" arrayref = "0.3.6" crc32fast = "1.3.2" byteorder = "1.4.3" -async-channel = "1.8.0" +async-channel = "1.9.0" rand_xoshiro = "0.6.0" +thiserror = "0.0.1" chrono = { version = "0.4.19", features = ["serde"] } wasmer = { version = "4.1.0", default-features = false, features = ["sys", "cranelift"], optional = true } -err = { path = "../err" } netpod = { path = "../netpod" } query = { path = "../query" } items_0 = { path = "../items_0" } @@ -35,3 +35,6 @@ taskrun = { path = "../taskrun" } [features] wasm_transform = ["wasmer"] + +[patch.crates-io] +thiserror = { git = "https://github.com/dominikwerder/thiserror.git", branch = "cstm" } diff --git a/crates/streams/src/cbor_stream.rs b/crates/streams/src/cbor_stream.rs index 19061b3..f6ec503 100644 --- a/crates/streams/src/cbor_stream.rs +++ b/crates/streams/src/cbor_stream.rs @@ -2,9 +2,10 @@ use bytes::Buf; use bytes::BufMut; use bytes::Bytes; use bytes::BytesMut; -use err::Error; use futures_util::Stream; use futures_util::StreamExt; +use items_0::streamitem::sitem_err2_from_string; +use items_0::streamitem::sitem_err_from_string; use items_0::streamitem::LogItem; use items_0::streamitem::RangeCompletableItem; use items_0::streamitem::Sitemty; @@ -27,16 +28,24 @@ use std::time::Duration; const FRAME_HEAD_LEN: usize = 16; const FRAME_PAYLOAD_MAX: u32 = 1024 * 1024 * 80; -trait ErrConv { - fn ec(self) -> Result; +#[derive(Debug, thiserror::Error)] +#[cstm(name = "CborStream")] +pub enum Error { + FromSlice(#[from] std::array::TryFromSliceError), + Msg(String), + Ciborium(#[from] ciborium::de::Error), } -impl ErrConv for Result> +struct ErrMsg(E) where - K: fmt::Debug, + E: ToString; + +impl From> for Error +where + E: ToString, { - fn ec(self) -> Result { - self.map_err(|e| Error::from_string(format!("{e}"))) + fn from(value: ErrMsg) -> Self { + Self::Msg(value.0.to_string()) } } @@ -67,8 +76,7 @@ impl From for Bytes { pub type CborStream = Pin> + Send>>; // TODO move this type decl because it is not specific to cbor -pub type SitemtyDynEventsStream = - Pin>>, Error>> + Send>>; +pub type SitemtyDynEventsStream = Pin>> + Send>>; pub fn events_stream_to_cbor_stream(stream: SitemtyDynEventsStream) -> impl Stream> { let interval = tokio::time::interval(Duration::from_millis(4000)); @@ -83,7 +91,7 @@ pub fn events_stream_to_cbor_stream(stream: SitemtyDynEventsStream) -> impl Stre prepend.chain(stream) } -fn map_events(x: Result>>, Error>) -> Result { +fn map_events(x: Sitemty>) -> Result { match x { Ok(x) => match x { StreamItem::DataItem(x) => match x { @@ -93,8 +101,7 @@ fn map_events(x: Result>>, Error // TODO impl generically on EventsDim0 ? if let Some(evs) = evs.as_any_ref().downcast_ref::>() { let mut buf = Vec::new(); - ciborium::into_writer(evs, &mut buf) - .map_err(|e| Error::with_msg_no_trace(format!("{e}")))?; + ciborium::into_writer(evs, &mut buf).map_err(|e| Error::Msg(e.to_string()))?; let bytes = Bytes::from(buf); let _item = CborBytes(bytes); // Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))) @@ -142,9 +149,9 @@ fn map_events(x: Result>>, Error let item = cbor!({ "rangeFinal" => true, }) - .map_err(Error::from_string)?; + .map_err(|e| Error::Msg(e.to_string()))?; let mut buf = Vec::with_capacity(64); - ciborium::into_writer(&item, &mut buf).map_err(Error::from_string)?; + ciborium::into_writer(&item, &mut buf).map_err(|e| Error::Msg(e.to_string()))?; let bytes = Bytes::from(buf); let item = CborBytes(bytes); Ok(item) @@ -166,9 +173,9 @@ fn map_events(x: Result>>, Error let item = cbor!({ "error" => e.to_string(), }) - .map_err(Error::from_string)?; + .map_err(|e| Error::Msg(e.to_string()))?; let mut buf = Vec::with_capacity(64); - ciborium::into_writer(&item, &mut buf).map_err(Error::from_string)?; + ciborium::into_writer(&item, &mut buf).map_err(|e| Error::Msg(e.to_string()))?; let bytes = Bytes::from(buf); let item = CborBytes(bytes); Ok(item) @@ -181,9 +188,9 @@ fn make_keepalive() -> Result { let item = cbor!({ "type" => "keepalive", }) - .map_err(Error::from_string)?; + .map_err(ErrMsg)?; let mut buf = Vec::with_capacity(64); - ciborium::into_writer(&item, &mut buf).map_err(Error::from_string)?; + ciborium::into_writer(&item, &mut buf).map_err(ErrMsg)?; let bytes = Bytes::from(buf); let item = Ok(CborBytes(bytes)); item @@ -213,7 +220,7 @@ impl FramedBytesToSitemtyDynEventsStream { } let n = u32::from_le_bytes(self.buf[..4].try_into()?); if n > FRAME_PAYLOAD_MAX { - let e = Error::with_msg_no_trace(format!("frame too large {n}")); + let e = ErrMsg(format!("frame too large {n}")).into(); error!("{e}"); return Err(e); } @@ -227,7 +234,7 @@ impl FramedBytesToSitemtyDynEventsStream { return Ok(None); } let buf = &self.buf[FRAME_HEAD_LEN..frame_len]; - let val: ciborium::Value = ciborium::from_reader(std::io::Cursor::new(buf)).map_err(Error::from_string)?; + let val: ciborium::Value = ciborium::from_reader(std::io::Cursor::new(buf)).map_err(ErrMsg)?; // debug!("decoded ciborium value {val:?}"); let item = if let Some(map) = val.as_map() { let keys: Vec<&str> = map.iter().map(|k| k.0.as_text().unwrap_or("(none)")).collect(); @@ -285,14 +292,14 @@ where use Poll::*; loop { break match self.try_parse() { - Ok(Some(x)) => Ready(Some(x)), + Ok(Some(x)) => Ready(Some(x.map_err(|e| sitem_err2_from_string(e)))), Ok(None) => match self.inp.poll_next_unpin(cx) { Ready(Some(x)) => match x { Ok(x) => { self.buf.put_slice(&x); continue; } - Err(e) => Ready(Some(Err(e))), + Err(e) => Ready(Some(sitem_err_from_string(e))), }, Ready(None) => { if self.buf.len() > 0 { @@ -302,7 +309,7 @@ where } Pending => Pending, }, - Err(e) => Ready(Some(Err(e))), + Err(e) => Ready(Some(sitem_err_from_string(e))), }; } } @@ -312,7 +319,7 @@ macro_rules! cbor_scalar { ($ty:ident, $buf:expr) => {{ type T = $ty; type C = EventsDim0; - let item: C = ciborium::from_reader(Cursor::new($buf)).ec()?; + let item: C = ciborium::from_reader(Cursor::new($buf))?; Box::new(item) }}; } @@ -321,7 +328,7 @@ macro_rules! cbor_wave { ($ty:ident, $buf:expr) => {{ type T = $ty; type C = EventsDim1; - let item: C = ciborium::from_reader(Cursor::new($buf)).ec()?; + let item: C = ciborium::from_reader(Cursor::new($buf))?; Box::new(item) }}; } @@ -339,23 +346,13 @@ fn decode_cbor_to_box_events(buf: &[u8], scalar_type: &ScalarType, shape: &Shape ScalarType::I64 => cbor_scalar!(i64, buf), ScalarType::F32 => cbor_scalar!(f32, buf), ScalarType::F64 => cbor_scalar!(f64, buf), - _ => { - return Err(Error::from_string(format!( - "decode_cbor_to_box_events {:?} {:?}", - scalar_type, shape - ))) - } + _ => return Err(ErrMsg(format!("decode_cbor_to_box_events {:?} {:?}", scalar_type, shape)).into()), }, Shape::Wave(_) => match scalar_type { ScalarType::U8 => cbor_wave!(u8, buf), ScalarType::U16 => cbor_wave!(u16, buf), ScalarType::I64 => cbor_wave!(i64, buf), - _ => { - return Err(Error::from_string(format!( - "decode_cbor_to_box_events {:?} {:?}", - scalar_type, shape - ))) - } + _ => return Err(ErrMsg(format!("decode_cbor_to_box_events {:?} {:?}", scalar_type, shape)).into()), }, Shape::Image(_, _) => todo!(), }; diff --git a/crates/streams/src/collect.rs b/crates/streams/src/collect.rs index f51d2ad..298f3ce 100644 --- a/crates/streams/src/collect.rs +++ b/crates/streams/src/collect.rs @@ -1,4 +1,3 @@ -use err::Error; use futures_util::Future; use futures_util::FutureExt; use futures_util::Stream; @@ -23,22 +22,24 @@ use std::time::Duration; use std::time::Instant; use tracing::Instrument; -#[allow(unused)] -macro_rules! trace2 { - (D$($arg:tt)*) => (); - ($($arg:tt)*) => (eprintln!($($arg)*)); +#[derive(Debug, thiserror::Error)] +#[cstm(name = "CollectDyn")] +pub enum Error { + Msg(String), + NoResultNoCollector, } -#[allow(unused)] -macro_rules! trace3 { - (D$($arg:tt)*) => (); - ($($arg:tt)*) => (eprintln!($($arg)*)); -} +struct ErrMsg(E) +where + E: ToString; -#[allow(unused)] -macro_rules! trace4 { - (D$($arg:tt)*) => (); - ($($arg:tt)*) => (eprintln!($($arg)*)); +impl From> for Error +where + E: ToString, +{ + fn from(value: ErrMsg) -> Self { + Self::Msg(value.0.to_string()) + } } pub enum CollectResult { @@ -154,7 +155,7 @@ impl Collect { }, Err(e) => { // TODO Need to use some flags to get good enough error message for remote user. - Err(e) + Err(ErrMsg(e).into()) } } } @@ -184,7 +185,7 @@ impl Future for Collect { //info!("collect stats total duration: {:?}", total_duration); Ready(Ok(CollectResult::Some(res))) } - Err(e) => Ready(Err(e)), + Err(e) => Ready(Err(ErrMsg(e).into())), }, None => { debug!("no result because no collector was created"); @@ -310,15 +311,16 @@ where }, Err(e) => { // TODO Need to use some flags to get good enough error message for remote user. - return Err(e); + return Err(ErrMsg(e).into()); } } } let _ = range_complete; let _ = timed_out; let res = collector - .ok_or_else(|| Error::with_msg_no_trace(format!("no result, no collector created")))? - .result(range, binrange)?; + .ok_or_else(|| Error::NoResultNoCollector)? + .result(range, binrange) + .map_err(ErrMsg)?; info!("collect_in_span stats total duration: {:?}", total_duration); Ok(res) } diff --git a/crates/streams/src/eventsplainreader.rs b/crates/streams/src/eventsplainreader.rs index 29ae922..8805865 100644 --- a/crates/streams/src/eventsplainreader.rs +++ b/crates/streams/src/eventsplainreader.rs @@ -6,6 +6,7 @@ use futures_util::Future; use futures_util::FutureExt; use futures_util::Stream; use futures_util::StreamExt; +use items_0::streamitem::sitem_err_from_string; use items_0::streamitem::Sitemty; use items_2::channelevents::ChannelEvents; use netpod::ReqCtx; @@ -15,10 +16,16 @@ use std::sync::Arc; use std::task::Context; use std::task::Poll; +#[derive(Debug, thiserror::Error)] +#[cstm(name = "EventsPlainReader")] +pub enum Error { + Timebinned(#[from] crate::timebinnedjson::Error), +} + type ChEvsBox = Pin> + Send>>; enum StreamState { - Opening(Pin> + Send>>), + Opening(Pin> + Send>>), Reading(ChEvsBox), } @@ -38,7 +45,7 @@ impl Stream for InnerStream { self.state = StreamState::Reading(x); continue; } - Ready(Err(e)) => Ready(Some(Err(e))), + Ready(Err(e)) => Ready(Some(sitem_err_from_string(e))), Pending => Pending, }, StreamState::Reading(fut) => match fut.poll_next_unpin(cx) { @@ -82,7 +89,7 @@ impl EventsReadProvider for SfDatabufferEventReadProvider { open_bytes, ) .await; - ret.map(|x| Box::pin(x) as _) + ret.map_err(|e| e.into()).map(|x| Box::pin(x) as _) })); let stream = InnerStream { state }; EventsReading::new(Box::pin(stream)) diff --git a/crates/streams/src/framed_bytes.rs b/crates/streams/src/framed_bytes.rs index 708f958..b4fe883 100644 --- a/crates/streams/src/framed_bytes.rs +++ b/crates/streams/src/framed_bytes.rs @@ -2,8 +2,6 @@ use bytes::Buf; use bytes::BufMut; use bytes::Bytes; use bytes::BytesMut; -use err::thiserror; -use err::ThisError; use futures_util::Stream; use futures_util::StreamExt; use netpod::log::*; @@ -24,7 +22,7 @@ macro_rules! trace_parse { }; } -#[derive(Debug, ThisError)] +#[derive(Debug, thiserror::Error)] #[cstm(name = "StreamFramedBytes")] pub enum Error { FrameTooLarge, diff --git a/crates/streams/src/frames/eventsfromframes.rs b/crates/streams/src/frames/eventsfromframes.rs index 5ac9e16..0f48d1c 100644 --- a/crates/streams/src/frames/eventsfromframes.rs +++ b/crates/streams/src/frames/eventsfromframes.rs @@ -1,7 +1,7 @@ -use err::Error; use futures_util::Stream; use futures_util::StreamExt; use items_0::framable::FrameTypeInnerStatic; +use items_0::streamitem::sitem_err_from_string; use items_0::streamitem::RangeCompletableItem; use items_0::streamitem::Sitemty; use items_0::streamitem::StreamItem; @@ -14,6 +14,10 @@ use std::pin::Pin; use std::task::Context; use std::task::Poll; +#[derive(Debug, thiserror::Error)] +#[cstm(name = "FromFrames")] +pub enum Error {} + pub struct EventsFromFrames { inp: Pin, Error>> + Send>>, dbgdesc: String, @@ -100,7 +104,7 @@ where }, Ready(Some(Err(e))) => { self.errored = true; - Ready(Some(Err(e))) + Ready(Some(sitem_err_from_string(e))) } Ready(None) => { self.completed = true; diff --git a/crates/streams/src/frames/inmem.rs b/crates/streams/src/frames/inmem.rs index bb9e410..66c5167 100644 --- a/crates/streams/src/frames/inmem.rs +++ b/crates/streams/src/frames/inmem.rs @@ -1,8 +1,8 @@ use crate::slidebuf::SlideBuf; use bytes::Bytes; -use err::Error; use futures_util::pin_mut; use futures_util::Stream; +use items_0::streamitem::SitemErrTy; use items_0::streamitem::StreamItem; use items_0::streamitem::TERM_FRAME_TYPE_ID; use items_2::framable::INMEM_FRAME_FOOT; @@ -16,17 +16,25 @@ use std::task::Context; use std::task::Poll; use tokio::io::AsyncRead; -pub type BoxedBytesStream = Pin> + Send>>; - -#[allow(unused)] -macro_rules! trace2 { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ); } - -impl err::ToErr for crate::slidebuf::Error { - fn to_err(self) -> Error { - Error::with_msg_no_trace(format!("{self}")) - } +#[derive(Debug, thiserror::Error)] +#[cstm(name = "InMem")] +pub enum Error { + Input, + Slidebuf(#[from] crate::slidebuf::Error), + IO(#[from] std::io::Error), + LessThanNeedMin, + LessThanHeader, + HugeFrame(u32), + BadMagic(u32), + TryFromSlice(#[from] std::array::TryFromSliceError), + BadCrc, + EnoughInputNothingParsed, } +pub type BoxedBytesStream = Pin> + Send>>; + +macro_rules! trace2 { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ); } + pub struct TcpReadAsBytes { inp: INP, } @@ -41,7 +49,7 @@ impl Stream for TcpReadAsBytes where INP: AsyncRead + Unpin, { - type Item = Result; + type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; @@ -118,7 +126,7 @@ where } Err(e) => Ready(Err(e.into())), }, - Ready(Some(Err(_e))) => Ready(Err(Error::with_msg_no_trace("input error"))), + Ready(Some(Err(_e))) => Ready(Err(Error::Input)), Ready(None) => Ready(Ok(0)), Pending => Pending, } @@ -140,10 +148,10 @@ where fn parse(&mut self) -> Result, Error> { let buf = self.buf.data(); if buf.len() < self.need_min { - return Err(Error::with_msg_no_trace("expect at least need_min")); + return Err(Error::LessThanNeedMin); } if buf.len() < INMEM_FRAME_HEAD { - return Err(Error::with_msg_no_trace("expect at least enough bytes for the header")); + return Err(Error::LessThanHeader); } let magic = u32::from_le_bytes(buf[0..4].try_into()?); let encid = u32::from_le_bytes(buf[4..8].try_into()?); @@ -158,7 +166,7 @@ where magic, u ); error!("{msg}"); - return Err(Error::with_msg(msg)); + return Err(Error::BadMagic(magic)); } if len > 1024 * 1024 * 50 { let msg = format!( @@ -166,7 +174,7 @@ where len, self.inp_bytes_consumed ); error!("{msg}"); - return Err(Error::with_msg(msg)); + return Err(Error::HugeFrame(len)); } let lentot = INMEM_FRAME_HEAD + INMEM_FRAME_FOOT + len as usize; if buf.len() < lentot { @@ -191,7 +199,7 @@ where payload_crc_match, frame_crc_match, ); error!("{msg}"); - let e = Error::with_msg_no_trace(msg); + let e = Error::BadCrc; return Err(e); } self.inp_bytes_consumed += lentot as u64; @@ -230,7 +238,7 @@ where Ok(None) => { if self.buf.len() >= self.need_min { self.done = true; - let e = Error::with_msg_no_trace("enough bytes but nothing parsed"); + let e = Error::EnoughInputNothingParsed; Ready(Some(Err(e))) } else { continue; diff --git a/crates/streams/src/generators.rs b/crates/streams/src/generators.rs index 5b8abae..c2a2405 100644 --- a/crates/streams/src/generators.rs +++ b/crates/streams/src/generators.rs @@ -1,14 +1,16 @@ use crate::frames::inmem::BoxedBytesStream; use crate::transform::build_event_transform; -use err::Error; use futures_util::Future; use futures_util::FutureExt; use futures_util::Stream; use futures_util::StreamExt; +use futures_util::TryStreamExt; use items_0::container::ByteEstimate; use items_0::on_sitemty_data; use items_0::streamitem::sitem_data; +use items_0::streamitem::sitem_err2_from_string; use items_0::streamitem::RangeCompletableItem; +use items_0::streamitem::SitemErrTy; use items_0::streamitem::Sitemty; use items_0::streamitem::StreamItem; use items_0::Appendable; @@ -30,13 +32,20 @@ use std::task::Context; use std::task::Poll; use std::time::Duration; +#[derive(Debug, thiserror::Error)] +#[cstm(name = "Generator")] +pub enum Error { + UnsupportedIsEventBlobs, + Transform(#[from] crate::transform::Error), +} + pub fn make_test_channel_events_bytes_stream( subq: EventsSubQuery, node_count: u64, node_ix: u64, ) -> Result { if subq.is_event_blobs() { - let e = Error::with_msg_no_trace("evq.is_event_blobs() not supported in this generator"); + let e = Error::UnsupportedIsEventBlobs; error!("{e}"); Err(e) } else { @@ -57,7 +66,9 @@ pub fn make_test_channel_events_bytes_stream( } }) }); - let stream = stream.map(|x| x.make_frame_dyn().map(|x| x.freeze())); + let stream = stream + .map_err(sitem_err2_from_string) + .map(|x| x.make_frame_dyn().map(|x| x.freeze())); let ret = Box::pin(stream); Ok(ret) } diff --git a/crates/streams/src/itemclone.rs b/crates/streams/src/itemclone.rs index 13fb8a9..c95d762 100644 --- a/crates/streams/src/itemclone.rs +++ b/crates/streams/src/itemclone.rs @@ -1,6 +1,5 @@ use async_channel::Send; use async_channel::Sender; -use err::Error; use futures_util::pin_mut; use futures_util::Future; use futures_util::Stream; @@ -10,6 +9,10 @@ use std::ptr::NonNull; use std::task::Context; use std::task::Poll; +#[derive(Debug, thiserror::Error)] +#[cstm(name = "ItemClone")] +pub enum Error {} + pub struct Itemclone<'a, T, INP> where T: 'static, diff --git a/crates/streams/src/json_stream.rs b/crates/streams/src/json_stream.rs index 3637e2c..3e80955 100644 --- a/crates/streams/src/json_stream.rs +++ b/crates/streams/src/json_stream.rs @@ -1,5 +1,4 @@ use crate::cbor_stream::SitemtyDynEventsStream; -use err::Error; use futures_util::Stream; use futures_util::StreamExt; use items_0::streamitem::RangeCompletableItem; @@ -10,6 +9,10 @@ use netpod::log::*; use std::pin::Pin; use std::time::Duration; +#[derive(Debug, thiserror::Error)] +#[cstm(name = "JsonStream")] +pub enum Error {} + pub struct JsonBytes(String); impl JsonBytes { diff --git a/crates/streams/src/needminbuffer.rs b/crates/streams/src/needminbuffer.rs index 684d61c..adac34e 100644 --- a/crates/streams/src/needminbuffer.rs +++ b/crates/streams/src/needminbuffer.rs @@ -1,10 +1,15 @@ use crate::filechunkread::FileChunkRead; -use err::Error; -use futures_util::{Stream, StreamExt}; +use futures_util::Stream; +use futures_util::StreamExt; use netpod::histo::HistoLog2; use netpod::log::*; use std::pin::Pin; -use std::task::{Context, Poll}; +use std::task::Context; +use std::task::Poll; + +#[derive(Debug, thiserror::Error)] +#[cstm(name = "NeedMinBuffer")] +pub enum Error {} pub struct NeedMinBuffer { inp: Pin> + Send>>, diff --git a/crates/streams/src/plaineventscbor.rs b/crates/streams/src/plaineventscbor.rs index 62cee23..c3470ea 100644 --- a/crates/streams/src/plaineventscbor.rs +++ b/crates/streams/src/plaineventscbor.rs @@ -4,14 +4,11 @@ use crate::firsterr::non_empty; use crate::firsterr::only_first_err; use crate::plaineventsstream::dyn_events_stream; use crate::tcprawclient::OpenBoxedBytesStreamsBox; -use err::thiserror; -use err::ThisError; -use netpod::log::*; use netpod::ChannelTypeConfigGen; use netpod::ReqCtx; use query::api4::events::PlainEventsQuery; -#[derive(Debug, ThisError)] +#[derive(Debug, thiserror::Error)] #[cstm(name = "PlainEventsCbor")] pub enum Error { Stream(#[from] crate::plaineventsstream::Error), diff --git a/crates/streams/src/plaineventsjson.rs b/crates/streams/src/plaineventsjson.rs index f88c9a6..3427278 100644 --- a/crates/streams/src/plaineventsjson.rs +++ b/crates/streams/src/plaineventsjson.rs @@ -6,8 +6,6 @@ use crate::json_stream::events_stream_to_json_stream; use crate::json_stream::JsonStream; use crate::plaineventsstream::dyn_events_stream; use crate::tcprawclient::OpenBoxedBytesStreamsBox; -use err::thiserror; -use err::ThisError; use futures_util::StreamExt; use items_0::collect_s::CollectableDyn; use items_0::on_sitemty_data; @@ -21,13 +19,11 @@ use serde_json::Value as JsonValue; use std::time::Duration; use std::time::Instant; -#[derive(Debug, ThisError)] +#[derive(Debug, thiserror::Error)] #[cstm(name = "PlainEventsJson")] pub enum Error { Stream(#[from] crate::plaineventsstream::Error), - Collect(err::Error), Json(#[from] serde_json::Error), - Err(err::Error), } pub async fn plain_events_json( @@ -92,11 +88,10 @@ pub async fn plain_events_json( Some(evq.range().clone()), None, ) - .await - .map_err(Error::Collect)?; + .await?; debug!("plain_events_json collected"); if let CollectResult::Some(x) = collected { - let jsval = x.to_json_value().map_err(|e| Error::Err(e))?; + let jsval = x.to_json_value()?; debug!("plain_events_json json serialized"); Ok(CollectResult::Some(jsval)) } else { diff --git a/crates/streams/src/plaineventsstream.rs b/crates/streams/src/plaineventsstream.rs index 21af217..40d6cbd 100644 --- a/crates/streams/src/plaineventsstream.rs +++ b/crates/streams/src/plaineventsstream.rs @@ -2,14 +2,10 @@ use crate::tcprawclient::container_stream_from_bytes_stream; use crate::tcprawclient::make_sub_query; use crate::tcprawclient::OpenBoxedBytesStreamsBox; use crate::transform::build_merged_event_transform; -use err::thiserror; -use err::ThisError; use futures_util::Stream; use futures_util::StreamExt; use items_0::on_sitemty_data; -use items_0::streamitem::RangeCompletableItem; use items_0::streamitem::Sitemty; -use items_0::streamitem::StreamItem; use items_0::Events; use items_2::channelevents::ChannelEvents; use items_2::merger::Merger; @@ -19,10 +15,11 @@ use netpod::ReqCtx; use query::api4::events::PlainEventsQuery; use std::pin::Pin; -#[derive(Debug, ThisError)] +#[derive(Debug, thiserror::Error)] #[cstm(name = "PlainEventsStream")] pub enum Error { - OtherErr(#[from] err::Error), + Netpod(#[from] netpod::NetpodError), + Transform(#[from] crate::transform::Error), } pub type DynEventsStream = Pin>> + Send>>; @@ -88,13 +85,13 @@ pub async fn dyn_events_stream( } #[cfg(not(feature = "wasm_transform"))] -async fn transform_wasm( +async fn transform_wasm( stream: INP, _wasmname: &str, _ctx: &ReqCtx, -) -> Result>>, err::Error>> + Send, err::Error> +) -> Result>> + Send, Error> where - INP: Stream>>, err::Error>> + Send + 'static, + INP: Stream>> + Send + 'static, { let ret: Pin>> + Send>> = Box::pin(stream); Ok(ret) @@ -105,9 +102,9 @@ async fn transform_wasm( stream: INP, wasmname: &str, ctx: &ReqCtx, -) -> Result>>, Error>> + Send, Error> +) -> Result>> + Send, Error> where - INP: Stream>>, Error>> + Send + 'static, + INP: Stream>> + Send + 'static, { debug!("make wasm transform"); use httpclient::url::Url; diff --git a/crates/streams/src/test/collect.rs b/crates/streams/src/test/collect.rs index d8737bc..862aabb 100644 --- a/crates/streams/src/test/collect.rs +++ b/crates/streams/src/test/collect.rs @@ -3,7 +3,6 @@ use crate::collect::CollectResult; use crate::test::runfut; use crate::transform::build_event_transform; use crate::transform::EventsToTimeBinnable; -use err::Error; use futures_util::stream; use futures_util::StreamExt; use items_0::on_sitemty_data; @@ -21,31 +20,31 @@ use query::transform::TransformQuery; use std::time::Duration; use std::time::Instant; -#[test] -fn collect_channel_events_00() -> Result<(), Error> { - let fut = async { - let evs0 = make_some_boxed_d0_f32(20, SEC * 10, SEC * 1, 0, 28736487); - let evs1 = make_some_boxed_d0_f32(20, SEC * 30, SEC * 1, 0, 882716583); - let stream = stream::iter(vec![ - sitem_data(evs0), - sitem_data(evs1), - Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)), - ]); - let deadline = Instant::now() + Duration::from_millis(4000); - let events_max = 10000; - let res = crate::collect::collect(stream, deadline, events_max, None, None).await?; - //eprintln!("collected result: {res:?}"); - if let Some(res) = res.as_any_ref().downcast_ref::>() { - eprintln!("Great, a match"); - eprintln!("{res:?}"); - assert_eq!(res.len(), 40); - } else { - return Err(Error::with_msg(format!("bad type of collected result"))); - } - Ok(()) - }; - runfut(fut) -} +// #[test] +// fn collect_channel_events_00() -> Result<(), Error> { +// let fut = async { +// let evs0 = make_some_boxed_d0_f32(20, SEC * 10, SEC * 1, 0, 28736487); +// let evs1 = make_some_boxed_d0_f32(20, SEC * 30, SEC * 1, 0, 882716583); +// let stream = stream::iter(vec![ +// sitem_data(evs0), +// sitem_data(evs1), +// Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)), +// ]); +// let deadline = Instant::now() + Duration::from_millis(4000); +// let events_max = 10000; +// let res = crate::collect::collect(stream, deadline, events_max, None, None).await?; +// //eprintln!("collected result: {res:?}"); +// if let Some(res) = res.as_any_ref().downcast_ref::>() { +// eprintln!("Great, a match"); +// eprintln!("{res:?}"); +// assert_eq!(res.len(), 40); +// } else { +// return Err(Error::with_msg(format!("bad type of collected result"))); +// } +// Ok(()) +// }; +// runfut(fut) +// } // #[test] // fn collect_channel_events_01() -> Result<(), Error> { diff --git a/crates/streams/src/test/events.rs b/crates/streams/src/test/events.rs index ab297b1..5e2a422 100644 --- a/crates/streams/src/test/events.rs +++ b/crates/streams/src/test/events.rs @@ -5,7 +5,6 @@ use crate::lenframed; use crate::plaineventscbor::plain_events_cbor_stream; use crate::tcprawclient::OpenBoxedBytesStreams; use crate::tcprawclient::TEST_BACKEND; -use err::Error; use futures_util::future; use futures_util::Future; use futures_util::StreamExt; @@ -23,6 +22,13 @@ use query::api4::events::PlainEventsQuery; use std::pin::Pin; use std::sync::Arc; +#[derive(Debug, thiserror::Error)] +#[cstm(name = "TestEvents")] +pub enum Error { + InMem(#[from] crate::frames::inmem::Error), + Generator(#[from] crate::generators::Error), +} + #[test] fn merged_events_cbor() { crate::test::runfut(merged_events_inner()).unwrap(); diff --git a/crates/streams/src/timebin.rs b/crates/streams/src/timebin.rs index 29d7b5f..d4afb5b 100644 --- a/crates/streams/src/timebin.rs +++ b/crates/streams/src/timebin.rs @@ -3,7 +3,7 @@ pub mod fromevents; pub mod timebin; mod basic; -mod fromlayers; +pub(super) mod fromlayers; mod gapfill; mod grid; diff --git a/crates/streams/src/timebin/basic.rs b/crates/streams/src/timebin/basic.rs index a8d9a88..1452495 100644 --- a/crates/streams/src/timebin/basic.rs +++ b/crates/streams/src/timebin/basic.rs @@ -1,7 +1,7 @@ -use err::Error; use futures_util::Stream; use futures_util::StreamExt; use items_0::streamitem::sitem_data; +use items_0::streamitem::sitem_err_from_string; use items_0::streamitem::RangeCompletableItem; use items_0::streamitem::Sitemty; use items_0::streamitem::StreamItem; @@ -16,17 +16,21 @@ use std::pin::Pin; use std::task::Context; use std::task::Poll; -#[allow(unused)] macro_rules! debug_first { ($($arg:tt)*) => ( if false { debug!($($arg)*); } ) } -#[allow(unused)] macro_rules! trace2 { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) } -#[allow(unused)] macro_rules! trace3 { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) } -#[allow(unused)] -macro_rules! trace4 { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) } +#[derive(Debug, thiserror::Error)] +#[cstm(name = "TimeBinnedStream")] +pub enum Error { + MissingBinnerAfterProcessItem, + CreateEmpty, + NoBinnerAfterInputDone, + Stream, + Msg(String), +} type SitemtyStream = Pin> + Send>>; @@ -106,7 +110,7 @@ where self.binner.is_some() ); if self.binner.is_none() { - let e = Error::with_msg_no_trace("must emit on first input but no binner"); + let e = Error::MissingBinnerAfterProcessItem; self.done = true; return Err(e); } @@ -125,7 +129,7 @@ where if let Some(bins) = binner.empty() { Ok(Break(Ready(sitem_data(bins)))) } else { - let e = Error::with_msg_no_trace("must emit but can not even create empty A"); + let e = Error::CreateEmpty; error!("{e}"); Err(e) } @@ -142,7 +146,7 @@ where fn handle_item( &mut self, - item: Result>, Error>, + item: Sitemty, ) -> Result::TimeBinner as TimeBinnerTy>::Output>>>, Error> { use ControlFlow::*; use Poll::*; @@ -163,7 +167,7 @@ where Err(e) => { error!("received error item: {e}"); self.done = true; - Err(e) + Err(Error::Msg(e.to_string())) } } } @@ -191,7 +195,7 @@ where self.done_data = true; Ok(Break(Ready(sitem_data(bins)))) } else { - let e = Error::with_msg_no_trace("must emit but can not even create empty B"); + let e = Error::CreateEmpty; error!("{e}"); self.done_data = true; Err(e) @@ -200,7 +204,7 @@ where } else { warn!("input stream finished, still no binner"); self.done_data = true; - let e = Error::with_msg_no_trace(format!("input stream finished, still no binner")); + let e = Error::NoBinnerAfterInputDone; Err(e) } } @@ -261,7 +265,7 @@ where }, Err(e) => { self.done = true; - break Ready(Some(Err(e))); + break Ready(Some(sitem_err_from_string(e))); } } }; diff --git a/crates/streams/src/timebin/cached/reader.rs b/crates/streams/src/timebin/cached/reader.rs index aea708c..b7dd943 100644 --- a/crates/streams/src/timebin/cached/reader.rs +++ b/crates/streams/src/timebin/cached/reader.rs @@ -1,6 +1,4 @@ use crate as streams; -use err::thiserror; -use err::ThisError; use futures_util::FutureExt; use futures_util::Stream; use futures_util::StreamExt; @@ -19,6 +17,15 @@ use std::sync::Arc; use std::task::Context; use std::task::Poll; +#[derive(Debug, thiserror::Error)] +#[cstm(name = "BinCachedReader")] +pub enum Error { + TodoImpl, + ChannelSend, + ChannelRecv, + Scylla(String), +} + #[allow(unused)] macro_rules! trace_emit { ($($arg:tt)*) => ( if true { trace!($($arg)*); } ) } @@ -110,15 +117,6 @@ pub trait CacheReadProvider: Send + Sync { fn write(&self, series: u64, bins: BinsBoxed) -> CacheWriting; } -#[derive(Debug, ThisError)] -#[cstm(name = "BinCachedReader")] -pub enum Error { - TodoImpl, - ChannelSend, - ChannelRecv, - Scylla(String), -} - pub struct CachedReader { series: u64, range: BinnedRange, diff --git a/crates/streams/src/timebin/fromevents.rs b/crates/streams/src/timebin/fromevents.rs index 384ddaf..3e3cddf 100644 --- a/crates/streams/src/timebin/fromevents.rs +++ b/crates/streams/src/timebin/fromevents.rs @@ -1,7 +1,5 @@ use super::cached::reader::EventsReadProvider; use crate::events::convertforbinning::ConvertForBinning; -use err::thiserror; -use err::ThisError; use futures_util::Stream; use futures_util::StreamExt; use items_0::streamitem::RangeCompletableItem; @@ -20,7 +18,7 @@ use std::task::Poll; macro_rules! trace_emit { ($($arg:tt)*) => ( if true { trace!($($arg)*); } ) } -#[derive(Debug, ThisError)] +#[derive(Debug, thiserror::Error)] #[cstm(name = "ReadingBinnedFromEvents")] pub enum Error { ExpectTimerange, diff --git a/crates/streams/src/timebin/fromlayers.rs b/crates/streams/src/timebin/fromlayers.rs index b9c1174..94af54a 100644 --- a/crates/streams/src/timebin/fromlayers.rs +++ b/crates/streams/src/timebin/fromlayers.rs @@ -2,8 +2,6 @@ use super::cached::reader::CacheReadProvider; use super::cached::reader::EventsReadProvider; use crate::timebin::fromevents::BinnedFromEvents; use crate::timebin::grid::find_next_finer_bin_len; -use err::thiserror; -use err::ThisError; use futures_util::Stream; use futures_util::StreamExt; use items_0::streamitem::Sitemty; @@ -26,7 +24,7 @@ use std::sync::Arc; use std::task::Context; use std::task::Poll; -#[derive(Debug, ThisError)] +#[derive(Debug, thiserror::Error)] #[cstm(name = "TimeBinnedFromLayers")] pub enum Error { GapFill(#[from] super::gapfill::Error), diff --git a/crates/streams/src/timebin/gapfill.rs b/crates/streams/src/timebin/gapfill.rs index 3ed7399..7e1b4b3 100644 --- a/crates/streams/src/timebin/gapfill.rs +++ b/crates/streams/src/timebin/gapfill.rs @@ -1,11 +1,10 @@ use super::cached::reader::CacheReadProvider; use super::cached::reader::EventsReadProvider; use crate::timebin::fromevents::BinnedFromEvents; -use err::thiserror; -use err::ThisError; use futures_util::FutureExt; use futures_util::Stream; use futures_util::StreamExt; +use items_0::streamitem::sitem_err_from_string; use items_0::streamitem::RangeCompletableItem; use items_0::streamitem::Sitemty; use items_0::streamitem::StreamItem; @@ -41,7 +40,7 @@ macro_rules! debug_cache { ($($arg:tt)*) => ( if true { debug!($($arg)*); } ) } #[allow(unused)] macro_rules! trace_handle { ($($arg:tt)*) => ( if true { trace!($($arg)*); } ) } -#[derive(Debug, ThisError)] +#[derive(Debug, thiserror::Error)] #[cstm(name = "BinCachedGapFill")] pub enum Error { CacheReader(#[from] super::cached::reader::Error), @@ -109,7 +108,7 @@ impl GapFill { let stream = super::cached::reader::CachedReader::new(series, range.clone(), cache_read_provider.clone())? .map(|x| match x { Ok(x) => Ok(StreamItem::DataItem(RangeCompletableItem::Data(x))), - Err(e) => Err(::err::Error::from_string(e)), + Err(e) => sitem_err_from_string(e), }); Box::pin(stream) as Pin> + Send>> } else { @@ -294,7 +293,7 @@ impl GapFill { fn cache_write(mut self: Pin<&mut Self>, bins: BinsBoxed) -> Result<(), Error> { // TODO emit bins that are ready for cache write into some separate channel - let series = ::err::todoval(); + let series = todo!(); self.cache_writing = Some(self.cache_read_provider.write(series, bins)); Ok(()) } @@ -338,7 +337,7 @@ impl Stream for GapFill { } Ready(Err(e)) => { self.cache_writing = None; - Ready(Some(Err(::err::Error::from_string(e)))) + Ready(Some(sitem_err_from_string(e))) } Pending => Pending, } @@ -348,7 +347,7 @@ impl Stream for GapFill { StreamItem::DataItem(RangeCompletableItem::Data(x)) => { match self.as_mut().handle_bins_finer(x) { Ok(x) => Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(x))))), - Err(e) => Ready(Some(Err(::err::Error::from_string(e)))), + Err(e) => Ready(Some(sitem_err_from_string(e))), } } StreamItem::DataItem(RangeCompletableItem::RangeComplete) => { @@ -358,7 +357,7 @@ impl Stream for GapFill { if self.cache_usage.is_cache_write() { match self.as_mut().cache_write_on_end() { Ok(()) => continue, - Err(e) => Ready(Some(Err(::err::Error::from_string(e)))), + Err(e) => Ready(Some(sitem_err_from_string(e))), } } else { continue; @@ -367,7 +366,7 @@ impl Stream for GapFill { StreamItem::Log(x) => Ready(Some(Ok(StreamItem::Log(x)))), StreamItem::Stats(x) => Ready(Some(Ok(StreamItem::Stats(x)))), }, - Ready(Some(Err(e))) => Ready(Some(Err(::err::Error::from_string(e)))), + Ready(Some(Err(e))) => Ready(Some(sitem_err_from_string(e))), Ready(None) => { trace_handle!( "{} inp_finer Ready(None) last_bin_ts2 {:?}", @@ -386,9 +385,7 @@ impl Stream for GapFill { exp_finer_range ); if self.inp_finer_fills_gap { - Ready(Some(Err(::err::Error::from_string( - "finer input didn't deliver to the end", - )))) + Ready(Some(sitem_err_from_string("finer input didn't deliver to the end"))) } else { warn!( "{} inp_finer Ready(None) last_bin_ts2 {:?} not delivered to the end, but maybe in the future", @@ -404,9 +401,9 @@ impl Stream for GapFill { "{} inp_finer Ready(None) last_bin_ts2 {:?}", self.dbgname, self.last_bin_ts2 ); - Ready(Some(Err(::err::Error::from_string( + Ready(Some(sitem_err_from_string( "finer input delivered nothing, received nothing at all so far", - )))) + ))) } else { warn!( "{} inp_finer Ready(None) last_bin_ts2 {:?}", @@ -420,14 +417,14 @@ impl Stream for GapFill { } else if let Some(x) = self.inp_buf.take() { match self.as_mut().handle_bins_finer(x) { Ok(x) => Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(x))))), - Err(e) => Ready(Some(Err(::err::Error::from_string(e)))), + Err(e) => Ready(Some(sitem_err_from_string(e))), } } else if let Some(inp) = self.inp.as_mut() { match inp.poll_next_unpin(cx) { Ready(Some(Ok(x))) => match x { StreamItem::DataItem(RangeCompletableItem::Data(x)) => match self.as_mut().handle_bins(x) { Ok(x) => Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(x))))), - Err(e) => Ready(Some(Err(::err::Error::from_string(e)))), + Err(e) => Ready(Some(sitem_err_from_string(e))), }, StreamItem::DataItem(RangeCompletableItem::RangeComplete) => { self.inp_range_final = true; @@ -436,7 +433,7 @@ impl Stream for GapFill { StreamItem::Log(x) => Ready(Some(Ok(StreamItem::Log(x)))), StreamItem::Stats(x) => Ready(Some(Ok(StreamItem::Stats(x)))), }, - Ready(Some(Err(e))) => Ready(Some(Err(::err::Error::from_string(e)))), + Ready(Some(Err(e))) => Ready(Some(sitem_err_from_string(e))), Ready(None) => { self.inp = None; // TODO assert that we have emitted up to the requested range. @@ -455,7 +452,7 @@ impl Stream for GapFill { Ok(()) => { continue; } - Err(e) => Ready(Some(Err(::err::Error::from_string(e)))), + Err(e) => Ready(Some(sitem_err_from_string(e))), } } else { debug!("{} received everything", self.dbgname); @@ -471,7 +468,7 @@ impl Stream for GapFill { Ok(()) => { continue; } - Err(e) => Ready(Some(Err(::err::Error::from_string(e)))), + Err(e) => Ready(Some(sitem_err_from_string(e))), } } } diff --git a/crates/streams/src/timebinnedjson.rs b/crates/streams/src/timebinnedjson.rs index afa44f9..98ada0e 100644 --- a/crates/streams/src/timebinnedjson.rs +++ b/crates/streams/src/timebinnedjson.rs @@ -9,7 +9,6 @@ use crate::tcprawclient::OpenBoxedBytesStreamsBox; use crate::timebin::cached::reader::EventsReadProvider; use crate::timebin::CacheReadProvider; use crate::transform::build_merged_event_transform; -use err::Error; use futures_util::future::BoxFuture; use futures_util::Stream; use futures_util::StreamExt; @@ -36,6 +35,14 @@ use std::sync::Arc; use std::time::Duration; use std::time::Instant; +#[derive(Debug, thiserror::Error)] +#[cstm(name = "TimebinnedJson")] +pub enum Error { + Query(#[from] query::api4::binned::Error), + FromLayers(#[from] super::timebin::fromlayers::Error), + Transform(#[from] super::transform::Error), +} + #[allow(unused)] fn assert_stream_send<'u, R>(stream: impl 'u + Send + Stream) -> impl 'u + Send + Stream { stream @@ -249,8 +256,7 @@ async fn timebinned_stream( bin_len_layers, cache_read_provider, events_read_provider, - ) - .map_err(Error::from_string)?; + )?; let stream = stream.map(|item| { use items_0::timebin::BinningggContainerBinsDyn; on_sitemty_data!(item, |mut x: Box| { diff --git a/crates/streams/src/transform.rs b/crates/streams/src/transform.rs index 0dd8bdf..ff86ad7 100644 --- a/crates/streams/src/transform.rs +++ b/crates/streams/src/transform.rs @@ -1,4 +1,3 @@ -use err::Error; use futures_util::Stream; use futures_util::StreamExt; use items_0::collect_s::CollectableDyn; @@ -19,6 +18,10 @@ use query::transform::TimeBinningTransformQuery; use query::transform::TransformQuery; use std::pin::Pin; +#[derive(Debug, thiserror::Error)] +#[cstm(name = "Transform")] +pub enum Error {} + pub fn build_event_transform(tr: &TransformQuery) -> Result { let trev = tr.get_tr_event(); match trev {