Files
daqbuf-channeltools/src/channel_combine_ab.rs
2025-02-17 13:32:20 +01:00

79 lines
1.7 KiB
Rust

use async_channel::Receiver;
use async_channel::Sender;
use futures_util::Stream;
use futures_util::StreamExt;
use pin_project_lite::pin_project;
use std::pin::pin;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
// Error type for this module, declared through the `autoerr` macro with the
// type name `Error`, the label "ChannelCombine" and a single variant `Logic`.
// NOTE(review): the exact items the macro generates are not visible here — see the autoerr crate.
// NOTE(review): nothing in the code visible in this file constructs or returns `Error`; confirm it is still needed.
autoerr::create_error_v1!(
name(Error, "ChannelCombine"),
enum variants {
Logic,
},
);
pin_project! {
/// Bundles two input receivers `a` and `b` with the sender `tx` of the
/// output channel that their items are forwarded to.
///
/// All three fields are marked `#[pin]`, so `project()` hands them out as
/// pinned references.
pub struct ChannelCombineAB<T> {
// First input; the `Stream` impl polls this one before `b`.
#[pin]
a: Receiver<T>,
// Second input; only polled when `a` did not yield an item.
#[pin]
b: Receiver<T>,
// Sending half of the output channel; used by the forwarding task in `run`.
#[pin]
tx: Sender<T>,
}
}
impl<T> ChannelCombineAB<T>
where
    T: Send + Unpin + 'static,
{
    /// Combines the receivers `a` and `b` into a single output receiver.
    ///
    /// A bounded output channel is created whose capacity is the larger of the
    /// two input capacities; an input that reports no capacity counts as 10.
    /// A task is spawned with `tokio::spawn` that forwards the items of the
    /// combined stream into that channel, and the receiving half is returned.
    ///
    /// Because `tokio::spawn` is used, this must be called from within a
    /// Tokio runtime.
    pub fn new(a: Receiver<T>, b: Receiver<T>) -> Receiver<T> {
        // Stand-in capacity for an input whose `capacity()` is `None`.
        let capdef = 10;
        let cap_a = a.capacity().unwrap_or(capdef);
        let cap_b = b.capacity().unwrap_or(capdef);
        let (tx, rx) = async_channel::bounded(cap_a.max(cap_b));
        let combiner = Self::inner_new(a, b, tx);
        // The join handle is dropped on purpose: the task runs detached.
        tokio::spawn(Self::run(combiner));
        rx
    }

    /// Plain field-wise constructor.
    fn inner_new(a: Receiver<T>, b: Receiver<T>, tx: Sender<T>) -> Self {
        Self { a, b, tx }
    }

    /// Forwards every item yielded by `obj` (as a `Stream`) into `obj.tx`.
    ///
    /// Returns when the stream ends or when sending fails; the send error
    /// itself is discarded.
    async fn run(obj: Self) {
        let mut combined = pin!(obj);
        loop {
            let Some(item) = combined.next().await else {
                break;
            };
            if combined.tx.send(item).await.is_err() {
                break;
            }
        }
    }
}
impl<T> Stream for ChannelCombineAB<T>
where
    T: 'static,
{
    type Item = T;

    /// Yields the next item from either input, preferring `a` over `b`.
    ///
    /// The stream ends only once BOTH inputs have ended. Previously a
    /// `Ready(None)` from either input was passed straight through, so the
    /// combined stream terminated as soon as one input closed and anything
    /// still queued in (or later sent to) the other input was lost.
    ///
    /// Returns:
    /// - `Ready(Some(item))` when `a`, or otherwise `b`, has an item;
    /// - `Ready(None)` when both `a` and `b` report end-of-stream;
    /// - `Pending` otherwise.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        use Poll::*;
        let mut this = self.project();
        // NOTE(review): an input that already ended is polled again on later
        // calls; this relies on `async_channel::Receiver` continuing to return
        // `Ready(None)` once closed and drained — confirm for the crate version in use.
        let a_closed = match this.a.poll_next_unpin(cx) {
            Ready(Some(item)) => return Ready(Some(item)),
            Ready(None) => true,
            Pending => false,
        };
        match this.b.poll_next_unpin(cx) {
            Ready(Some(item)) => Ready(Some(item)),
            Ready(None) if a_closed => Ready(None),
            // Reaching here means at least one input returned `Pending` and
            // has therefore been polled with `cx`, so a wakeup will follow.
            Ready(None) | Pending => Pending,
        }
    }
}