From b5488694904e050d99378576a953172229a5d9b3 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Wed, 4 Dec 2024 17:10:35 +0100 Subject: [PATCH] Clean unused --- src/boxed.rs | 59 ---------------------------------------- src/cbor_stream.rs | 3 +- src/generators.rs | 1 - src/lib.rs | 2 -- src/plaineventsstream.rs | 1 - src/rangefilter2/test.rs | 1 - src/timebinnedjson.rs | 7 ++--- src/transform.rs | 32 ---------------------- 8 files changed, 3 insertions(+), 103 deletions(-) delete mode 100644 src/boxed.rs delete mode 100644 src/transform.rs diff --git a/src/boxed.rs b/src/boxed.rs deleted file mode 100644 index 5ddc5ab..0000000 --- a/src/boxed.rs +++ /dev/null @@ -1,59 +0,0 @@ -use futures_util::stream::StreamExt; -use futures_util::Stream; -use items_0::streamitem::RangeCompletableItem; -use items_0::streamitem::Sitemty; -use items_0::streamitem::StreamItem; -use items_0::transform::TransformProperties; -use items_0::transform::WithTransformProperties; -use items_0::Events; -use std::pin::Pin; -use std::task::Context; -use std::task::Poll; - -pub struct IntoBoxedEventStream -where - T: Events, - INP: Stream> + WithTransformProperties, -{ - //inp: Pin>>>, - inp: Pin>, -} - -impl Stream for IntoBoxedEventStream -where - T: Events, - INP: Stream> + WithTransformProperties, -{ - type Item = Sitemty>; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - use Poll::*; - match self.inp.poll_next_unpin(cx) { - Ready(Some(item)) => Ready(Some(match item { - Ok(item) => Ok(match item { - StreamItem::DataItem(item) => StreamItem::DataItem(match item { - RangeCompletableItem::RangeComplete => RangeCompletableItem::RangeComplete, - RangeCompletableItem::Data(item) => { - RangeCompletableItem::Data(Box::new(item)) - } - }), - StreamItem::Log(item) => StreamItem::Log(item), - StreamItem::Stats(item) => StreamItem::Stats(item), - }), - Err(e) => Err(e), - })), - Ready(None) => Ready(None), - Pending => Pending, - } - } -} - -impl WithTransformProperties for IntoBoxedEventStream -where - T: Events, - INP: Stream> + WithTransformProperties, -{ - fn query_transform_properties(&self) -> TransformProperties { - self.inp.query_transform_properties() - } -} diff --git a/src/cbor_stream.rs b/src/cbor_stream.rs index 0b500cf..e0f8fc2 100644 --- a/src/cbor_stream.rs +++ b/src/cbor_stream.rs @@ -14,7 +14,6 @@ use items_0::streamitem::LogItem; use items_0::streamitem::RangeCompletableItem; use items_0::streamitem::Sitemty; use items_0::streamitem::StreamItem; -use items_0::Events; use items_2::channelevents::ChannelEvents; use items_2::jsonbytes::CborBytes; use netpod::log::Level; @@ -203,7 +202,7 @@ impl FramedBytesToChannelEventsStream { if let Some(y) = x.1.as_bool() { if y { Some(StreamItem::DataItem( - RangeCompletableItem::>::RangeComplete, + RangeCompletableItem::::RangeComplete, )) } else { None diff --git a/src/generators.rs b/src/generators.rs index d3386ec..1c1ee15 100644 --- a/src/generators.rs +++ b/src/generators.rs @@ -30,7 +30,6 @@ use std::task::Poll; #[cstm(name = "Generator")] pub enum Error { UnsupportedIsEventBlobs, - Transform(#[from] crate::transform::Error), Items2(#[from] items_2::Error), BadChannelName, } diff --git a/src/lib.rs b/src/lib.rs index 8770c14..6e35e58 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,3 @@ -pub mod boxed; pub mod cbor_stream; pub mod collect; #[cfg(feature = "indev")] @@ -32,7 +31,6 @@ pub mod test; pub mod teststream; pub mod timebin; pub mod timebinnedjson; -pub mod transform; #[allow(unused)] fn todoval() -> T { diff --git a/src/plaineventsstream.rs b/src/plaineventsstream.rs index 0306bce..ce30244 100644 --- a/src/plaineventsstream.rs +++ b/src/plaineventsstream.rs @@ -16,7 +16,6 @@ use std::pin::Pin; #[cstm(name = "PlainEventsStream")] pub enum Error { Netpod(#[from] netpod::NetpodError), - Transform(#[from] crate::transform::Error), TcpRawClient(#[from] crate::tcprawclient::Error), } diff --git a/src/rangefilter2/test.rs b/src/rangefilter2/test.rs index b8193a7..b5c3062 100644 --- a/src/rangefilter2/test.rs +++ b/src/rangefilter2/test.rs @@ -5,7 +5,6 @@ use futures_util::StreamExt; use items_0::streamitem::RangeCompletableItem; use items_0::streamitem::Sitemty; use items_0::streamitem::StreamItem; -use items_0::Events; use items_2::binning::container_events::ContainerEvents; use items_2::channelevents::ChannelEvents; use netpod::range::evrange::NanoRange; diff --git a/src/timebinnedjson.rs b/src/timebinnedjson.rs index 4d96e7f..b2901f5 100644 --- a/src/timebinnedjson.rs +++ b/src/timebinnedjson.rs @@ -18,7 +18,6 @@ use items_0::on_sitemty_data; use items_0::streamitem::RangeCompletableItem; use items_0::streamitem::Sitemty; use items_0::streamitem::StreamItem; -use items_0::Events; use items_2::channelevents::ChannelEvents; use items_2::jsonbytes::JsonBytes; use items_2::merger::Merger; @@ -31,7 +30,6 @@ use netpod::ReqCtx; use query::api4::binned::BinnedQuery; use query::api4::events::EventsSubQuerySettings; use query::transform::TransformQuery; -use serde_json::Value as JsonValue; use std::pin::Pin; use std::sync::Arc; use std::time::Duration; @@ -42,7 +40,6 @@ use std::time::Instant; pub enum Error { Query(#[from] query::api4::binned::Error), FromLayers(#[from] super::timebin::fromlayers::Error), - Transform(#[from] super::transform::Error), TcpRawClient(#[from] crate::tcprawclient::Error), Collect(#[from] crate::collect::Error), Json(#[from] serde_json::Error), @@ -151,7 +148,7 @@ pub async fn timebinnable_stream_sf_databuffer_channelevents( let buffer_ptr = buffer_ptr[0].i32().unwrap(); let stream = stream.map(move |x| { let memory = memory.clone(); - let item = on_sitemty_data!(x, |mut evs: Box| { + let item = on_sitemty_data!(x, |mut evs: Box| { let x = { use items_0::AsAnyMut; if true { @@ -251,7 +248,7 @@ pub async fn timebinnable_stream_sf_databuffer_channelevents( // Box::new(item) as Box item }); - Box::pin(stream) as Pin>> + Send>> + Box::pin(stream) as Pin>> + Send>> } else { let stream = stream.map(|x| x); Box::pin(stream) diff --git a/src/transform.rs b/src/transform.rs deleted file mode 100644 index 0618e00..0000000 --- a/src/transform.rs +++ /dev/null @@ -1,32 +0,0 @@ -use items_0::transform::EventStreamTrait; -use items_0::transform::TransformProperties; -use items_0::transform::WithTransformProperties; -use query::transform::EventTransformQuery; -use std::pin::Pin; - -#[derive(Debug, thiserror::Error)] -#[cstm(name = "Transform")] -pub enum Error { - #[error("UnhandledQuery({0:?})")] - UnhandledQuery(EventTransformQuery), -} - -// TODO remove, in its current usage it reboxes -pub struct EventsToTimeBinnable { - inp: Pin>, -} - -impl EventsToTimeBinnable { - pub fn new(inp: INP) -> Self - where - INP: EventStreamTrait + 'static, - { - Self { inp: Box::pin(inp) } - } -} - -impl WithTransformProperties for EventsToTimeBinnable { - fn query_transform_properties(&self) -> TransformProperties { - self.inp.query_transform_properties() - } -}