From ab3fa849cdd9cf4cbbc2d0a2f9d6b9a985e23d60 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Tue, 9 May 2023 11:31:22 +0200 Subject: [PATCH] Simplify --- streams/src/itemclone.rs | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/streams/src/itemclone.rs b/streams/src/itemclone.rs index 64dc070..13fb8a9 100644 --- a/streams/src/itemclone.rs +++ b/streams/src/itemclone.rs @@ -6,26 +6,26 @@ use futures_util::Future; use futures_util::Stream; use futures_util::StreamExt; use std::pin::Pin; +use std::ptr::NonNull; use std::task::Context; use std::task::Poll; -#[pin_project::pin_project] -pub struct Itemclone +pub struct Itemclone<'a, T, INP> where T: 'static, { - #[pin] - sender: Sender, + sender: Pin>>, inp: INP, - send_fut: Option>, + send_fut: Option>, } -impl Itemclone { +impl<'a, T, INP> Itemclone<'a, T, INP> { pub fn new(inp: INP, sender: Sender) -> Self where INP: Stream + Unpin, T: Clone + Unpin, { + let sender = Box::pin(sender); Self { sender, inp, @@ -34,7 +34,7 @@ impl Itemclone { } } -impl Itemclone +impl<'a, T, INP> Itemclone<'a, T, INP> where INP: Stream + Unpin, T: Clone + Unpin, @@ -43,7 +43,9 @@ where use Poll::*; match self.inp.poll_next_unpin(cx) { Ready(Some(item)) => { - let sender = unsafe { &mut *((&mut self.sender) as *mut Sender) }; + let sender = self.sender.as_mut().get_mut(); + let mut ptr1 = NonNull::from(sender); + let sender = unsafe { ptr1.as_mut() }; self.send_fut = Some(sender.send(item.clone())); Ready(Some(Ok(item))) } @@ -66,7 +68,7 @@ where } } -impl Stream for Itemclone +impl<'a, T, INP> Stream for Itemclone<'a, T, INP> where INP: Stream + Unpin, T: Clone + Unpin, @@ -75,7 +77,6 @@ where fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; - let proj = self.as_mut().project(); match self.send_fut.as_mut() { Some(fut) => match Self::send_copy(fut, cx) { Ready(Ok(())) => self.poll_fresh(cx),