diff --git a/crates/disk/src/eventchunker.rs b/crates/disk/src/eventchunker.rs index b21dec3..39c032b 100644 --- a/crates/disk/src/eventchunker.rs +++ b/crates/disk/src/eventchunker.rs @@ -140,7 +140,7 @@ impl EventChunker { // TODO `expand` flag usage pub fn from_start( - inp: Pin> + Send>>, + inp: Pin> + Send>>, fetch_info: SfChFetchInfo, range: NanoRange, stats_conf: EventChunkerConf, diff --git a/crates/disk/src/merge/mergedblobsfromremotes.rs b/crates/disk/src/merge/mergedblobsfromremotes.rs index f4c6c10..2f939c1 100644 --- a/crates/disk/src/merge/mergedblobsfromremotes.rs +++ b/crates/disk/src/merge/mergedblobsfromremotes.rs @@ -1,7 +1,8 @@ -use err::Error; use futures_util::pin_mut; use futures_util::Stream; use futures_util::StreamExt; +use futures_util::TryFutureExt; +use items_0::streamitem::sitem_err2_from_string; use items_0::streamitem::Sitemty; use items_2::eventfull::EventFull; use items_2::merger::Merger; @@ -16,7 +17,7 @@ use std::task::Poll; use streams::tcprawclient::x_processed_event_blobs_stream_from_node; type T001 = Pin> + Send>>; -type T002 = Pin, Error>> + Send>>; +type T002 = Pin, items_0::streamitem::SitemErrTy>> + Send>>; pub struct MergedBlobsFromRemotes { tcp_establish_futs: Vec>, @@ -32,6 +33,7 @@ impl MergedBlobsFromRemotes { let mut tcp_establish_futs = Vec::new(); for node in &cluster.nodes { let f = x_processed_event_blobs_stream_from_node(subq.clone(), node.clone(), ctx.clone()); + let f = f.map_err(sitem_err2_from_string); let f: T002 = Box::pin(f); tcp_establish_futs.push(f); } diff --git a/crates/disk/src/raw/conn.rs b/crates/disk/src/raw/conn.rs index 29832dd..dcef738 100644 --- a/crates/disk/src/raw/conn.rs +++ b/crates/disk/src/raw/conn.rs @@ -73,7 +73,7 @@ pub async fn make_event_pipe( 128 }; let event_blobs = EventChunkerMultifile::new( - (&range).try_into()?, + (&range).try_into().map_err(Error::from_string)?, fetch_info.clone(), ncc.node.clone(), ncc.ix, @@ -129,7 +129,7 @@ pub fn make_event_blobs_pipe_real( let range = subq.range(); let event_chunker_conf = EventChunkerConf::new(ByteSize::from_kb(1024)); let event_blobs = make_event_blobs_stream( - range.try_into()?, + range.try_into().map_err(Error::from_string)?, fetch_info.clone(), one_before, event_chunker_conf, diff --git a/crates/items_0/src/collect_s.rs b/crates/items_0/src/collect_s.rs index 3f8252a..fcf1d83 100644 --- a/crates/items_0/src/collect_s.rs +++ b/crates/items_0/src/collect_s.rs @@ -20,7 +20,7 @@ pub trait ToJsonBytes { } pub trait ToJsonResult: fmt::Debug + AsAnyRef + AsAnyMut + Send { - fn to_json_value(&self) -> Result; + fn to_json_value(&self) -> Result; } impl AsAnyRef for serde_json::Value { @@ -36,7 +36,7 @@ impl AsAnyMut for serde_json::Value { } impl ToJsonResult for serde_json::Value { - fn to_json_value(&self) -> Result { + fn to_json_value(&self) -> Result { Ok(self.clone()) } } @@ -50,7 +50,7 @@ impl ToJsonBytes for serde_json::Value { pub trait CollectedDyn: fmt::Debug + TypeName + Send + AsAnyRef + WithLen + ToJsonResult {} impl ToJsonResult for Box { - fn to_json_value(&self) -> Result { + fn to_json_value(&self) -> Result { ToJsonResult::to_json_value(self.as_ref()) } } diff --git a/crates/items_2/src/binning/container_bins.rs b/crates/items_2/src/binning/container_bins.rs index 886d484..31059eb 100644 --- a/crates/items_2/src/binning/container_bins.rs +++ b/crates/items_2/src/binning/container_bins.rs @@ -450,7 +450,7 @@ impl ToJsonResult for ContainerBinsCollectorOutput where EVT: EventValueType, { - fn to_json_value(&self) -> Result { + fn to_json_value(&self) -> Result { let bins = &self.bins; let ts1sns: Vec<_> = bins.ts1s.iter().map(|x| x.ns()).collect(); let ts2sns: Vec<_> = bins.ts2s.iter().map(|x| x.ns()).collect(); @@ -471,8 +471,7 @@ where maxs, avgs, }; - let ret = serde_json::to_value(&val).map_err(err::Error::from_string); - ret + serde_json::to_value(&val) } } diff --git a/crates/items_2/src/binsdim0.rs b/crates/items_2/src/binsdim0.rs index 5f56638..8cc4ee1 100644 --- a/crates/items_2/src/binsdim0.rs +++ b/crates/items_2/src/binsdim0.rs @@ -731,8 +731,8 @@ impl BinsDim0CollectedResult { } impl ToJsonResult for BinsDim0CollectedResult { - fn to_json_value(&self) -> Result { - serde_json::to_value(self).map_err(Error::from_string) + fn to_json_value(&self) -> Result { + serde_json::to_value(self) } } diff --git a/crates/items_2/src/binsxbindim0.rs b/crates/items_2/src/binsxbindim0.rs index 3eee9da..136768f 100644 --- a/crates/items_2/src/binsxbindim0.rs +++ b/crates/items_2/src/binsxbindim0.rs @@ -361,8 +361,8 @@ impl BinsXbinDim0CollectedResult { } impl ToJsonResult for BinsXbinDim0CollectedResult { - fn to_json_value(&self) -> Result { - serde_json::to_value(self).map_err(Error::from_string) + fn to_json_value(&self) -> Result { + serde_json::to_value(self) } } diff --git a/crates/items_2/src/channelevents.rs b/crates/items_2/src/channelevents.rs index a297c74..457110c 100644 --- a/crates/items_2/src/channelevents.rs +++ b/crates/items_2/src/channelevents.rs @@ -1023,8 +1023,8 @@ impl WithLen for ChannelEventsCollectorOutput { } impl items_0::collect_s::ToJsonResult for ChannelEventsCollectorOutput { - fn to_json_value(&self) -> Result { - serde_json::to_value(self).map_err(::err::Error::from_string) + fn to_json_value(&self) -> Result { + serde_json::to_value(self) } } diff --git a/crates/items_2/src/eventsdim0.rs b/crates/items_2/src/eventsdim0.rs index 8aeadc6..b5cde68 100644 --- a/crates/items_2/src/eventsdim0.rs +++ b/crates/items_2/src/eventsdim0.rs @@ -371,8 +371,8 @@ impl WithLen for EventsDim0CollectorOutput { } impl ToJsonResult for EventsDim0CollectorOutput { - fn to_json_value(&self) -> Result { - serde_json::to_value(self).map_err(Error::from_string) + fn to_json_value(&self) -> Result { + serde_json::to_value(self) } } diff --git a/crates/items_2/src/eventsdim0enum.rs b/crates/items_2/src/eventsdim0enum.rs index 38b08b3..babe914 100644 --- a/crates/items_2/src/eventsdim0enum.rs +++ b/crates/items_2/src/eventsdim0enum.rs @@ -120,7 +120,7 @@ impl TypeName for EventsDim0EnumCollectorOutput { } impl ToJsonResult for EventsDim0EnumCollectorOutput { - fn to_json_value(&self) -> Result { + fn to_json_value(&self) -> Result { todo!() } } diff --git a/crates/items_2/src/eventsdim1.rs b/crates/items_2/src/eventsdim1.rs index 4e7bbe3..4158cc1 100644 --- a/crates/items_2/src/eventsdim1.rs +++ b/crates/items_2/src/eventsdim1.rs @@ -334,8 +334,8 @@ impl WithLen for EventsDim1CollectorOutput { } impl ToJsonResult for EventsDim1CollectorOutput { - fn to_json_value(&self) -> Result { - serde_json::to_value(self).map_err(Error::from_string) + fn to_json_value(&self) -> Result { + serde_json::to_value(self) } } diff --git a/crates/items_2/src/eventsxbindim0.rs b/crates/items_2/src/eventsxbindim0.rs index 0419ac7..b79ad77 100644 --- a/crates/items_2/src/eventsxbindim0.rs +++ b/crates/items_2/src/eventsxbindim0.rs @@ -652,8 +652,8 @@ impl ToJsonResult for EventsXbinDim0CollectorOutput where NTY: ScalarOps, { - fn to_json_value(&self) -> Result { - serde_json::to_value(self).map_err(Error::from_string) + fn to_json_value(&self) -> Result { + serde_json::to_value(self) } } diff --git a/crates/items_2/src/framable.rs b/crates/items_2/src/framable.rs index f7466da..d34d407 100644 --- a/crates/items_2/src/framable.rs +++ b/crates/items_2/src/framable.rs @@ -4,7 +4,6 @@ use crate::frame::make_log_frame; use crate::frame::make_range_complete_frame; use crate::frame::make_stats_frame; use bytes::BytesMut; -use err::Error; use items_0::framable::FrameTypeInnerDyn; use items_0::framable::FrameTypeInnerStatic; use items_0::streamitem::LogItem; @@ -26,6 +25,27 @@ pub const INMEM_FRAME_HEAD: usize = 20; pub const INMEM_FRAME_FOOT: usize = 4; pub const INMEM_FRAME_MAGIC: u32 = 0xc6c3b73d; +#[derive(Debug, thiserror::Error)] +#[cstm(name = "ItemFramable")] +pub enum Error { + Msg(String), + DummyError, + Frame(#[from] crate::frame::Error), +} + +struct ErrMsg(E) +where + E: ToString; + +impl From> for Error +where + E: ToString, +{ + fn from(value: ErrMsg) -> Self { + Self::Msg(value.0.to_string()) + } +} + pub trait FrameTypeStatic { const FRAME_TYPE_ID: u32; } @@ -78,14 +98,16 @@ where match self { Ok(StreamItem::DataItem(RangeCompletableItem::Data(k))) => { let frame_type_id = k.frame_type_id(); - make_frame_2(self, frame_type_id) + make_frame_2(self, frame_type_id).map_err(Error::from) } - Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)) => make_range_complete_frame(), - Ok(StreamItem::Log(item)) => make_log_frame(item), - Ok(StreamItem::Stats(item)) => make_stats_frame(item), + Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)) => { + make_range_complete_frame().map_err(Error::from) + } + Ok(StreamItem::Log(item)) => make_log_frame(item).map_err(Error::from), + Ok(StreamItem::Stats(item)) => make_stats_frame(item).map_err(Error::from), Err(e) => { info!("calling make_error_frame for [[{e}]]"); - make_error_frame(e) + make_error_frame(e).map_err(Error::from) } } } @@ -186,7 +208,7 @@ fn test_frame_log() { fn test_frame_error() { use crate::channelevents::ChannelEvents; use crate::frame::json_from_slice; - let item: Sitemty = Err(Error::with_msg_no_trace(format!("dummy-error-message"))); + let item: Sitemty = items_0::streamitem::sitem_err_from_string("dummyerror"); let buf = Framable::make_frame_dyn(&item).unwrap(); let len = u32::from_le_bytes(buf[12..16].try_into().unwrap()); let tyid = u32::from_le_bytes(buf[8..12].try_into().unwrap()); @@ -194,5 +216,5 @@ fn test_frame_error() { panic!("bad tyid"); } eprintln!("buf len {} len {}", buf.len(), len); - let item2: Error = json_from_slice(&buf[20..20 + len as usize]).unwrap(); + let item2: items_0::streamitem::SitemErrTy = json_from_slice(&buf[20..20 + len as usize]).unwrap(); } diff --git a/crates/items_2/src/frame.rs b/crates/items_2/src/frame.rs index 1ca32fa..407dc7f 100644 --- a/crates/items_2/src/frame.rs +++ b/crates/items_2/src/frame.rs @@ -13,7 +13,6 @@ use bincode::config::WithOtherTrailing; use bincode::DefaultOptions; use bytes::BufMut; use bytes::BytesMut; -use err::Error; use items_0::bincode; use items_0::streamitem::LogItem; use items_0::streamitem::StatsItem; @@ -27,6 +26,37 @@ use serde::Serialize; use std::any; use std::io; +#[derive(Debug, thiserror::Error)] +#[cstm(name = "ItemFrame")] +pub enum Error { + TooLongPayload(usize), + UnknownEncoder(u32), + #[error("BufferMismatch({0}, {1}, {2})")] + BufferMismatch(u32, usize, u32), + #[error("TyIdMismatch({0}, {1})")] + TyIdMismatch(u32, u32), + Msg(String), + Bincode(#[from] Box), + RmpEnc(#[from] rmp_serde::encode::Error), + RmpDec(#[from] rmp_serde::decode::Error), + ErasedSerde(#[from] erased_serde::Error), + Postcard(#[from] postcard::Error), + SerdeJson(#[from] serde_json::Error), +} + +struct ErrMsg(E) +where + E: ToString; + +impl From> for Error +where + E: ToString, +{ + fn from(value: ErrMsg) -> Self { + Self::Msg(value.0.to_string()) + } +} + pub fn bincode_ser( w: W, ) -> bincode::Serializer< @@ -54,7 +84,7 @@ where { let mut out = Vec::new(); let mut ser = bincode_ser(&mut out); - item.serialize(&mut ser).map_err(|e| format!("{e}"))?; + item.serialize(&mut ser)?; Ok(out) } @@ -68,14 +98,14 @@ where .with_fixint_encoding() .reject_trailing_bytes(); let mut de = bincode::Deserializer::from_slice(buf, opts); - ::deserialize(&mut de).map_err(|e| format!("{e}").into()) + ::deserialize(&mut de).map_err(Into::into) } fn msgpack_to_vec(item: T) -> Result, Error> where T: Serialize, { - rmp_serde::to_vec_named(&item).map_err(|e| format!("{e}").into()) + rmp_serde::to_vec_named(&item).map_err(Error::from) } fn msgpack_erased_to_vec(item: T) -> Result, Error> @@ -86,8 +116,7 @@ where { let mut ser1 = rmp_serde::Serializer::new(&mut out).with_struct_map(); let mut ser2 = ::erase(&mut ser1); - item.erased_serialize(&mut ser2) - .map_err(|e| Error::from(format!("{e}")))?; + item.erased_serialize(&mut ser2)?; } Ok(out) } @@ -96,14 +125,14 @@ fn msgpack_from_slice(buf: &[u8]) -> Result where T: for<'de> serde::Deserialize<'de>, { - rmp_serde::from_slice(buf).map_err(|e| format!("{e}").into()) + rmp_serde::from_slice(buf).map_err(Error::from) } fn postcard_to_vec(item: T) -> Result, Error> where T: Serialize, { - postcard::to_stdvec(&item).map_err(|e| format!("{e}").into()) + postcard::to_stdvec(&item).map_err(Error::from) } fn postcard_erased_to_vec(item: T) -> Result, Error> @@ -117,31 +146,30 @@ where { let mut ser2 = ::erase(&mut ser1); item.erased_serialize(&mut ser2) - } - .map_err(|e| Error::from(format!("{e}")))?; - let ret = ser1.output.finalize().map_err(|e| format!("{e}").into()); - ret + }?; + let ret = ser1.output.finalize()?; + Ok(ret) } pub fn postcard_from_slice(buf: &[u8]) -> Result where T: for<'de> serde::Deserialize<'de>, { - postcard::from_bytes(buf).map_err(|e| format!("{e}").into()) + Ok(postcard::from_bytes(buf)?) } fn json_to_vec(item: T) -> Result, Error> where T: Serialize, { - serde_json::to_vec(&item).map_err(Error::from_string) + Ok(serde_json::to_vec(&item)?) } pub fn json_from_slice(buf: &[u8]) -> Result where T: for<'de> serde::Deserialize<'de>, { - serde_json::from_slice(buf).map_err(Error::from_string) + Ok(serde_json::from_slice(buf)?) } pub fn encode_to_vec(item: T) -> Result, Error> @@ -187,7 +215,7 @@ where { let enc = encode_erased_to_vec(item)?; if enc.len() > u32::MAX as usize { - return Err(Error::with_msg(format!("too long payload {}", enc.len()))); + return Err(Error::TooLongPayload(enc.len())); } let mut h = crc32fast::Hasher::new(); h.update(&enc); @@ -323,15 +351,10 @@ where T: FrameDecodable, { if frame.encid() != INMEM_FRAME_ENCID { - return Err(Error::with_msg(format!("unknown encoder id {:?}", frame))); + return Err(Error::UnknownEncoder(frame.encid())); } if frame.len() as usize != frame.buf().len() { - return Err(Error::with_msg(format!( - "buf mismatch {} vs {} in {:?}", - frame.len(), - frame.buf().len(), - frame - ))); + return Err(Error::BufferMismatch(frame.len(), frame.buf().len(), frame.tyid())); } if frame.tyid() == ERROR_FRAME_TYPE_ID { // error frames are always encoded as json @@ -376,12 +399,7 @@ where } else { let tyid = T::FRAME_TYPE_ID; if frame.tyid() != tyid { - Err(Error::with_msg(format!( - "type id mismatch expect {:04x} found {:04x} {:?}", - tyid, - frame.tyid(), - frame - ))) + Err(Error::TyIdMismatch(tyid, frame.tyid())) } else { match decode_from_slice(frame.buf()) { Ok(item) => Ok(item), diff --git a/crates/nodenet/src/client.rs b/crates/nodenet/src/client.rs index 450e718..cd72e0c 100644 --- a/crates/nodenet/src/client.rs +++ b/crates/nodenet/src/client.rs @@ -1,5 +1,6 @@ use err::Error; use futures_util::Future; +use futures_util::TryFutureExt; use http::header; use http::Method; use http::Request; @@ -8,6 +9,7 @@ use httpclient::http; use httpclient::hyper::StatusCode; use httpclient::hyper::Uri; use items_0::streamitem::sitem_data; +use items_0::streamitem::sitem_err2_from_string; use items_2::framable::Framable; use netpod::log::*; use netpod::Cluster; @@ -24,11 +26,11 @@ async fn open_bytes_data_streams_http( ctx: ReqCtx, cluster: Cluster, ) -> Result, Error> { - let frame1 = make_node_command_frame(subq.clone())?; + let frame1 = make_node_command_frame(subq.clone()).map_err(|e| Error::from_string(e))?; let mut streams = Vec::new(); for node in &cluster.nodes { let item = sitem_data(frame1.clone()); - let buf = item.make_frame_dyn()?; + let buf = item.make_frame_dyn().map_err(|e| Error::from_string(e))?; let url = node.baseurl().join("/api/4/private/eventdata/frames").unwrap(); debug!("open_event_data_streams_http post {url}"); @@ -84,8 +86,9 @@ impl OpenBoxedBytesStreams for OpenBoxedBytesViaHttp { &self, subq: EventsSubQuery, ctx: ReqCtx, - ) -> Pin, Error>> + Send>> { + ) -> Pin, streams::tcprawclient::Error>> + Send>> { let fut = open_bytes_data_streams_http(subq, ctx, self.cluster.clone()); + let fut = fut.map_err(|e| streams::tcprawclient::Error::from(e.to_string())); Box::pin(fut) } } diff --git a/crates/nodenet/src/conn.rs b/crates/nodenet/src/conn.rs index 25821e2..93af134 100644 --- a/crates/nodenet/src/conn.rs +++ b/crates/nodenet/src/conn.rs @@ -4,7 +4,9 @@ use err::thiserror; use err::ThisError; use futures_util::Stream; use futures_util::StreamExt; +use futures_util::TryStreamExt; use items_0::on_sitemty_data; +use items_0::streamitem::sitem_err2_from_string; use items_0::streamitem::LogItem; use items_0::streamitem::RangeCompletableItem; use items_0::streamitem::Sitemty; @@ -49,6 +51,11 @@ pub enum Error { Items(#[from] items_2::Error), NotAvailable, DebugTest, + Generator(#[from] streams::generators::Error), + Transform(#[from] streams::transform::Error), + Framable(#[from] items_2::framable::Error), + Frame(#[from] items_2::frame::Error), + InMem(#[from] streams::frames::inmem::Error), } pub async fn events_service(ncc: NodeConfigCached) -> Result<(), Error> { @@ -138,8 +145,7 @@ pub async fn create_response_bytes_stream( // TODO support event blobs as transform let fetch_info = evq.ch_conf().to_sf_databuffer()?; let stream = disk::raw::conn::make_event_blobs_pipe(&evq, &fetch_info, reqctx, ncc)?; - // let stream = stream.map(|x| Box::new(x) as _); - let stream = stream.map(|x| x.make_frame_dyn().map(|x| x.freeze())); + let stream = stream.map(|x| x.make_frame_dyn().map(|x| x.freeze()).map_err(sitem_err2_from_string)); let ret = Box::pin(stream); Ok(ret) } else { @@ -161,7 +167,11 @@ pub async fn create_response_bytes_stream( }) }); // let stream = stream.map(move |x| Box::new(x) as Box); - let stream = stream.map(|x| x.make_frame_dyn().map(bytes::BytesMut::freeze)); + let stream = stream.map(|x| { + x.make_frame_dyn() + .map(bytes::BytesMut::freeze) + .map_err(sitem_err2_from_string) + }); let ret = Box::pin(stream); Ok(ret) } @@ -324,13 +334,13 @@ async fn events_conn_handler_inner( ncc: &NodeConfigCached, ) -> Result<(), Error> where - INP: Stream> + Unpin, + INP: Stream> + Unpin, { match events_conn_handler_inner_try(netin, netout, addr, scyqueue, ncc).await { Ok(_) => (), Err(ce) => { let mut out = ce.netout; - let item: Sitemty = Err(err::Error::from_string(ce.err)); + let item: Sitemty = Err(items_0::streamitem::SitemErrTy::from_string(ce.err)); let buf = Framable::make_frame_dyn(&item)?; out.write_all(&buf).await?; } @@ -345,7 +355,9 @@ async fn events_conn_handler( ncc: NodeConfigCached, ) -> Result<(), Error> { let (netin, netout) = stream.into_split(); - let inp = Box::new(TcpReadAsBytes::new(netin)); + let inp = TcpReadAsBytes::new(netin); + let inp = inp.map_err(sitem_err2_from_string); + let inp = Box::new(inp); let span1 = span!(Level::INFO, "events_conn_handler"); let r = events_conn_handler_inner(inp, netout, addr, scyqueue, &ncc) .instrument(span1) diff --git a/crates/streams/src/cbor_stream.rs b/crates/streams/src/cbor_stream.rs index f6ec503..678e942 100644 --- a/crates/streams/src/cbor_stream.rs +++ b/crates/streams/src/cbor_stream.rs @@ -18,7 +18,6 @@ use netpod::log::Level; use netpod::log::*; use netpod::ScalarType; use netpod::Shape; -use std::fmt; use std::io::Cursor; use std::pin::Pin; use std::task::Context; diff --git a/crates/streams/src/frames/eventsfromframes.rs b/crates/streams/src/frames/eventsfromframes.rs index 0f48d1c..d87e189 100644 --- a/crates/streams/src/frames/eventsfromframes.rs +++ b/crates/streams/src/frames/eventsfromframes.rs @@ -3,6 +3,7 @@ use futures_util::StreamExt; use items_0::framable::FrameTypeInnerStatic; use items_0::streamitem::sitem_err_from_string; use items_0::streamitem::RangeCompletableItem; +use items_0::streamitem::SitemErrTy; use items_0::streamitem::Sitemty; use items_0::streamitem::StreamItem; use items_2::frame::decode_frame; @@ -19,7 +20,7 @@ use std::task::Poll; pub enum Error {} pub struct EventsFromFrames { - inp: Pin, Error>> + Send>>, + inp: Pin, SitemErrTy>> + Send>>, dbgdesc: String, errored: bool, completed: bool, @@ -28,7 +29,7 @@ pub struct EventsFromFrames { impl EventsFromFrames { pub fn new( - inp: Pin, Error>> + Send>>, + inp: Pin, SitemErrTy>> + Send>>, dbgdesc: String, ) -> Self { Self { @@ -98,7 +99,7 @@ where e ); self.errored = true; - Ready(Some(Err(e))) + Ready(Some(sitem_err_from_string(e))) } }, }, diff --git a/crates/streams/src/generators.rs b/crates/streams/src/generators.rs index c2a2405..dcdc5ff 100644 --- a/crates/streams/src/generators.rs +++ b/crates/streams/src/generators.rs @@ -10,7 +10,6 @@ use items_0::on_sitemty_data; use items_0::streamitem::sitem_data; use items_0::streamitem::sitem_err2_from_string; use items_0::streamitem::RangeCompletableItem; -use items_0::streamitem::SitemErrTy; use items_0::streamitem::Sitemty; use items_0::streamitem::StreamItem; use items_0::Appendable; @@ -37,6 +36,8 @@ use std::time::Duration; pub enum Error { UnsupportedIsEventBlobs, Transform(#[from] crate::transform::Error), + Items2(#[from] items_2::Error), + BadChannelName, } pub fn make_test_channel_events_bytes_stream( @@ -68,7 +69,7 @@ pub fn make_test_channel_events_bytes_stream( }); let stream = stream .map_err(sitem_err2_from_string) - .map(|x| x.make_frame_dyn().map(|x| x.freeze())); + .map(|x| x.make_frame_dyn().map(|x| x.freeze()).map_err(sitem_err2_from_string)); let ret = Box::pin(stream); Ok(ret) } @@ -106,14 +107,10 @@ fn make_test_channel_events_stream_data_inner( } else { let na: Vec<_> = chn.split("-").collect(); if na.len() != 3 { - Err(Error::with_msg_no_trace(format!( - "make_channel_events_stream_data can not understand test channel name: {chn:?}" - ))) + Err(Error::BadChannelName) } else { if na[0] != "inmem" { - Err(Error::with_msg_no_trace(format!( - "make_channel_events_stream_data can not understand test channel name: {chn:?}" - ))) + Err(Error::BadChannelName) } else { let _range = subq.range().clone(); if na[1] == "d0" { @@ -124,14 +121,10 @@ fn make_test_channel_events_stream_data_inner( //generator::generate_f32(node_ix, node_count, range) panic!() } else { - Err(Error::with_msg_no_trace(format!( - "make_channel_events_stream_data can not understand test channel name: {chn:?}" - ))) + Err(Error::BadChannelName) } } else { - Err(Error::with_msg_no_trace(format!( - "make_channel_events_stream_data can not understand test channel name: {chn:?}" - ))) + Err(Error::BadChannelName) } } } diff --git a/crates/streams/src/itemclone.rs b/crates/streams/src/itemclone.rs index c95d762..db5ceff 100644 --- a/crates/streams/src/itemclone.rs +++ b/crates/streams/src/itemclone.rs @@ -65,7 +65,7 @@ where pin_mut!(fut); match fut.poll(cx) { Ready(Ok(())) => Ready(Ok(())), - Ready(Err(e)) => Ready(Err(e.into())), + Ready(Err(_)) => todo!("can not send copy"), Pending => Pending, } } diff --git a/crates/streams/src/json_stream.rs b/crates/streams/src/json_stream.rs index 3e80955..c2dc3e1 100644 --- a/crates/streams/src/json_stream.rs +++ b/crates/streams/src/json_stream.rs @@ -2,6 +2,7 @@ use crate::cbor_stream::SitemtyDynEventsStream; use futures_util::Stream; use futures_util::StreamExt; use items_0::streamitem::RangeCompletableItem; +use items_0::streamitem::Sitemty; use items_0::streamitem::StreamItem; use items_0::Events; use items_0::WithLen; @@ -11,7 +12,23 @@ use std::time::Duration; #[derive(Debug, thiserror::Error)] #[cstm(name = "JsonStream")] -pub enum Error {} +pub enum Error { + Json(#[from] serde_json::Error), + Msg(String), +} + +pub struct ErrMsg(pub E) +where + E: ToString; + +impl From> for Error +where + E: ToString, +{ + fn from(value: ErrMsg) -> Self { + Self::Msg(value.0.to_string()) + } +} pub struct JsonBytes(String); @@ -56,7 +73,7 @@ pub fn events_stream_to_json_stream(stream: SitemtyDynEventsStream) -> impl Stre prepend.chain(stream) } -fn map_events(x: Result>>, Error>) -> Result { +fn map_events(x: Sitemty>) -> Result { match x { Ok(x) => match x { StreamItem::DataItem(x) => match x { @@ -104,12 +121,12 @@ fn map_events(x: Result>>, Error } }, StreamItem::Log(item) => { - info!("{item:?}"); + debug!("{item:?}"); let item = JsonBytes::new(String::new()); Ok(item) } StreamItem::Stats(item) => { - info!("{item:?}"); + debug!("{item:?}"); let item = JsonBytes::new(String::new()); Ok(item) } diff --git a/crates/streams/src/needminbuffer.rs b/crates/streams/src/needminbuffer.rs index adac34e..37689b6 100644 --- a/crates/streams/src/needminbuffer.rs +++ b/crates/streams/src/needminbuffer.rs @@ -12,7 +12,7 @@ use std::task::Poll; pub enum Error {} pub struct NeedMinBuffer { - inp: Pin> + Send>>, + inp: Pin> + Send>>, need_min: u32, left: Option, buf_len_histo: HistoLog2, @@ -21,9 +21,11 @@ pub struct NeedMinBuffer { } impl NeedMinBuffer { - pub fn new(inp: Pin> + Send>>) -> Self { + pub fn new( + inp: Pin> + Send>>, + ) -> Self { Self { - inp: inp, + inp, need_min: 1, left: None, buf_len_histo: HistoLog2::new(8), @@ -50,7 +52,7 @@ impl Drop for NeedMinBuffer { } impl Stream for NeedMinBuffer { - type Item = Result; + type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; diff --git a/crates/streams/src/plaineventsjson.rs b/crates/streams/src/plaineventsjson.rs index 3427278..8a986de 100644 --- a/crates/streams/src/plaineventsjson.rs +++ b/crates/streams/src/plaineventsjson.rs @@ -24,6 +24,7 @@ use std::time::Instant; pub enum Error { Stream(#[from] crate::plaineventsstream::Error), Json(#[from] serde_json::Error), + Collect(#[from] crate::collect::Error), } pub async fn plain_events_json( diff --git a/crates/streams/src/plaineventsstream.rs b/crates/streams/src/plaineventsstream.rs index 40d6cbd..c4e3a04 100644 --- a/crates/streams/src/plaineventsstream.rs +++ b/crates/streams/src/plaineventsstream.rs @@ -20,6 +20,7 @@ use std::pin::Pin; pub enum Error { Netpod(#[from] netpod::NetpodError), Transform(#[from] crate::transform::Error), + TcpRawClient(#[from] crate::tcprawclient::Error), } pub type DynEventsStream = Pin>> + Send>>; @@ -77,7 +78,7 @@ pub async fn dyn_events_stream( }); if let Some(wasmname) = evq.test_do_wasm() { - let stream = transform_wasm(stream, wasmname, ctx).await?; + let stream = transform_wasm::<_, items_0::streamitem::SitemErrTy>(stream, wasmname, ctx).await?; Ok(Box::pin(stream)) } else { Ok(Box::pin(stream)) diff --git a/crates/streams/src/rangefilter2.rs b/crates/streams/src/rangefilter2.rs index 50f16cd..a8325ec 100644 --- a/crates/streams/src/rangefilter2.rs +++ b/crates/streams/src/rangefilter2.rs @@ -1,6 +1,6 @@ -use err::Error; use futures_util::Stream; use futures_util::StreamExt; +use items_0::streamitem::sitem_err_from_string; use items_0::streamitem::RangeCompletableItem; use items_0::streamitem::Sitemty; use items_0::streamitem::StatsItem; @@ -22,13 +22,12 @@ use items_0::Events; #[cfg(test)] use std::collections::VecDeque; -#[allow(unused)] -macro_rules! trace_emit { - ($det:expr, $($arg:tt)*) => { - if $det { - eprintln!($($arg)*); - } - }; +macro_rules! trace_emit { ($det:expr, $($arg:tt)*) => ( if false && $det { trace!($($arg)*); } ) } + +#[derive(Debug, thiserror::Error)] +#[cstm(name = "Rangefilter")] +pub enum Error { + Merge(#[from] MergeError), } pub struct RangeFilter2 @@ -146,8 +145,7 @@ where } else { trace_emit!(self.trdet, "discarding events len {:?}", lige - 1); let mut dummy = item.new_empty(); - item.drain_into(&mut dummy, (0, lige - 1)) - .map_err(|e| format!("{e} unexpected MergeError while remove of items"))?; + item.drain_into(&mut dummy, (0, lige - 1))?; self.slot1 = None; item } @@ -157,8 +155,7 @@ where trace_emit!(self.trdet, "drain into to keep one before"); let n = item.len(); let mut keep = item.new_empty(); - item.drain_into(&mut keep, (n.max(1) - 1, n)) - .map_err(|e| format!("{e} unexpected MergeError while remove of items"))?; + item.drain_into(&mut keep, (n.max(1) - 1, n))?; self.slot1 = Some(keep); item.new_empty() } @@ -192,10 +189,7 @@ where loop { break if self.complete { error!("{} poll_next on complete", Self::type_name()); - Ready(Some(Err(Error::with_msg_no_trace(format!( - "{} poll_next on complete", - Self::type_name() - ))))) + Ready(Some(sitem_err_from_string("poll next on complete"))) } else if self.done { self.complete = true; Ready(None) @@ -223,7 +217,7 @@ where Err(e) => { error!("sees: {e}"); self.inp_done = true; - Ready(Some(Err(e))) + Ready(Some(sitem_err_from_string(e))) } }, Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)) => { diff --git a/crates/streams/src/tcprawclient.rs b/crates/streams/src/tcprawclient.rs index b365c50..a0396b2 100644 --- a/crates/streams/src/tcprawclient.rs +++ b/crates/streams/src/tcprawclient.rs @@ -7,14 +7,15 @@ use crate::frames::eventsfromframes::EventsFromFrames; use crate::frames::inmem::BoxedBytesStream; use crate::frames::inmem::InMemoryFrameStream; use crate::frames::inmem::TcpReadAsBytes; -use err::Error; use futures_util::Future; use futures_util::Stream; +use futures_util::TryStreamExt; use http::Uri; use httpclient::body_bytes; use httpclient::http; use items_0::framable::FrameTypeInnerStatic; use items_0::streamitem::sitem_data; +use items_0::streamitem::sitem_err2_from_string; use items_0::streamitem::Sitemty; use items_2::eventfull::EventFull; use items_2::framable::EventQueryJsonStringFrame; @@ -42,6 +43,40 @@ use tokio::net::TcpStream; pub const TEST_BACKEND: &str = "testbackend-00"; +#[derive(Debug, thiserror::Error)] +#[cstm(name = "TcpRawClient")] +pub enum Error { + IO(#[from] std::io::Error), + Msg(String), + Frame(#[from] items_2::frame::Error), + Framable(#[from] items_2::framable::Error), + Json(#[from] serde_json::Error), + Http(#[from] http::Error), + HttpClient(#[from] httpclient::Error), + Hyper(#[from] httpclient::hyper::Error), + #[error("ServerError({0:?}, {1})")] + ServerError(http::response::Parts, String), +} + +struct ErrMsg(E) +where + E: ToString; + +impl From> for Error +where + E: ToString, +{ + fn from(value: ErrMsg) -> Self { + Self::Msg(value.0.to_string()) + } +} + +impl From for Error { + fn from(value: String) -> Self { + Self::Msg(value) + } +} + pub trait OpenBoxedBytesStreams { fn open( &self, @@ -75,8 +110,10 @@ pub async fn x_processed_event_blobs_stream_from_node_tcp( netout.write_all(&buf).await?; netout.flush().await?; netout.forget(); - let inp = Box::pin(TcpReadAsBytes::new(netin)) as BoxedBytesStream; + let inp = TcpReadAsBytes::new(netin).map_err(sitem_err2_from_string); + let inp = Box::pin(inp) as BoxedBytesStream; let frames = InMemoryFrameStream::new(inp, subq.inmem_bufcap()); + let frames = frames.map_err(sitem_err2_from_string); let frames = Box::pin(frames); let items = EventsFromFrames::new(frames, addr); Ok(Box::pin(items)) @@ -106,31 +143,20 @@ pub async fn x_processed_event_blobs_stream_from_node_http( .header(header::HOST, uri.host().unwrap()) .header(header::ACCEPT, APP_OCTET) .header(ctx.header_name(), ctx.header_value()) - .body(body_bytes(buf)) - .map_err(|e| Error::with_msg_no_trace(e.to_string()))?; + .body(body_bytes(buf))?; let mut client = httpclient::connect_client(req.uri()).await?; - let res = client - .send_request(req) - .await - .map_err(|e| Error::with_msg_no_trace(e.to_string()))?; + let res = client.send_request(req).await?; if res.status() != StatusCode::OK { error!("Server error {:?}", res); let (head, body) = res.into_parts(); let buf = httpclient::read_body_bytes(body).await?; let s = String::from_utf8_lossy(&buf); - return Err(Error::with_msg(format!( - concat!( - "Server error {:?}\n", - "---------------------- message from http body:\n", - "{}\n", - "---------------------- end of http body", - ), - head, s - ))); + return Err(Error::ServerError(head, s.to_string())); } let (_head, body) = res.into_parts(); let inp = Box::pin(httpclient::IncomingStream::new(body)) as BoxedBytesStream; let frames = InMemoryFrameStream::new(inp, subq.inmem_bufcap()); + let frames = frames.map_err(sitem_err2_from_string); let frames = Box::pin(frames); let stream = EventsFromFrames::new(frames, url.to_string()); debug!("open_event_data_streams_http done {url}"); @@ -175,8 +201,11 @@ where netout.flush().await?; netout.forget(); // TODO for images, we need larger buffer capacity - let inp = Box::pin(TcpReadAsBytes::new(netin)) as BoxedBytesStream; + let inp = TcpReadAsBytes::new(netin); + let inp = inp.map_err(sitem_err2_from_string); + let inp = Box::pin(inp) as BoxedBytesStream; let frames = InMemoryFrameStream::new(inp, subq.inmem_bufcap()); + let frames = frames.map_err(sitem_err2_from_string); let frames = Box::pin(frames); let stream = EventsFromFrames::::new(frames, addr); streams.push(Box::pin(stream) as _); @@ -193,6 +222,7 @@ where T: FrameTypeInnerStatic + DeserializeOwned + Send + Unpin + fmt::Debug + 'static, { let frames = InMemoryFrameStream::new(inp, bufcap); + let frames = frames.map_err(sitem_err2_from_string); // TODO let EventsFromFrames accept also non-boxed input? let frames = Box::pin(frames); let stream = EventsFromFrames::::new(frames, dbgdesc); diff --git a/crates/streams/src/test.rs b/crates/streams/src/test.rs index 38283e1..c7244f9 100644 --- a/crates/streams/src/test.rs +++ b/crates/streams/src/test.rs @@ -5,7 +5,6 @@ mod events; #[cfg(test)] mod timebin; -use err::Error; use futures_util::stream; use futures_util::Stream; use items_0::streamitem::sitem_data; @@ -17,6 +16,10 @@ use items_2::eventsdim0::EventsDim0; use netpod::timeunits::SEC; use std::pin::Pin; +#[derive(Debug, thiserror::Error)] +#[cstm(name = "StreamsTest")] +pub enum Error {} + type BoxedEventStream = Pin> + Send>>; // TODO use some xorshift generator. @@ -51,11 +54,10 @@ fn merge_mergeable_00() -> Result<(), Error> { runfut(fut) } -fn runfut(fut: F) -> Result +fn runfut(fut: F) -> Result where - F: std::future::Future>, + F: std::future::Future>, + E: std::error::Error, { - use futures_util::TryFutureExt; - let fut = fut.map_err(|e| e.into()); taskrun::run(fut) } diff --git a/crates/streams/src/test/events.rs b/crates/streams/src/test/events.rs index 5e2a422..c3d1948 100644 --- a/crates/streams/src/test/events.rs +++ b/crates/streams/src/test/events.rs @@ -8,6 +8,7 @@ use crate::tcprawclient::TEST_BACKEND; use futures_util::future; use futures_util::Future; use futures_util::StreamExt; +use futures_util::TryFutureExt; use netpod::log::*; use netpod::range::evrange::NanoRange; use netpod::range::evrange::SeriesRange; @@ -83,8 +84,8 @@ impl OpenBoxedBytesStreams for StreamOpener { &self, subq: EventsSubQuery, _ctx: ReqCtx, - ) -> Pin, Error>> + Send>> { - Box::pin(stream_opener(subq)) + ) -> Pin, crate::tcprawclient::Error>> + Send>> { + Box::pin(stream_opener(subq).map_err(|e| crate::tcprawclient::Error::Msg(format!("{e}")))) } } diff --git a/crates/streams/src/teststream.rs b/crates/streams/src/teststream.rs index d1c85f6..3460a9c 100644 --- a/crates/streams/src/teststream.rs +++ b/crates/streams/src/teststream.rs @@ -1,21 +1,20 @@ use crate::timebin::cached::reader::EventsReadProvider; use crate::timebin::cached::reader::EventsReading; use futures_util::Stream; +use items_0::streamitem::sitem_err2_from_string; use items_0::streamitem::Sitemty; use items_2::channelevents::ChannelEvents; use netpod::range::evrange::SeriesRange; use query::api4::events::EventsSubQuery; -use rand_xoshiro::rand_core::SeedableRng; -use rand_xoshiro::Xoshiro128PlusPlus; use std::pin::Pin; fn make_stream(chname: &str, range: &SeriesRange) -> Pin> + Send>> { if chname == "unittest;scylla;cont;scalar;f32" { - let e = ::err::Error::with_msg_no_trace("unknown channel {chname}"); + let e = sitem_err2_from_string(format!("unknown channel {chname}")); let ret = futures_util::stream::iter([Err(e)]); Box::pin(ret) } else { - let e = ::err::Error::with_msg_no_trace("unknown channel {chname}"); + let e = sitem_err2_from_string(format!("unknown channel {chname}")); let ret = futures_util::stream::iter([Err(e)]); Box::pin(ret) } diff --git a/crates/streams/src/timebinnedjson.rs b/crates/streams/src/timebinnedjson.rs index 98ada0e..f066f52 100644 --- a/crates/streams/src/timebinnedjson.rs +++ b/crates/streams/src/timebinnedjson.rs @@ -14,6 +14,7 @@ use futures_util::Stream; use futures_util::StreamExt; use items_0::collect_s::CollectableDyn; use items_0::on_sitemty_data; +use items_0::streamitem::sitem_err2_from_string; use items_0::streamitem::RangeCompletableItem; use items_0::streamitem::Sitemty; use items_0::streamitem::StreamItem; @@ -41,6 +42,23 @@ pub enum Error { Query(#[from] query::api4::binned::Error), FromLayers(#[from] super::timebin::fromlayers::Error), Transform(#[from] super::transform::Error), + TcpRawClient(#[from] crate::tcprawclient::Error), + Collect(#[from] crate::collect::Error), + Json(#[from] serde_json::Error), + Msg(String), +} + +struct ErrMsg(E) +where + E: ToString; + +impl From> for Error +where + E: ToString, +{ + fn from(value: ErrMsg) -> Self { + Self::Msg(value.0.to_string()) + } } #[allow(unused)] @@ -435,7 +453,7 @@ pub async fn timebinned_json_framed( // TODO skip the intermediate conversion to js value, go directly to string data let stream = stream.map(|x| match x { Ok(x) => Ok(JsonBytes::new(serde_json::to_string(&x).unwrap())), - Err(e) => Err(e), + Err(e) => Err(crate::json_stream::Error::from(crate::json_stream::ErrMsg(e))), }); Ok(Box::pin(stream)) } diff --git a/crates/streams/src/transform.rs b/crates/streams/src/transform.rs index ff86ad7..0d5bf3d 100644 --- a/crates/streams/src/transform.rs +++ b/crates/streams/src/transform.rs @@ -20,23 +20,20 @@ use std::pin::Pin; #[derive(Debug, thiserror::Error)] #[cstm(name = "Transform")] -pub enum Error {} +pub enum Error { + #[error("UnhandledQuery({0:?})")] + UnhandledQuery(EventTransformQuery), +} pub fn build_event_transform(tr: &TransformQuery) -> Result { let trev = tr.get_tr_event(); match trev { EventTransformQuery::ValueFull => Ok(make_transform_identity()), EventTransformQuery::MinMaxAvgDev => Ok(make_transform_min_max_avg()), - EventTransformQuery::ArrayPick(..) => Err(Error::with_msg_no_trace(format!( - "build_event_transform don't know what to do {trev:?}" - ))), + EventTransformQuery::ArrayPick(..) => Err(Error::UnhandledQuery(trev.clone())), EventTransformQuery::PulseIdDiff => Ok(make_transform_pulse_id_diff()), - EventTransformQuery::EventBlobsVerbatim => Err(Error::with_msg_no_trace(format!( - "build_event_transform don't know what to do {trev:?}" - ))), - EventTransformQuery::EventBlobsUncompressed => Err(Error::with_msg_no_trace(format!( - "build_event_transform don't know what to do {trev:?}" - ))), + EventTransformQuery::EventBlobsVerbatim => Err(Error::UnhandledQuery(trev.clone())), + EventTransformQuery::EventBlobsUncompressed => Err(Error::UnhandledQuery(trev.clone())), } }