Clean unused

This commit is contained in:
Dominik Werder
2024-12-04 17:10:35 +01:00
parent 855f1796f1
commit b548869490
8 changed files with 3 additions and 103 deletions

View File

@@ -1,59 +0,0 @@
use futures_util::stream::StreamExt;
use futures_util::Stream;
use items_0::streamitem::RangeCompletableItem;
use items_0::streamitem::Sitemty;
use items_0::streamitem::StreamItem;
use items_0::transform::TransformProperties;
use items_0::transform::WithTransformProperties;
use items_0::Events;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
pub struct IntoBoxedEventStream<INP, T>
where
T: Events,
INP: Stream<Item = Sitemty<T>> + WithTransformProperties,
{
//inp: Pin<Box<dyn Stream<Item = Sitemty<T>>>>,
inp: Pin<Box<INP>>,
}
impl<INP, T> Stream for IntoBoxedEventStream<INP, T>
where
T: Events,
INP: Stream<Item = Sitemty<T>> + WithTransformProperties,
{
type Item = Sitemty<Box<dyn Events>>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
match self.inp.poll_next_unpin(cx) {
Ready(Some(item)) => Ready(Some(match item {
Ok(item) => Ok(match item {
StreamItem::DataItem(item) => StreamItem::DataItem(match item {
RangeCompletableItem::RangeComplete => RangeCompletableItem::RangeComplete,
RangeCompletableItem::Data(item) => {
RangeCompletableItem::Data(Box::new(item))
}
}),
StreamItem::Log(item) => StreamItem::Log(item),
StreamItem::Stats(item) => StreamItem::Stats(item),
}),
Err(e) => Err(e),
})),
Ready(None) => Ready(None),
Pending => Pending,
}
}
}
impl<INP, T> WithTransformProperties for IntoBoxedEventStream<INP, T>
where
T: Events,
INP: Stream<Item = Sitemty<T>> + WithTransformProperties,
{
fn query_transform_properties(&self) -> TransformProperties {
self.inp.query_transform_properties()
}
}

View File

@@ -14,7 +14,6 @@ use items_0::streamitem::LogItem;
use items_0::streamitem::RangeCompletableItem;
use items_0::streamitem::Sitemty;
use items_0::streamitem::StreamItem;
use items_0::Events;
use items_2::channelevents::ChannelEvents;
use items_2::jsonbytes::CborBytes;
use netpod::log::Level;
@@ -203,7 +202,7 @@ impl<S> FramedBytesToChannelEventsStream<S> {
if let Some(y) = x.1.as_bool() {
if y {
Some(StreamItem::DataItem(
RangeCompletableItem::<Box<dyn Events>>::RangeComplete,
RangeCompletableItem::<ChannelEvents>::RangeComplete,
))
} else {
None

View File

@@ -30,7 +30,6 @@ use std::task::Poll;
#[cstm(name = "Generator")]
pub enum Error {
UnsupportedIsEventBlobs,
Transform(#[from] crate::transform::Error),
Items2(#[from] items_2::Error),
BadChannelName,
}

View File

@@ -1,4 +1,3 @@
pub mod boxed;
pub mod cbor_stream;
pub mod collect;
#[cfg(feature = "indev")]
@@ -32,7 +31,6 @@ pub mod test;
pub mod teststream;
pub mod timebin;
pub mod timebinnedjson;
pub mod transform;
#[allow(unused)]
fn todoval<T>() -> T {

View File

@@ -16,7 +16,6 @@ use std::pin::Pin;
#[cstm(name = "PlainEventsStream")]
pub enum Error {
Netpod(#[from] netpod::NetpodError),
Transform(#[from] crate::transform::Error),
TcpRawClient(#[from] crate::tcprawclient::Error),
}

View File

@@ -5,7 +5,6 @@ use futures_util::StreamExt;
use items_0::streamitem::RangeCompletableItem;
use items_0::streamitem::Sitemty;
use items_0::streamitem::StreamItem;
use items_0::Events;
use items_2::binning::container_events::ContainerEvents;
use items_2::channelevents::ChannelEvents;
use netpod::range::evrange::NanoRange;

View File

@@ -18,7 +18,6 @@ use items_0::on_sitemty_data;
use items_0::streamitem::RangeCompletableItem;
use items_0::streamitem::Sitemty;
use items_0::streamitem::StreamItem;
use items_0::Events;
use items_2::channelevents::ChannelEvents;
use items_2::jsonbytes::JsonBytes;
use items_2::merger::Merger;
@@ -31,7 +30,6 @@ use netpod::ReqCtx;
use query::api4::binned::BinnedQuery;
use query::api4::events::EventsSubQuerySettings;
use query::transform::TransformQuery;
use serde_json::Value as JsonValue;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
@@ -42,7 +40,6 @@ use std::time::Instant;
pub enum Error {
Query(#[from] query::api4::binned::Error),
FromLayers(#[from] super::timebin::fromlayers::Error),
Transform(#[from] super::transform::Error),
TcpRawClient(#[from] crate::tcprawclient::Error),
Collect(#[from] crate::collect::Error),
Json(#[from] serde_json::Error),
@@ -151,7 +148,7 @@ pub async fn timebinnable_stream_sf_databuffer_channelevents(
let buffer_ptr = buffer_ptr[0].i32().unwrap();
let stream = stream.map(move |x| {
let memory = memory.clone();
let item = on_sitemty_data!(x, |mut evs: Box<dyn Events>| {
let item = on_sitemty_data!(x, |mut evs: Box<dyn TodoUseType>| {
let x = {
use items_0::AsAnyMut;
if true {
@@ -251,7 +248,7 @@ pub async fn timebinnable_stream_sf_databuffer_channelevents(
// Box::new(item) as Box<dyn Framable + Send>
item
});
Box::pin(stream) as Pin<Box<dyn Stream<Item = Sitemty<Box<dyn Events>>> + Send>>
Box::pin(stream) as Pin<Box<dyn Stream<Item = Sitemty<Box<dyn TodoUseType>>> + Send>>
} else {
let stream = stream.map(|x| x);
Box::pin(stream)

View File

@@ -1,32 +0,0 @@
use items_0::transform::EventStreamTrait;
use items_0::transform::TransformProperties;
use items_0::transform::WithTransformProperties;
use query::transform::EventTransformQuery;
use std::pin::Pin;
#[derive(Debug, thiserror::Error)]
#[cstm(name = "Transform")]
pub enum Error {
#[error("UnhandledQuery({0:?})")]
UnhandledQuery(EventTransformQuery),
}
// TODO remove, in its current usage it reboxes
pub struct EventsToTimeBinnable {
inp: Pin<Box<dyn EventStreamTrait>>,
}
impl EventsToTimeBinnable {
pub fn new<INP>(inp: INP) -> Self
where
INP: EventStreamTrait + 'static,
{
Self { inp: Box::pin(inp) }
}
}
impl WithTransformProperties for EventsToTimeBinnable {
fn query_transform_properties(&self) -> TransformProperties {
self.inp.query_transform_properties()
}
}