This commit is contained in:
Dominik Werder
2023-05-09 11:31:22 +02:00
parent 755105b50e
commit ab3fa849cd

View File

@@ -6,26 +6,26 @@ use futures_util::Future;
use futures_util::Stream; use futures_util::Stream;
use futures_util::StreamExt; use futures_util::StreamExt;
use std::pin::Pin; use std::pin::Pin;
use std::ptr::NonNull;
use std::task::Context; use std::task::Context;
use std::task::Poll; use std::task::Poll;
#[pin_project::pin_project] pub struct Itemclone<'a, T, INP>
pub struct Itemclone<T, INP>
where where
T: 'static, T: 'static,
{ {
#[pin] sender: Pin<Box<Sender<T>>>,
sender: Sender<T>,
inp: INP, inp: INP,
send_fut: Option<Send<'static, T>>, send_fut: Option<Send<'a, T>>,
} }
impl<T, INP> Itemclone<T, INP> { impl<'a, T, INP> Itemclone<'a, T, INP> {
pub fn new(inp: INP, sender: Sender<T>) -> Self pub fn new(inp: INP, sender: Sender<T>) -> Self
where where
INP: Stream<Item = T> + Unpin, INP: Stream<Item = T> + Unpin,
T: Clone + Unpin, T: Clone + Unpin,
{ {
let sender = Box::pin(sender);
Self { Self {
sender, sender,
inp, inp,
@@ -34,7 +34,7 @@ impl<T, INP> Itemclone<T, INP> {
} }
} }
impl<T, INP> Itemclone<T, INP> impl<'a, T, INP> Itemclone<'a, T, INP>
where where
INP: Stream<Item = T> + Unpin, INP: Stream<Item = T> + Unpin,
T: Clone + Unpin, T: Clone + Unpin,
@@ -43,7 +43,9 @@ where
use Poll::*; use Poll::*;
match self.inp.poll_next_unpin(cx) { match self.inp.poll_next_unpin(cx) {
Ready(Some(item)) => { Ready(Some(item)) => {
let sender = unsafe { &mut *((&mut self.sender) as *mut Sender<T>) }; let sender = self.sender.as_mut().get_mut();
let mut ptr1 = NonNull::from(sender);
let sender = unsafe { ptr1.as_mut() };
self.send_fut = Some(sender.send(item.clone())); self.send_fut = Some(sender.send(item.clone()));
Ready(Some(Ok(item))) Ready(Some(Ok(item)))
} }
@@ -66,7 +68,7 @@ where
} }
} }
impl<T, INP> Stream for Itemclone<T, INP> impl<'a, T, INP> Stream for Itemclone<'a, T, INP>
where where
INP: Stream<Item = T> + Unpin, INP: Stream<Item = T> + Unpin,
T: Clone + Unpin, T: Clone + Unpin,
@@ -75,7 +77,6 @@ where
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*; use Poll::*;
let proj = self.as_mut().project();
match self.send_fut.as_mut() { match self.send_fut.as_mut() {
Some(fut) => match Self::send_copy(fut, cx) { Some(fut) => match Self::send_copy(fut, cx) {
Ready(Ok(())) => self.poll_fresh(cx), Ready(Ok(())) => self.poll_fresh(cx),