diff --git a/Cargo.toml b/Cargo.toml index 725225f..c13314a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,7 +5,7 @@ authors = ["Dominik Werder "] edition = "2021" [dependencies] -futures-util = "0.3.15" +futures-util = "0.3.31" pin-project = "1.0.12" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" @@ -14,9 +14,9 @@ typetag = "0.2.18" ciborium = "0.2.1" bytes = "1.8" arrayref = "0.3.6" -crc32fast = "1.3.2" -byteorder = "1.4.3" -async-channel = "1.9.0" +crc32fast = "1.4.2" +byteorder = "1.5.0" +async-channel = "2.3.1" rand_xoshiro = "0.6.0" http = "1" http-body = "1" diff --git a/src/itemclone.rs b/src/itemclone.rs index db5ceff..1b3055c 100644 --- a/src/itemclone.rs +++ b/src/itemclone.rs @@ -1,11 +1,8 @@ use async_channel::Send; use async_channel::Sender; -use futures_util::pin_mut; use futures_util::Future; use futures_util::Stream; -use futures_util::StreamExt; use std::pin::Pin; -use std::ptr::NonNull; use std::task::Context; use std::task::Poll; @@ -13,13 +10,16 @@ use std::task::Poll; #[cstm(name = "ItemClone")] pub enum Error {} +#[pin_project::pin_project] pub struct Itemclone<'a, T, INP> where T: 'static, { + #[pin] sender: Pin>>, + #[pin] inp: INP, - send_fut: Option>, + send_fut: Option>>>, } impl<'a, T, INP> Itemclone<'a, T, INP> { @@ -37,19 +37,24 @@ impl<'a, T, INP> Itemclone<'a, T, INP> { } } +unsafe fn extend<'a, 'b, T>(t: &'a mut T) -> &'b mut T { + core::mem::transmute(t) +} + impl<'a, T, INP> Itemclone<'a, T, INP> where INP: Stream + Unpin, T: Clone + Unpin, { - fn poll_fresh(&mut self, cx: &mut Context) -> Poll>> { + fn poll_fresh(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll>> { use Poll::*; - match self.inp.poll_next_unpin(cx) { + let selfproj = self.as_mut().project(); + match selfproj.inp.poll_next(cx) { Ready(Some(item)) => { - let sender = self.sender.as_mut().get_mut(); - let mut ptr1 = NonNull::from(sender); - let sender = unsafe { ptr1.as_mut() }; - self.send_fut = Some(sender.send(item.clone())); + let r1 = selfproj.sender.get_mut().as_mut().get_mut(); + let sender = unsafe { extend(r1) }; + let fut = sender.send(item.clone()); + *selfproj.send_fut = Some(Box::pin(fut)); Ready(Some(Ok(item))) } Ready(None) => { @@ -60,9 +65,8 @@ where } } - fn send_copy(fut: &mut Send, cx: &mut Context) -> Poll> { + fn send_copy(fut: Pin<&mut Send>, cx: &mut Context) -> Poll> { use Poll::*; - pin_mut!(fut); match fut.poll(cx) { Ready(Ok(())) => Ready(Ok(())), Ready(Err(_)) => todo!("can not send copy"), @@ -80,12 +84,16 @@ where fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; - match self.send_fut.as_mut() { - Some(fut) => match Self::send_copy(fut, cx) { - Ready(Ok(())) => self.poll_fresh(cx), - Ready(Err(e)) => Ready(Some(Err(e))), - Pending => Pending, - }, + let selfproj = self.as_mut().project(); + match selfproj.send_fut { + Some(fut) => { + let fut = fut.as_mut(); + match Self::send_copy(fut, cx) { + Ready(Ok(())) => self.poll_fresh(cx), + Ready(Err(e)) => Ready(Some(Err(e))), + Pending => Pending, + } + } None => self.poll_fresh(cx), } }