From fa86c7ab7d796d1c18c65586af7a0df5d124f1fd Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Mon, 8 Nov 2021 21:21:55 +0100 Subject: [PATCH] WIP on send channel --- archapp/src/archeng/configs.rs | 16 +- commonio/Cargo.toml | 2 +- netfetch/src/bsread.rs | 40 +-- netfetch/src/netbuf.rs | 117 ++++++++ netfetch/src/netfetch.rs | 1 + netfetch/src/zmtp.rs | 416 +++++++++++++++++++---------- taskrun/Cargo.toml | 5 +- taskrun/src/{lib.rs => taskrun.rs} | 0 8 files changed, 423 insertions(+), 174 deletions(-) create mode 100644 netfetch/src/netbuf.rs rename taskrun/src/{lib.rs => taskrun.rs} (100%) diff --git a/archapp/src/archeng/configs.rs b/archapp/src/archeng/configs.rs index 30d0e2f..62b97b9 100644 --- a/archapp/src/archeng/configs.rs +++ b/archapp/src/archeng/configs.rs @@ -14,7 +14,7 @@ use tokio_postgres::{Client, Row}; pub struct ChannelNameStream { db_config: Database, - off: u64, + max_name: String, db_done: bool, batch: VecDeque, connect_fut: Option> + Send>>>, @@ -27,7 +27,7 @@ impl ChannelNameStream { pub fn new(db_config: Database) -> Self { Self { db_config, - off: 0, + max_name: String::new(), db_done: false, batch: VecDeque::new(), connect_fut: None, @@ -55,10 +55,12 @@ impl Stream for ChannelNameStream { match fut.poll_unpin(cx) { Ready(Ok(rows)) => { self.select_fut = None; - self.off += rows.len() as u64; if rows.len() == 0 { self.db_done = true; } + if let Some(last) = rows.last().as_ref() { + self.max_name = last.get(1); + } for row in rows { self.batch.push_back(row.get(1)); } @@ -75,13 +77,13 @@ impl Stream for ChannelNameStream { match fut.poll_unpin(cx) { Ready(Ok(dbc)) => { self.connect_fut = None; - let off = self.off as i64; - info!("select channels off {}", off); + let max_name = self.max_name.clone(); + info!("select channels max_name {}", max_name); let fut = async move { let rows = dbc .query( - "select rowid, name from channels where config = '{}'::jsonb order by name offset $1 limit 64", - &[&off], + "select rowid, name from channels where config = '{}'::jsonb and name > $1 order by name limit 64", + &[&max_name], ) .await?; Ok::<_, Error>(rows) diff --git a/commonio/Cargo.toml b/commonio/Cargo.toml index 12402b7..08e597c 100644 --- a/commonio/Cargo.toml +++ b/commonio/Cargo.toml @@ -2,7 +2,7 @@ name = "commonio" version = "0.0.1-a.dev.4" authors = ["Dominik Werder "] -edition = "2018" +edition = "2021" [lib] path = "src/commonio.rs" diff --git a/netfetch/src/bsread.rs b/netfetch/src/bsread.rs index 59f5081..59acdea 100644 --- a/netfetch/src/bsread.rs +++ b/netfetch/src/bsread.rs @@ -5,7 +5,7 @@ use netpod::log::*; use netpod::ByteOrder; use netpod::ScalarType; use netpod::Shape; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; use serde_json::Value as JsVal; use std::fmt; @@ -15,40 +15,40 @@ pub struct ParseError { pub msg: ZmtpMessage, } -#[derive(Debug, Deserialize)] +#[derive(Debug, Serialize, Deserialize)] pub struct GlobalTimestamp { - sec: u64, - ns: u64, + pub sec: u64, + pub ns: u64, } -#[derive(Debug, Deserialize)] +#[derive(Debug, Serialize, Deserialize)] pub struct ChannelDesc { - name: String, + pub name: String, #[serde(rename = "type")] - ty: String, - shape: JsVal, - encoding: String, + pub ty: String, + pub shape: JsVal, + pub encoding: String, } -#[derive(Debug, Deserialize)] +#[derive(Debug, Serialize, Deserialize)] pub struct HeadA { - htype: String, - hash: String, - pulse_id: serde_json::Number, - global_timestamp: GlobalTimestamp, + pub htype: String, + pub hash: String, + pub pulse_id: serde_json::Number, + pub global_timestamp: GlobalTimestamp, } -#[derive(Debug, Deserialize)] +#[derive(Debug, Serialize, Deserialize)] pub struct HeadB { - htype: String, - channels: Vec, + pub htype: String, + pub channels: Vec, } #[derive(Debug)] pub struct BsreadMessage { - head_a: HeadA, - head_b: HeadB, - values: Vec>, + pub head_a: HeadA, + pub head_b: HeadB, + pub values: Vec>, } pub fn parse_zmtp_message(msg: &ZmtpMessage) -> Result { diff --git a/netfetch/src/netbuf.rs b/netfetch/src/netbuf.rs new file mode 100644 index 0000000..2d5eea5 --- /dev/null +++ b/netfetch/src/netbuf.rs @@ -0,0 +1,117 @@ +use err::Error; +use tokio::io::ReadBuf; + +pub const BUFCAP: usize = 1024 * 128; +pub const RP_REW_PT: usize = 1024 * 64; + +pub struct NetBuf { + buf: Vec, + wp: usize, + rp: usize, +} + +impl NetBuf { + pub fn new() -> Self { + Self { + buf: vec![0; BUFCAP], + wp: 0, + rp: 0, + } + } + + pub fn len(&self) -> usize { + self.wp - self.rp + } + + pub fn cap(&self) -> usize { + self.buf.len() + } + + pub fn wrcap(&self) -> usize { + self.buf.len() - self.wp + } + + pub fn data(&self) -> &[u8] { + &self.buf[self.rp..self.wp] + } + + pub fn adv(&mut self, x: usize) -> Result<(), Error> { + if self.len() < x { + return Err(Error::with_msg_no_trace("not enough bytes")); + } else { + self.rp += x; + Ok(()) + } + } + + pub fn wpadv(&mut self, x: usize) -> Result<(), Error> { + if self.wrcap() < x { + return Err(Error::with_msg_no_trace("not enough space")); + } else { + self.wp += x; + Ok(()) + } + } + + pub fn read_u8(&mut self) -> Result { + type T = u8; + const TS: usize = std::mem::size_of::(); + if self.len() < TS { + return Err(Error::with_msg_no_trace("not enough bytes")); + } else { + let val = self.buf[self.rp]; + self.rp += TS; + Ok(val) + } + } + + pub fn read_u64(&mut self) -> Result { + type T = u64; + const TS: usize = std::mem::size_of::(); + if self.len() < TS { + return Err(Error::with_msg_no_trace("not enough bytes")); + } else { + let val = T::from_be_bytes(self.buf[self.rp..self.rp + TS].try_into()?); + self.rp += TS; + Ok(val) + } + } + + pub fn read_bytes(&mut self, n: usize) -> Result<&[u8], Error> { + if self.len() < n { + return Err(Error::with_msg_no_trace("not enough bytes")); + } else { + let val = self.buf[self.rp..self.rp + n].as_ref(); + self.rp += n; + Ok(val) + } + } + + pub fn read_buf_for_fill(&mut self) -> ReadBuf { + self.rewind_if_needed(); + let read_buf = ReadBuf::new(&mut self.buf[self.wp..]); + read_buf + } + + pub fn rewind_if_needed(&mut self) { + if self.rp != 0 && self.rp == self.wp { + self.rp = 0; + self.wp = 0; + } else if self.rp > RP_REW_PT { + self.buf.copy_within(self.rp..self.wp, 0); + self.wp -= self.rp; + self.rp = 0; + } + } + + pub fn put_slice(&mut self, buf: &[u8]) -> Result<(), Error> { + self.rewind_if_needed(); + if self.wrcap() < buf.len() { + return Err(Error::with_msg_no_trace("not enough space")); + } else { + self.buf[self.wp..self.wp + buf.len()].copy_from_slice(buf); + self.wp += buf.len(); + Ok(()) + } + } +} diff --git a/netfetch/src/netfetch.rs b/netfetch/src/netfetch.rs index 92a851a..983e103 100644 --- a/netfetch/src/netfetch.rs +++ b/netfetch/src/netfetch.rs @@ -1,5 +1,6 @@ pub mod bsread; pub mod ca; +pub mod netbuf; #[cfg(test)] pub mod test; pub mod zmtp; diff --git a/netfetch/src/zmtp.rs b/netfetch/src/zmtp.rs index 08e548d..c019d48 100644 --- a/netfetch/src/zmtp.rs +++ b/netfetch/src/zmtp.rs @@ -1,16 +1,25 @@ +use crate::bsread::parse_zmtp_message; +use crate::bsread::ChannelDesc; +use crate::bsread::GlobalTimestamp; +use crate::bsread::HeadA; +use crate::bsread::HeadB; +use crate::netbuf::NetBuf; +use crate::netbuf::RP_REW_PT; +use async_channel::Receiver; +use async_channel::Sender; use err::Error; use futures_core::Stream; use futures_util::{pin_mut, StreamExt}; use netpod::log::*; +use serde_json::Value as JsVal; use std::fmt; use std::mem; use std::pin::Pin; use std::task::{Context, Poll}; +use std::time::Duration; use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; use tokio::net::TcpStream; -use crate::bsread::parse_zmtp_message; - #[test] fn test_listen() -> Result<(), Error> { use std::time::Duration; @@ -21,6 +30,29 @@ fn test_listen() -> Result<(), Error> { taskrun::run(fut) } +#[test] +fn test_service() -> Result<(), Error> { + //use std::time::Duration; + let fut = async move { + let sock = tokio::net::TcpListener::bind("0.0.0.0:9999").await?; + loop { + info!("accepting..."); + let (conn, remote) = sock.accept().await?; + info!("new connection from {:?}", remote); + let mut zmtp = Zmtp::new(conn); + let fut = async move { + while let Some(item) = zmtp.next().await { + info!("item from {:?} {:?}", remote, item); + } + Ok::<_, Error>(()) + }; + taskrun::spawn(fut); + } + //Ok::<_, Error>(()) + }; + taskrun::run(fut) +} + pub async fn zmtp_00() -> Result<(), Error> { let addr = "S10-CPPM-MOT0991:9999"; zmtp_client(addr).await?; @@ -67,9 +99,26 @@ enum ConnState { ReadFrameFlags, ReadFrameShort, ReadFrameLong, - ReadFrameBody, + ReadFrameBody(usize), } +impl ConnState { + fn need_min(&self) -> usize { + use ConnState::*; + match self { + InitSend => 0, + InitRecv1 => 11, + InitRecv2 => 53, + ReadFrameFlags => 1, + ReadFrameShort => 1, + ReadFrameLong => 8, + ReadFrameBody(msglen) => *msglen, + } + } +} + +struct DummyData {} + struct Zmtp { done: bool, complete: bool, @@ -77,11 +126,14 @@ struct Zmtp { conn_state: ConnState, buf: NetBuf, outbuf: NetBuf, - need_min: usize, + out_enable: bool, msglen: usize, has_more: bool, is_command: bool, frames: Vec, + inp_eof: bool, + data_tx: Sender, + data_rx: Receiver, } impl Zmtp { @@ -90,6 +142,7 @@ impl Zmtp { //conn.set_recv_buffer_size(1024 * 1024 * 4)?; //info!("send_buffer_size {:8}", conn.send_buffer_size()?); //info!("recv_buffer_size {:8}", conn.recv_buffer_size()?); + let (tx, rx) = async_channel::bounded(1); Self { done: false, complete: false, @@ -97,66 +150,72 @@ impl Zmtp { conn_state: ConnState::InitSend, buf: NetBuf::new(), outbuf: NetBuf::new(), - need_min: 0, + out_enable: false, msglen: 0, has_more: false, is_command: false, frames: vec![], + inp_eof: false, + data_tx: tx, + data_rx: rx, } } - fn buf_conn(&mut self) -> (&mut TcpStream, ReadBuf) { + fn inpbuf_conn(&mut self) -> (&mut TcpStream, ReadBuf) { (&mut self.conn, self.buf.read_buf_for_fill()) } fn outbuf_conn(&mut self) -> (&[u8], &mut TcpStream) { - let b = &self.outbuf.buf[self.outbuf.rp..self.outbuf.wp]; - let w = &mut self.conn; - (b, w) + (self.outbuf.data(), &mut self.conn) } fn parse_item(&mut self) -> Result, Error> { match self.conn_state { ConnState::InitSend => { info!("parse_item InitSend"); - // TODO allow to specify a minimum amount of needed space. - // TODO factor writing into the buffer in some way... - let mut b = self.outbuf.read_buf_for_fill(); - b.put_slice(&[0xff, 0, 0, 0, 0, 0, 0, 0, 0, 0x7f, 3]); - self.outbuf.wp += b.filled().len(); + self.outbuf.put_slice(&[0xff, 0, 0, 0, 0, 0, 0, 0, 0, 0x7f, 3])?; self.conn_state = ConnState::InitRecv1; - self.need_min = 11; Ok(None) } ConnState::InitRecv1 => { - let ver = self.buf.buf[self.buf.rp + 10]; - self.buf.rp += self.need_min; + self.buf.adv(10)?; + let ver = self.buf.read_u8()?; info!("parse_item InitRecv1 major version {}", ver); if ver != 3 { return Err(Error::with_msg_no_trace(format!("bad version {}", ver))); } - let mut b = self.outbuf.read_buf_for_fill(); - b.put_slice(&[0, 0x4e, 0x55, 0x4c, 0x4c]); + self.outbuf.put_slice(&[0, 0x4e, 0x55, 0x4c, 0x4c])?; let a = vec![0; 48]; - b.put_slice(&a); - self.outbuf.wp += b.filled().len(); + self.outbuf.put_slice(&a)?; self.conn_state = ConnState::InitRecv2; - self.need_min = 53; Ok(None) } ConnState::InitRecv2 => { info!("parse_item InitRecv2"); - self.buf.rp += self.need_min; - let mut b = self.outbuf.read_buf_for_fill(); - b.put_slice(&b"\x04\x1a\x05READY\x0bSocket-Type\x00\x00\x00\x04PULL"[..]); - self.outbuf.wp += b.filled().len(); + // TODO parse greeting remainder.. sec-scheme. + self.buf.adv(self.conn_state.need_min())?; + self.outbuf + .put_slice(&b"\x04\x1a\x05READY\x0bSocket-Type\x00\x00\x00\x04PULL"[..])?; + self.out_enable = true; self.conn_state = ConnState::ReadFrameFlags; - self.need_min = 1; + let tx = self.data_tx.clone(); + let fut1 = async move { + loop { + tokio::time::sleep(Duration::from_millis(1000)).await; + let dd = DummyData {}; + match tx.send(dd).await { + Ok(()) => { + info!("item send to channel"); + } + Err(_) => break, + } + } + }; + taskrun::spawn(fut1); Ok(None) } ConnState::ReadFrameFlags => { - let flags = self.buf.buf[self.buf.rp + 0]; - self.buf.rp += self.need_min; + let flags = self.buf.read_u8()?; let has_more = flags & 0x01 != 0; let long_size = flags & 0x02 != 0; let is_command = flags & 0x04 != 0; @@ -168,54 +227,52 @@ impl Zmtp { long_size, is_command ); + if is_command { + warn!("Got zmtp command frame"); + } if false && is_command { return Err(Error::with_msg_no_trace("got zmtp command frame")); } if long_size { self.conn_state = ConnState::ReadFrameLong; - self.need_min = 8; } else { self.conn_state = ConnState::ReadFrameShort; - self.need_min = 1; } Ok(None) } ConnState::ReadFrameShort => { - let len = self.buf.buf[self.buf.rp + 0]; - self.buf.rp += self.need_min; - self.msglen = len as usize; - trace!("parse_item ReadFrameShort self.msglen {}", self.msglen); - self.conn_state = ConnState::ReadFrameBody; - self.need_min = self.msglen; + self.msglen = self.buf.read_u8()? as usize; + trace!("parse_item ReadFrameShort msglen {}", self.msglen); + self.conn_state = ConnState::ReadFrameBody(self.msglen); + if self.msglen > 1024 * 64 { + return Err(Error::with_msg_no_trace(format!( + "larger msglen not yet supported {}", + self.msglen, + ))); + } Ok(None) } ConnState::ReadFrameLong => { - let mut a = [0; 8]; - for i1 in 0..8 { - a[i1] = self.buf.buf[self.buf.rp + i1]; + self.msglen = self.buf.read_u64()? as usize; + trace!("parse_item ReadFrameShort msglen {}", self.msglen); + self.conn_state = ConnState::ReadFrameBody(self.msglen); + if self.msglen > 1024 * 64 { + return Err(Error::with_msg_no_trace(format!( + "larger msglen not yet supported {}", + self.msglen, + ))); } - self.buf.rp += self.need_min; - self.msglen = usize::from_be_bytes(a); - trace!("parse_item ReadFrameLong self.msglen {}", self.msglen); - self.conn_state = ConnState::ReadFrameBody; - self.need_min = self.msglen; Ok(None) } - ConnState::ReadFrameBody => { - let n1 = self.buf.len(); - let n1 = if n1 < 256 { n1 } else { 256 }; - let data = self.buf.buf[self.buf.rp..(self.buf.rp + self.msglen)].to_vec(); + ConnState::ReadFrameBody(msglen) => { + let data = self.buf.read_bytes(msglen)?.to_vec(); + self.msglen = 0; if false { - let s = String::from_utf8_lossy(&self.buf.buf[self.buf.rp..(self.buf.rp + n1)]); - trace!( - "parse_item ReadFrameBody self.need_min {} string {}", - self.need_min, - s - ); + let n1 = data.len().min(256); + let s = String::from_utf8_lossy(&data[..n1]); + trace!("parse_item ReadFrameBody msglen {} string {}", msglen, s); } - self.buf.rp += self.need_min; self.conn_state = ConnState::ReadFrameFlags; - self.need_min = 1; if !self.is_command { let g = ZmtpFrame { msglen: self.msglen, @@ -238,43 +295,6 @@ impl Zmtp { } } -struct NetBuf { - buf: Vec, - wp: usize, - rp: usize, -} - -impl NetBuf { - fn new() -> Self { - Self { - buf: vec![0; 1024 * 128], - wp: 0, - rp: 0, - } - } - - fn len(&self) -> usize { - self.wp - self.rp - } - - fn read_buf_for_fill(&mut self) -> ReadBuf { - self.rewind_if_needed(); - let read_buf = ReadBuf::new(&mut self.buf[self.wp..]); - read_buf - } - - fn rewind_if_needed(&mut self) { - if self.rp != 0 && self.rp == self.wp { - self.rp = 0; - self.wp = 0; - } else { - self.buf.copy_within(self.rp..self.wp, 0); - self.wp -= self.rp; - self.rp = 0; - } - } -} - #[derive(Debug)] pub struct ZmtpMessage { frames: Vec, @@ -320,6 +340,24 @@ impl fmt::Debug for ZmtpFrame { } } +enum Int { + NoWork, + Pending, + Empty, + Item(T), + Done, +} + +impl Int { + fn is_item(&self) -> bool { + if let Int::Item(_) = self { + true + } else { + false + } + } +} + #[derive(Debug)] enum ZmtpEvent { ZmtpMessage(ZmtpMessage), @@ -336,75 +374,163 @@ impl Stream for Zmtp { self.complete = true; return Ready(None); } - 'outer: loop { - let write_pending = loop { - if self.outbuf.len() > 0 { - let (b, w) = self.outbuf_conn(); - pin_mut!(w); - match w.poll_write(cx, b) { - Ready(k) => match k { - Ok(k) => { - self.outbuf.rp += k; + loop { + let have_item = false; + let serialized: Int<()> = if self.out_enable && self.outbuf.wrcap() >= RP_REW_PT { + match self.data_rx.poll_next_unpin(cx) { + Ready(Some(k)) => { + info!("item from channel, put to output buffer"); + let head_a = HeadA { + htype: "bsr_m-1.1".into(), + // TODO hash definition? + hash: "b9e0916effc5a8a2f1977a9eb8beea63".into(), + pulse_id: serde_json::Number::from(42424242), + global_timestamp: GlobalTimestamp { + sec: 1636401670, + ns: 12920856, + }, + }; + let head_b = HeadB { + htype: "bsr_d-1.1".into(), + channels: vec![ChannelDesc { + name: "TESTCHAN".into(), + ty: "int64".into(), + shape: JsVal::Array(vec![JsVal::Number(serde_json::Number::from(1))]), + encoding: "little".into(), + }], + }; + let ha = serde_json::to_vec(&head_a).unwrap(); + let hb = serde_json::to_vec(&head_b).unwrap(); + let hf = 23478236u64.to_le_bytes(); + let hp = 13131313u64.to_le_bytes(); + self.outbuf.put_slice(&ha).unwrap(); + self.outbuf.put_slice(&hb).unwrap(); + self.outbuf.put_slice(&hf).unwrap(); + self.outbuf.put_slice(&hp).unwrap(); + Int::Empty + } + Ready(None) => Int::Done, + Pending => Int::Pending, + } + } else { + Int::NoWork + }; + let have_item = have_item | serialized.is_item(); + let write = if self.outbuf.len() > 0 { + let (b, w) = self.outbuf_conn(); + pin_mut!(w); + match w.poll_write(cx, b) { + Ready(k) => match k { + Ok(k) => match self.outbuf.adv(k) { + Ok(()) => { info!("sent {} bytes", k); + self.outbuf.rewind_if_needed(); + Int::Empty } - Err(e) => { - self.done = true; - break 'outer Ready(Some(Err(e.into()))); - } + Err(e) => Int::Item(Err::<(), _>(e)), }, - Pending => break true, - } - } else { - break false; + Err(e) => Int::Item(Err(e.into())), + }, + Pending => Int::Pending, } + } else { + Int::NoWork }; - let read_pending = loop { - if self.buf.len() < self.need_min { - let nf1 = self.buf.buf.len() - self.buf.rp; - let nf2 = self.need_min; - let (w, mut rbuf) = self.buf_conn(); - if nf1 < nf2 { - break 'outer Ready(Some(Err(Error::with_msg_no_trace("buffer too small for need_min")))); - } + let have_item = have_item | write.is_item(); + let read: Int> = if have_item || self.inp_eof { + Int::NoWork + } else { + if self.buf.cap() < self.conn_state.need_min() { + self.done = true; + let e = Error::with_msg_no_trace(format!( + "buffer too small for need_min {} {}", + self.buf.cap(), + self.conn_state.need_min() + )); + Int::Item(Err(e)) + } else if self.buf.len() < self.conn_state.need_min() { + let (w, mut rbuf) = self.inpbuf_conn(); pin_mut!(w); - let r = w.poll_read(cx, &mut rbuf); - match r { + match w.poll_read(cx, &mut rbuf) { Ready(k) => match k { - Ok(_) => { - info!("received {} bytes", rbuf.filled().len()); - if false { - let t = rbuf.filled().len(); - let t = if t < 32 { t } else { 32 }; - info!("got data {:?}", &rbuf.filled()[0..t]); + Ok(()) => { + let nf = rbuf.filled().len(); + if nf == 0 { + info!("EOF"); + self.inp_eof = true; + Int::Done + } else { + info!("received {} bytes", rbuf.filled().len()); + if false { + let t = rbuf.filled().len(); + let t = if t < 32 { t } else { 32 }; + info!("got data {:?}", &rbuf.filled()[0..t]); + } + match self.buf.wpadv(nf) { + Ok(()) => Int::Empty, + Err(e) => Int::Item(Err(e)), + } } - self.buf.wp += rbuf.filled().len(); - } - Err(e) => { - self.done = true; - break 'outer Ready(Some(Err(e.into()))); } + Err(e) => Int::Item(Err(e.into())), }, - Pending => break true, + Pending => Int::Pending, } } else { - break false; + Int::NoWork } }; - if self.buf.len() >= self.need_min { + let have_item = have_item | read.is_item(); + let parsed = if have_item || self.buf.len() < self.conn_state.need_min() { + Int::NoWork + } else { match self.parse_item() { Ok(k) => match k { - Some(k) => break 'outer Ready(Some(Ok(k))), - None => (), + Some(k) => Int::Item(Ok(k)), + None => Int::Empty, }, - Err(e) => { - self.done = true; - break 'outer Ready(Some(Err(e.into()))); - } + Err(e) => Int::Item(Err(e)), } - } - if write_pending || read_pending { - break 'outer Pending; - } + }; + let _have_item = have_item | parsed.is_item(); + { + use Int::*; + match (write, read, parsed) { + (Item(_), Item(_), _) => panic!(), + (Item(_), _, Item(_)) => panic!(), + (_, Item(_), Item(_)) => panic!(), + (NoWork | Done, NoWork | Done, NoWork | Done) => { + warn!("all NoWork or Done"); + break Poll::Pending; + } + (_, Item(Err(e)), _) => { + self.done = true; + break Poll::Ready(Some(Err(e))); + } + (_, _, Item(Err(e))) => { + self.done = true; + break Poll::Ready(Some(Err(e))); + } + (Item(_), _, _) => { + continue; + } + (_, Item(Ok(_)), _) => { + continue; + } + (_, _, Item(Ok(item))) => { + break Poll::Ready(Some(Ok(item))); + } + (Empty, _, _) => continue, + (_, Empty, _) => continue, + (_, _, Empty) => continue, + #[allow(unreachable_patterns)] + (Pending, Pending | NoWork | Done, Pending | NoWork | Done) => break Poll::Pending, + #[allow(unreachable_patterns)] + (Pending | NoWork | Done, Pending, Pending | NoWork | Done) => break Poll::Pending, + #[allow(unreachable_patterns)] + (Pending | NoWork | Done, Pending | NoWork | Done, Pending) => break Poll::Pending, + } + }; } } } diff --git a/taskrun/Cargo.toml b/taskrun/Cargo.toml index 2994e4c..f6ecc98 100644 --- a/taskrun/Cargo.toml +++ b/taskrun/Cargo.toml @@ -2,7 +2,10 @@ name = "taskrun" version = "0.0.1-a.0" authors = ["Dominik Werder "] -edition = "2018" +edition = "2021" + +[lib] +path = "src/taskrun.rs" [dependencies] tokio = { version = "1.7.1", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] } diff --git a/taskrun/src/lib.rs b/taskrun/src/taskrun.rs similarity index 100% rename from taskrun/src/lib.rs rename to taskrun/src/taskrun.rs