diff --git a/netfetch/Cargo.toml b/netfetch/Cargo.toml index 3a243ad..56484a2 100644 --- a/netfetch/Cargo.toml +++ b/netfetch/Cargo.toml @@ -35,6 +35,7 @@ stats = { path = "../stats" } scywr = { path = "../scywr" } dbpg = { path = "../dbpg" } ingest-linux = { path = "../ingest-linux" } +slidebuf = { path = "../slidebuf" } err = { path = "../../daqbuffer/crates/err" } netpod = { path = "../../daqbuffer/crates/netpod" } items_0 = { path = "../../daqbuffer/crates/items_0" } diff --git a/netfetch/src/ca/findioc.rs b/netfetch/src/ca/findioc.rs index ddfeaa3..d8468ee 100644 --- a/netfetch/src/ca/findioc.rs +++ b/netfetch/src/ca/findioc.rs @@ -279,7 +279,7 @@ impl FindIocStream { error!("received packet too large"); panic!(); } - let mut nb = crate::netbuf::NetBuf::new(2048); + let mut nb = slidebuf::SlideBuf::new(2048); nb.put_slice(&buf[..ec as usize]).map_err(|e| e.to_string())?; let mut msgs = Vec::new(); let mut accounted = 0; diff --git a/netfetch/src/ca/proto.rs b/netfetch/src/ca/proto.rs index 802d58d..233ddad 100644 --- a/netfetch/src/ca/proto.rs +++ b/netfetch/src/ca/proto.rs @@ -1,11 +1,11 @@ use crate::netbuf; -use crate::netbuf::NetBuf; use err::thiserror; use err::ThisError; use futures_util::pin_mut; use futures_util::Stream; use log::*; use netpod::timeunits::*; +use slidebuf::SlideBuf; use std::collections::BTreeMap; use std::collections::VecDeque; use std::io; @@ -23,37 +23,23 @@ use tokio::net::TcpStream; #[derive(Debug, ThisError)] pub enum Error { - #[error("{0}")] NetBuf(#[from] netbuf::Error), + SlideBuf(#[from] slidebuf::Error), #[error("BufferTooSmallForNeedMin({0}, {1})")] BufferTooSmallForNeedMin(usize, usize), - #[error("IO({0})")] IO(#[from] io::Error), - #[error("BadSlice")] BadSlice, - #[error("BadCaDbrTypeId({0})")] BadCaDbrTypeId(u16), - #[error("BadCaScalarTypeId({0})")] BadCaScalarTypeId(u16), - #[error("GetValHelpInnerTypeMismatch")] GetValHelpInnerTypeMismatch, - #[error("GetValHelpTodoWaveform")] GetValHelpTodoWaveform, - #[error("NotEnoughPayload")] NotEnoughPayload, - #[error("TodoConversionArray")] TodoConversionArray, - #[error("CaProtoVersionMissing")] CaProtoVersionMissing, - #[error("NotEnoughPayloadTimeMetadata({0})")] NotEnoughPayloadTimeMetadata(usize), - #[error("MismatchDbrTimeType")] MismatchDbrTimeType, - #[error("BadCaCount")] BadCaCount, - #[error("CaCommandNotSupported({0})")] CaCommandNotSupported(u16), - #[error("ParseAttemptInDoneState")] ParseAttemptInDoneState, } @@ -906,7 +892,7 @@ pub struct HeadInfo { } impl HeadInfo { - pub fn from_netbuf(buf: &mut NetBuf) -> Result { + pub fn from_netbuf(buf: &mut SlideBuf) -> Result { let command = buf.read_u16_be()?; let payload_size = buf.read_u16_be()?; let data_type = buf.read_u16_be()?; @@ -957,8 +943,8 @@ pub struct CaProto { tcp: TcpStream, remote_addr_dbg: SocketAddrV4, state: CaState, - buf: NetBuf, - outbuf: NetBuf, + buf: SlideBuf, + outbuf: SlideBuf, out: VecDeque, array_truncate: usize, logged_proto_error_for_cid: BTreeMap, @@ -970,8 +956,8 @@ impl CaProto { tcp, remote_addr_dbg, state: CaState::StdHead, - buf: NetBuf::new(1024 * 128), - outbuf: NetBuf::new(1024 * 128), + buf: SlideBuf::new(1024 * 128), + outbuf: SlideBuf::new(1024 * 128), out: VecDeque::new(), array_truncate, logged_proto_error_for_cid: BTreeMap::new(), @@ -982,8 +968,10 @@ impl CaProto { self.out.push_back(item); } - fn inpbuf_conn(&mut self, need_min: usize) -> (&mut TcpStream, ReadBuf) { - (&mut self.tcp, self.buf.read_buf_for_fill(need_min)) + fn inpbuf_conn(&mut self, need_min: usize) -> Result<(&mut TcpStream, ReadBuf), Error> { + let buf = self.buf.available_writable_area(need_min)?; + let buf = ReadBuf::new(buf); + Ok((&mut self.tcp, buf)) } fn outbuf_conn(&mut self) -> (&mut TcpStream, &[u8]) { @@ -992,10 +980,12 @@ impl CaProto { fn out_msg_buf(&mut self) -> Option<(&CaMsg, &mut [u8])> { if let Some(item) = self.out.front() { - if let Ok(buf) = self.outbuf.write_buf(item.len()) { - Some((item, buf)) - } else { - None + match self.outbuf.available_writable_area(item.len()) { + Ok(buf) => Some((item, buf)), + Err(_) => { + // TODO is this the correct behavior? + None + } } } else { None @@ -1031,11 +1021,13 @@ impl CaProto { break None; } while let Some((msg, buf)) = self.out_msg_buf() { - if msg.len() > buf.len() { + let msglen = msg.len(); + if msglen > buf.len() { error!("got output buffer but too small"); break; } else { msg.place_into(buf); + self.outbuf.wadv(msglen)?; self.out.pop_front(); } } @@ -1068,7 +1060,7 @@ impl CaProto { let e = Error::BufferTooSmallForNeedMin(self.buf.cap(), self.state.need_min()); Err(e) } else if self.buf.len() < need_min { - let (w, mut rbuf) = self.inpbuf_conn(need_min); + let (w, mut rbuf) = self.inpbuf_conn(need_min)?; pin_mut!(w); match w.poll_read(cx, &mut rbuf) { Ready(k) => match k {