use async_channel::Receiver; use async_channel::Sender; use futures_util::Stream; use futures_util::StreamExt; use pin_project_lite::pin_project; use std::pin::pin; use std::pin::Pin; use std::task::Context; use std::task::Poll; autoerr::create_error_v1!( name(Error, "ChannelCombine"), enum variants { Logic, }, ); pin_project! { pub struct ChannelCombineAB { #[pin] a: Receiver, #[pin] b: Receiver, #[pin] tx: Sender, } } impl ChannelCombineAB where T: Send + Unpin + 'static, { pub fn new(a: Receiver, b: Receiver) -> Receiver { let capdef = 10; let cap = a .capacity() .unwrap_or(capdef) .max(b.capacity().unwrap_or(capdef)); let (tx, rx) = async_channel::bounded(cap); tokio::spawn(Self::run(Self::inner_new(a, b, tx))); rx } fn inner_new(a: Receiver, b: Receiver, tx: Sender) -> Self { Self { a, b, tx } } async fn run(obj: Self) { let mut o2 = pin!(obj); while let Some(x) = o2.next().await { match o2.tx.send(x).await { Ok(()) => {} Err(_) => { break; } } } } } impl Stream for ChannelCombineAB where T: 'static, { type Item = T; fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; let mut this = self.project(); match this.a.poll_next_unpin(cx) { Ready(x) => Ready(x), Pending => match this.b.poll_next_unpin(cx) { Ready(x) => Ready(x), Pending => Pending, }, } } }