commit 0b39cc93b529d14a391278b63bef7cf5c7be8938 Author: Dominik Werder Date: Mon Feb 17 13:32:20 2025 +0100 Channel combine adapter diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..1b72444 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +/Cargo.lock +/target diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..87c53ad --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "daqbuf-channeltools" +version = "0.0.1" +authors = ["Dominik Werder "] +edition = "2021" + +[lib] +doctest = false + +[dependencies] +futures-util = "0.3.31" +async-channel = "2.3.1" +autoerr = "0.0.3" +pin-project-lite = "0.2.16" +tokio = { version = "1", default-features = false } diff --git a/src/channel_combine_ab.rs b/src/channel_combine_ab.rs new file mode 100644 index 0000000..9066a8c --- /dev/null +++ b/src/channel_combine_ab.rs @@ -0,0 +1,78 @@ +use async_channel::Receiver; +use async_channel::Sender; +use futures_util::Stream; +use futures_util::StreamExt; +use pin_project_lite::pin_project; +use std::pin::pin; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; + +autoerr::create_error_v1!( + name(Error, "ChannelCombine"), + enum variants { + Logic, + }, +); + +pin_project! { + pub struct ChannelCombineAB { + #[pin] + a: Receiver, + #[pin] + b: Receiver, + #[pin] + tx: Sender, + } +} + +impl ChannelCombineAB +where + T: Send + Unpin + 'static, +{ + pub fn new(a: Receiver, b: Receiver) -> Receiver { + let capdef = 10; + let cap = a + .capacity() + .unwrap_or(capdef) + .max(b.capacity().unwrap_or(capdef)); + let (tx, rx) = async_channel::bounded(cap); + tokio::spawn(Self::run(Self::inner_new(a, b, tx))); + rx + } + + fn inner_new(a: Receiver, b: Receiver, tx: Sender) -> Self { + Self { a, b, tx } + } + + async fn run(obj: Self) { + let mut o2 = pin!(obj); + while let Some(x) = o2.next().await { + match o2.tx.send(x).await { + Ok(()) => {} + Err(_) => { + break; + } + } + } + } +} + +impl Stream for ChannelCombineAB +where + T: 'static, +{ + type Item = T; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + use Poll::*; + let mut this = self.project(); + match this.a.poll_next_unpin(cx) { + Ready(x) => Ready(x), + Pending => match this.b.poll_next_unpin(cx) { + Ready(x) => Ready(x), + Pending => Pending, + }, + } + } +} diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..bebba7b --- /dev/null +++ b/src/lib.rs @@ -0,0 +1 @@ +pub mod channel_combine_ab;