Upgrade dependency
This commit is contained in:
@@ -5,7 +5,7 @@ authors = ["Dominik Werder <dominik.werder@gmail.com>"]
|
||||
edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
futures-util = "0.3.15"
|
||||
futures-util = "0.3.31"
|
||||
pin-project = "1.0.12"
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
serde_json = "1.0"
|
||||
@@ -14,9 +14,9 @@ typetag = "0.2.18"
|
||||
ciborium = "0.2.1"
|
||||
bytes = "1.8"
|
||||
arrayref = "0.3.6"
|
||||
crc32fast = "1.3.2"
|
||||
byteorder = "1.4.3"
|
||||
async-channel = "1.9.0"
|
||||
crc32fast = "1.4.2"
|
||||
byteorder = "1.5.0"
|
||||
async-channel = "2.3.1"
|
||||
rand_xoshiro = "0.6.0"
|
||||
http = "1"
|
||||
http-body = "1"
|
||||
|
||||
@@ -1,11 +1,8 @@
|
||||
use async_channel::Send;
|
||||
use async_channel::Sender;
|
||||
use futures_util::pin_mut;
|
||||
use futures_util::Future;
|
||||
use futures_util::Stream;
|
||||
use futures_util::StreamExt;
|
||||
use std::pin::Pin;
|
||||
use std::ptr::NonNull;
|
||||
use std::task::Context;
|
||||
use std::task::Poll;
|
||||
|
||||
@@ -13,13 +10,16 @@ use std::task::Poll;
|
||||
#[cstm(name = "ItemClone")]
|
||||
pub enum Error {}
|
||||
|
||||
#[pin_project::pin_project]
|
||||
pub struct Itemclone<'a, T, INP>
|
||||
where
|
||||
T: 'static,
|
||||
{
|
||||
#[pin]
|
||||
sender: Pin<Box<Sender<T>>>,
|
||||
#[pin]
|
||||
inp: INP,
|
||||
send_fut: Option<Send<'a, T>>,
|
||||
send_fut: Option<Pin<Box<Send<'a, T>>>>,
|
||||
}
|
||||
|
||||
impl<'a, T, INP> Itemclone<'a, T, INP> {
|
||||
@@ -37,19 +37,24 @@ impl<'a, T, INP> Itemclone<'a, T, INP> {
|
||||
}
|
||||
}
|
||||
|
||||
unsafe fn extend<'a, 'b, T>(t: &'a mut T) -> &'b mut T {
|
||||
core::mem::transmute(t)
|
||||
}
|
||||
|
||||
impl<'a, T, INP> Itemclone<'a, T, INP>
|
||||
where
|
||||
INP: Stream<Item = T> + Unpin,
|
||||
T: Clone + Unpin,
|
||||
{
|
||||
fn poll_fresh(&mut self, cx: &mut Context) -> Poll<Option<Result<T, Error>>> {
|
||||
fn poll_fresh(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Result<T, Error>>> {
|
||||
use Poll::*;
|
||||
match self.inp.poll_next_unpin(cx) {
|
||||
let selfproj = self.as_mut().project();
|
||||
match selfproj.inp.poll_next(cx) {
|
||||
Ready(Some(item)) => {
|
||||
let sender = self.sender.as_mut().get_mut();
|
||||
let mut ptr1 = NonNull::from(sender);
|
||||
let sender = unsafe { ptr1.as_mut() };
|
||||
self.send_fut = Some(sender.send(item.clone()));
|
||||
let r1 = selfproj.sender.get_mut().as_mut().get_mut();
|
||||
let sender = unsafe { extend(r1) };
|
||||
let fut = sender.send(item.clone());
|
||||
*selfproj.send_fut = Some(Box::pin(fut));
|
||||
Ready(Some(Ok(item)))
|
||||
}
|
||||
Ready(None) => {
|
||||
@@ -60,9 +65,8 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
fn send_copy(fut: &mut Send<T>, cx: &mut Context) -> Poll<Result<(), Error>> {
|
||||
fn send_copy(fut: Pin<&mut Send<T>>, cx: &mut Context) -> Poll<Result<(), Error>> {
|
||||
use Poll::*;
|
||||
pin_mut!(fut);
|
||||
match fut.poll(cx) {
|
||||
Ready(Ok(())) => Ready(Ok(())),
|
||||
Ready(Err(_)) => todo!("can not send copy"),
|
||||
@@ -80,12 +84,16 @@ where
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||
use Poll::*;
|
||||
match self.send_fut.as_mut() {
|
||||
Some(fut) => match Self::send_copy(fut, cx) {
|
||||
Ready(Ok(())) => self.poll_fresh(cx),
|
||||
Ready(Err(e)) => Ready(Some(Err(e))),
|
||||
Pending => Pending,
|
||||
},
|
||||
let selfproj = self.as_mut().project();
|
||||
match selfproj.send_fut {
|
||||
Some(fut) => {
|
||||
let fut = fut.as_mut();
|
||||
match Self::send_copy(fut, cx) {
|
||||
Ready(Ok(())) => self.poll_fresh(cx),
|
||||
Ready(Err(e)) => Ready(Some(Err(e))),
|
||||
Pending => Pending,
|
||||
}
|
||||
}
|
||||
None => self.poll_fresh(cx),
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user