Channel combine adapter
This commit is contained in:
2
.gitignore
vendored
Normal file
2
.gitignore
vendored
Normal file
@@ -0,0 +1,2 @@
|
||||
/Cargo.lock
|
||||
/target
|
||||
15
Cargo.toml
Normal file
15
Cargo.toml
Normal file
@@ -0,0 +1,15 @@
|
||||
[package]
|
||||
name = "daqbuf-channeltools"
|
||||
version = "0.0.1"
|
||||
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
|
||||
edition = "2021"
|
||||
|
||||
[lib]
|
||||
doctest = false
|
||||
|
||||
[dependencies]
|
||||
futures-util = "0.3.31"
|
||||
async-channel = "2.3.1"
|
||||
autoerr = "0.0.3"
|
||||
pin-project-lite = "0.2.16"
|
||||
tokio = { version = "1", default-features = false }
|
||||
78
src/channel_combine_ab.rs
Normal file
78
src/channel_combine_ab.rs
Normal file
@@ -0,0 +1,78 @@
|
||||
use async_channel::Receiver;
|
||||
use async_channel::Sender;
|
||||
use futures_util::Stream;
|
||||
use futures_util::StreamExt;
|
||||
use pin_project_lite::pin_project;
|
||||
use std::pin::pin;
|
||||
use std::pin::Pin;
|
||||
use std::task::Context;
|
||||
use std::task::Poll;
|
||||
|
||||
autoerr::create_error_v1!(
|
||||
name(Error, "ChannelCombine"),
|
||||
enum variants {
|
||||
Logic,
|
||||
},
|
||||
);
|
||||
|
||||
pin_project! {
|
||||
pub struct ChannelCombineAB<T> {
|
||||
#[pin]
|
||||
a: Receiver<T>,
|
||||
#[pin]
|
||||
b: Receiver<T>,
|
||||
#[pin]
|
||||
tx: Sender<T>,
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> ChannelCombineAB<T>
|
||||
where
|
||||
T: Send + Unpin + 'static,
|
||||
{
|
||||
pub fn new(a: Receiver<T>, b: Receiver<T>) -> Receiver<T> {
|
||||
let capdef = 10;
|
||||
let cap = a
|
||||
.capacity()
|
||||
.unwrap_or(capdef)
|
||||
.max(b.capacity().unwrap_or(capdef));
|
||||
let (tx, rx) = async_channel::bounded(cap);
|
||||
tokio::spawn(Self::run(Self::inner_new(a, b, tx)));
|
||||
rx
|
||||
}
|
||||
|
||||
fn inner_new(a: Receiver<T>, b: Receiver<T>, tx: Sender<T>) -> Self {
|
||||
Self { a, b, tx }
|
||||
}
|
||||
|
||||
async fn run(obj: Self) {
|
||||
let mut o2 = pin!(obj);
|
||||
while let Some(x) = o2.next().await {
|
||||
match o2.tx.send(x).await {
|
||||
Ok(()) => {}
|
||||
Err(_) => {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Stream for ChannelCombineAB<T>
|
||||
where
|
||||
T: 'static,
|
||||
{
|
||||
type Item = T;
|
||||
|
||||
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||
use Poll::*;
|
||||
let mut this = self.project();
|
||||
match this.a.poll_next_unpin(cx) {
|
||||
Ready(x) => Ready(x),
|
||||
Pending => match this.b.poll_next_unpin(cx) {
|
||||
Ready(x) => Ready(x),
|
||||
Pending => Pending,
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
1
src/lib.rs
Normal file
1
src/lib.rs
Normal file
@@ -0,0 +1 @@
|
||||
pub mod channel_combine_ab;
|
||||
Reference in New Issue
Block a user