Care about larger payloads

This commit is contained in:
Dominik Werder
2024-03-06 12:28:47 +01:00
parent 9ddc0de9af
commit f52d941ca2
11 changed files with 256 additions and 316 deletions

View File

@@ -1,7 +1,6 @@
use super::proto;
use super::proto::CaEventValue;
use super::proto::ReadNotify;
use super::ExtraInsertsConf;
use crate::ca::proto::EventCancel;
use crate::conf::ChannelConfig;
use crate::senderpolling::SenderPolling;
@@ -42,7 +41,6 @@ use scywriiq::ConnectionStatusItem;
use scywriiq::IvlItem;
use scywriiq::MuteItem;
use scywriiq::QueryItem;
use serde::Deserialize;
use serde::Serialize;
use series::ChannelStatusSeriesId;
use series::SeriesId;
@@ -296,7 +294,7 @@ struct CreatedState {
cid: Cid,
sid: Sid,
ca_dbr_type: u16,
ca_dbr_count: u16,
ca_dbr_count: u32,
ts_created: Instant,
ts_alive_last: Instant,
ts_msp_last: u64,
@@ -694,7 +692,7 @@ impl Default for CaConnOpts {
fn default() -> Self {
Self {
insert_queue_max: 20000,
array_truncate: 2000,
array_truncate: 2000000,
}
}
}
@@ -1350,6 +1348,10 @@ impl CaConn {
// debug!("handle_event_add_res {ev:?}");
match ch_s {
ChannelState::Writable(st) => {
// debug!(
// "CaConn sees data_count {} payload_len {}",
// ev.data_count, ev.payload_len
// );
let stnow = self.tmp_ts_poll;
let crst = &mut st.channel;
let stwin_ts = stnow.duration_since(std::time::UNIX_EPOCH).unwrap().as_secs() / 4;
@@ -1571,6 +1573,7 @@ impl CaConn {
stnow: SystemTime,
stats: &CaConnStats,
) -> Result<(), Error> {
// debug!("event_add_ingest payload_len {} value {:?}", payload_len, value);
crst.ts_alive_last = tsnow;
crst.item_recv_ivl_ema.tick(tsnow);
crst.recv_count += 1;
@@ -1977,8 +1980,7 @@ impl CaConn {
cid,
sid,
ca_dbr_type,
// TODO for extended epics messages, can be u32!
ca_dbr_count: k.data_count as u16,
ca_dbr_count: k.data_count,
ts_created: tsnow,
ts_alive_last: tsnow,
ts_msp_last: 0,

View File

@@ -342,16 +342,16 @@ impl FindIocStream {
} else {
info!("cmdid {} payload {}", hi.cmdid(), hi.payload_len());
}
if nb.data().len() < hi.payload_len() {
if nb.data().len() < hi.payload_len() as usize {
error!("incomplete message, missing payload");
break;
}
let msg = CaMsg::from_proto_infos(&hi, nb.data(), tsnow, 32).map_err(|e| e.to_string())?;
nb.adv(hi.payload_len()).map_err(|e| e.to_string())?;
nb.adv(hi.payload_len() as usize).map_err(|e| e.to_string())?;
msgs.push(msg);
accounted += 16 + hi.payload_len();
}
if accounted != ec as usize {
if accounted != ec as u32 {
stats.ca_udp_unaccounted_data().inc();
debug!("unaccounted data ec {} accounted {}", ec, accounted);
}

View File

@@ -13,6 +13,7 @@ use std::pin::Pin;
use std::sync::Arc;
use std::task::Context;
use std::task::Poll;
use std::time::Duration;
use std::time::Instant;
use taskrun::tokio;
use tokio::io::AsyncRead;
@@ -45,10 +46,13 @@ pub enum Error {
NoReadBufferSpace,
NeitherPendingNorProgress,
OutputBufferTooSmall,
LogicError,
}
const CA_PROTO_VERSION: u16 = 13;
const CA_PROTO_VERSION: u32 = 13;
const EPICS_EPOCH_OFFSET: u64 = 631152000;
const PAYLOAD_LEN_MAX: u32 = 1024 * 1024 * 32;
const PROTO_INPUT_BUF_CAP: u32 = 1024 * 1024 * 40;
#[derive(Debug)]
pub struct Search {
@@ -104,7 +108,7 @@ pub struct AccessRightsRes {
#[derive(Debug)]
pub struct EventAdd {
pub data_type: u16,
pub data_count: u16,
pub data_count: u32,
pub sid: u32,
pub subid: u32,
}
@@ -112,7 +116,7 @@ pub struct EventAdd {
#[derive(Debug)]
pub struct EventCancel {
pub data_type: u16,
pub data_count: u16,
pub data_count: u32,
pub sid: u32,
pub subid: u32,
}
@@ -145,7 +149,7 @@ pub struct EventAddResEmpty {
#[derive(Debug)]
pub struct ReadNotify {
pub data_type: u16,
pub data_count: u16,
pub data_count: u32,
pub sid: u32,
pub ioid: u32,
}
@@ -343,7 +347,11 @@ impl CaMsgTy {
}
fn len(&self) -> usize {
16 + self.payload_len()
if self.payload_len() <= 0x3ff0 && self.data_count() <= 0xffff {
16 + self.payload_len()
} else {
24 + self.payload_len()
}
}
fn payload_len(&self) -> usize {
@@ -407,7 +415,7 @@ impl CaMsgTy {
}
}
fn data_count(&self) -> u16 {
fn data_count(&self) -> u32 {
use CaMsgTy::*;
match self {
Version => CA_PROTO_VERSION,
@@ -607,21 +615,19 @@ impl CaMsg {
}
fn place_into(&self, buf: &mut [u8]) {
//info!("place_into given {} bytes buffer", buf.len());
if self.ty.payload_len() > 0x4000 - 16 {
error!("TODO emit for larger payloads");
panic!();
} else {
if self.ty.payload_len() <= 0x3ff0 && self.ty.data_count() <= 0xffff {
let pls = self.ty.payload_len() as u16;
let cnt = self.ty.data_count() as u16;
let t = self.ty.cmdid().to_be_bytes();
buf[0] = t[0];
buf[1] = t[1];
let t = (self.ty.payload_len() as u16).to_be_bytes();
let t = pls.to_be_bytes();
buf[2] = t[0];
buf[3] = t[1];
let t = self.ty.data_type().to_be_bytes();
buf[4] = t[0];
buf[5] = t[1];
let t = self.ty.data_count().to_be_bytes();
let t = cnt.to_be_bytes();
buf[6] = t[0];
buf[7] = t[1];
let t = self.ty.param1().to_be_bytes();
@@ -635,6 +641,40 @@ impl CaMsg {
buf[14] = t[2];
buf[15] = t[3];
self.ty.place_payload_into(&mut buf[16..]);
} else {
let pls = self.ty.payload_len();
let cnt = self.ty.data_count();
let t = self.ty.cmdid().to_be_bytes();
buf[0] = t[0];
buf[1] = t[1];
buf[2] = 0xff;
buf[3] = 0xff;
let t = self.ty.data_type().to_be_bytes();
buf[4] = t[0];
buf[5] = t[1];
buf[6] = 0x00;
buf[7] = 0x00;
let t = self.ty.param1().to_be_bytes();
buf[8] = t[0];
buf[9] = t[1];
buf[10] = t[2];
buf[11] = t[3];
let t = self.ty.param2().to_be_bytes();
buf[12] = t[0];
buf[13] = t[1];
buf[14] = t[2];
buf[15] = t[3];
let t = pls.to_be_bytes();
buf[16] = t[0];
buf[17] = t[1];
buf[18] = t[2];
buf[19] = t[3];
let t = cnt.to_be_bytes();
buf[20] = t[0];
buf[21] = t[1];
buf[22] = t[2];
buf[23] = t[3];
self.ty.place_payload_into(&mut buf[24..]);
}
}
@@ -686,7 +726,7 @@ impl CaMsg {
array_truncate: usize,
) -> Result<Self, Error> {
let msg = match hi.cmdid {
0x00 => CaMsg::from_ty_ts(CaMsgTy::VersionRes(hi.data_count), tsnow),
0x00 => CaMsg::from_ty_ts(CaMsgTy::VersionRes(hi.data_count() as u16), tsnow),
0x0b => {
let mut s = String::new();
s.extend(format!("{:?}", &payload[..payload.len().min(16)]).chars());
@@ -871,21 +911,20 @@ pub enum CaItem {
#[derive(Clone, Debug)]
pub struct HeadInfo {
cmdid: u16,
payload_size: u16,
payload_size: u32,
data_type: u16,
data_count: u16,
data_count: u32,
param1: u32,
param2: u32,
ext_payload_size: u32,
ext_data_count: u32,
is_ext: bool,
}
impl HeadInfo {
pub fn from_netbuf(buf: &mut SlideBuf) -> Result<Self, Error> {
let command = buf.read_u16_be()?;
let payload_size = buf.read_u16_be()?;
let payload_size = buf.read_u16_be()? as u32;
let data_type = buf.read_u16_be()?;
let data_count = buf.read_u16_be()?;
let data_count = buf.read_u16_be()? as u32;
let param1 = buf.read_u32_be()?;
let param2 = buf.read_u32_be()?;
let hi = HeadInfo {
@@ -895,15 +934,15 @@ impl HeadInfo {
data_count,
param1,
param2,
ext_payload_size: 0,
ext_data_count: 0,
is_ext: false,
};
Ok(hi)
}
fn with_ext(mut self, payload: u32, datacount: u32) -> Self {
self.ext_payload_size = payload;
self.ext_data_count = datacount;
self.is_ext = true;
self.payload_size = payload;
self.data_count = datacount;
self
}
@@ -911,20 +950,12 @@ impl HeadInfo {
self.cmdid
}
pub fn payload_len(&self) -> usize {
if self.payload_size == 0xffff {
self.ext_payload_size as _
} else {
self.payload_size as _
}
pub fn payload_len(&self) -> u32 {
self.payload_size
}
pub fn data_count(&self) -> usize {
if self.payload_size == 0xffff {
self.ext_data_count as _
} else {
self.data_count as _
}
pub fn data_count(&self) -> u32 {
self.data_count
}
// only for debug purpose
@@ -947,7 +978,7 @@ impl CaState {
match self {
StdHead => 16,
ExtHead(_) => 8,
Payload(k) => k.payload_len(),
Payload(k) => k.payload_len() as usize,
Done => 123,
}
}
@@ -955,6 +986,7 @@ impl CaState {
pub struct CaProto {
tcp: TcpStream,
tcp_eof: bool,
remote_addr_dbg: SocketAddrV4,
state: CaState,
buf: SlideBuf,
@@ -969,10 +1001,11 @@ impl CaProto {
pub fn new(tcp: TcpStream, remote_addr_dbg: SocketAddrV4, array_truncate: usize, stats: Arc<CaProtoStats>) -> Self {
Self {
tcp,
tcp_eof: false,
remote_addr_dbg,
state: CaState::StdHead,
buf: SlideBuf::new(1024 * 1024 * 8),
outbuf: SlideBuf::new(1024 * 128),
buf: SlideBuf::new(PROTO_INPUT_BUF_CAP as usize),
outbuf: SlideBuf::new(1024 * 256),
out: VecDeque::new(),
array_truncate,
stats,
@@ -988,16 +1021,6 @@ impl CaProto {
self.out.push_back(item);
}
fn inpbuf_conn(&mut self, need_min: usize) -> Result<(&mut TcpStream, ReadBuf), Error> {
let buf = self.buf.available_writable_area(need_min)?;
let buf = ReadBuf::new(buf);
Ok((&mut self.tcp, buf))
}
fn outbuf_conn(&mut self) -> (&mut TcpStream, &[u8]) {
(&mut self.tcp, self.outbuf.data())
}
fn out_msg_buf(&mut self) -> Option<(&CaMsg, &mut [u8])> {
if let Some(item) = self.out.front() {
match self.outbuf.available_writable_area(item.len()) {
@@ -1014,7 +1037,9 @@ impl CaProto {
fn attempt_output(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<usize, Error>> {
use Poll::*;
let (w, b) = self.outbuf_conn();
let this = self.as_mut().get_mut();
let w = &mut this.tcp;
let b = this.outbuf.data();
let w = Pin::new(w);
match w.poll_write(cx, b) {
Ready(k) => match k {
@@ -1046,66 +1071,52 @@ impl CaProto {
let g = self.outbuf.len();
self.stats.outbuf_len().ingest(g as u32);
}
'l1: while self.out.len() != 0 {
while let Some((msg, buf)) = self.out_msg_buf() {
let msglen = msg.len();
if msglen > buf.len() {
error!("got output buffer but too small");
let e = Error::OutputBufferTooSmall;
return Err(e);
} else {
msg.place_into(&mut buf[..msglen]);
self.outbuf.wadv(msglen)?;
self.out.pop_front();
self.stats.out_msg_placed().inc();
}
}
while self.outbuf.len() != 0 {
match Self::attempt_output(self.as_mut(), cx)? {
Ready(n) => {
if n != 0 {
have_progress = true;
} else {
// Should not occur to begin with. TODO restructure.
break 'l1;
}
}
Pending => {
have_pending = true;
break 'l1;
}
}
while let Some((msg, buf)) = self.out_msg_buf() {
let msglen = msg.len();
if msglen > buf.len() {
break;
}
msg.place_into(&mut buf[..msglen]);
self.outbuf.wadv(msglen)?;
self.out.pop_front();
self.stats.out_msg_placed().inc();
}
'l1: while self.outbuf.len() != 0 {
while self.outbuf.len() != 0 {
match Self::attempt_output(self.as_mut(), cx)? {
Ready(n) => {
if n != 0 {
have_progress = true;
} else {
// Should not occur to begin with. TODO restructure.
break 'l1;
if n == 0 {
let e = Error::LogicError;
return Err(e);
}
have_progress = true;
}
Pending => {
have_pending = true;
break 'l1;
break;
}
}
}
let need_min = self.state.need_min();
if self.buf.cap() < need_min {
self.state = CaState::Done;
let e = Error::BufferTooSmallForNeedMin(self.buf.cap(), self.state.need_min());
return Err(e);
{
let cap = self.buf.cap();
if cap < need_min {
let e = Error::BufferTooSmallForNeedMin(cap, need_min);
warn!("{e}");
return Err(e);
}
}
if self.buf.len() < need_min {
let (w, mut rbuf) = self.inpbuf_conn(need_min)?;
loop {
if self.tcp_eof {
break;
}
let this = self.as_mut().get_mut();
let tcp = Pin::new(&mut this.tcp);
let buf = this.buf.available_writable_area(need_min)?;
let mut rbuf = ReadBuf::new(buf);
if rbuf.remaining() == 0 {
return Err(Error::NoReadBufferSpace);
}
let w = Pin::new(w);
match w.poll_read(cx, &mut rbuf) {
break match tcp.poll_read(cx, &mut rbuf) {
Ready(k) => match k {
Ok(()) => {
let nf = rbuf.filled().len();
@@ -1116,23 +1127,22 @@ impl CaProto {
self.remote_addr_dbg,
self.state
);
// TODO may need another state, if not yet done when input is EOF.
self.state = CaState::Done;
have_progress = true;
self.tcp_eof = true;
} else {
if false {
info!("received {} bytes", rbuf.filled().len());
debug!("received {} bytes", rbuf.filled().len());
let t = rbuf.filled().len().min(32);
info!("received data {:?}", &rbuf.filled()[0..t]);
debug!("received data {:?}", &rbuf.filled()[0..t]);
}
match self.buf.wadv(nf) {
Ok(()) => {
have_progress = true;
self.stats.tcp_recv_bytes().add(nf as _);
self.stats.tcp_recv_count().inc();
continue;
}
Err(e) => {
error!("netbuf wadv fail nf {nf}");
error!("netbuf wadv fail nf {nf} {e}");
return Err(e.into());
}
}
@@ -1145,12 +1155,16 @@ impl CaProto {
Pending => {
have_pending = true;
}
}
};
}
while self.resqu.len() < self.resqu.capacity() {
if let Some(item) = self.parse_item(tsnow)? {
if self.buf.len() >= self.state.need_min() {
if let Some(item) = self.parse_item(tsnow)? {
self.resqu.push_back(item);
} else {
// Nothing to do
}
have_progress = true;
self.resqu.push_back(item);
} else {
break;
}
@@ -1160,29 +1174,19 @@ impl CaProto {
} else if have_pending {
Ok(Pending)
} else {
Err(Error::NeitherPendingNorProgress)
if self.tcp_eof {
self.state = CaState::Done;
Ok(Ready(()))
} else {
Err(Error::NeitherPendingNorProgress)
}
}
}
fn parse_item(&mut self, tsnow: Instant) -> Result<Option<CaItem>, Error> {
if self.buf.len() < self.state.need_min() {
return Ok(None);
}
match &self.state {
CaState::StdHead => {
let hi = HeadInfo::from_netbuf(&mut self.buf)?;
if hi.cmdid == 1 || hi.cmdid == 15 {
if hi.payload_size == 0xffff {
if hi.data_count != 0 {
warn!("protocol error: {hi:?}");
return Err(Error::ExtendedHeaderBadCount);
}
}
if hi.payload_size == 0xffff {
} else if hi.payload_size > 16368 {
self.stats.payload_std_too_large().inc();
}
}
if hi.cmdid > 26 {
// TODO count as logic error
self.stats.protocol_issue().inc();
@@ -1191,7 +1195,6 @@ impl CaProto {
self.state = CaState::ExtHead(hi);
Ok(None)
} else {
// For extended messages, ingest on receive of extended header
self.stats.payload_size().ingest(hi.payload_len() as u32);
if hi.payload_size == 0 {
self.state = CaState::StdHead;
@@ -1207,7 +1210,7 @@ impl CaProto {
let payload_size = self.buf.read_u32_be()?;
let data_count = self.buf.read_u32_be()?;
self.stats.payload_size().ingest(hi.payload_len() as u32);
if payload_size > 1024 * 1024 * 32 {
if payload_size > PAYLOAD_LEN_MAX {
self.stats.payload_ext_very_large().inc();
if false {
warn!(
@@ -1216,19 +1219,22 @@ impl CaProto {
);
}
}
if payload_size <= 16368 {
if payload_size <= 0x3ff0 {
// NOTE can happen even with zero payload, just because data-count exceeds u16.
self.stats.payload_ext_but_small().inc();
warn!(
"ExtHead data_type {} payload_size {payload_size} data_count {data_count}",
hi.data_type
);
if false {
warn!(
"ExtHead data_type {} payload_size {payload_size} data_count {data_count}",
hi.data_type
);
}
}
let hi = hi.clone().with_ext(payload_size, data_count);
self.state = CaState::Payload(hi);
Ok(None)
}
CaState::Payload(hi) => {
let g = self.buf.read_bytes(hi.payload_len())?;
let g = self.buf.read_bytes(hi.payload_len() as usize)?;
let msg = CaMsg::from_proto_infos(hi, g, tsnow, self.array_truncate)?;
// data-count is only reasonable for event messages
if let CaMsgTy::EventAddRes(..) = &msg.ty {
@@ -1257,7 +1263,10 @@ impl Stream for CaProto {
match k {
Ok(Ready(())) => continue,
Ok(Pending) => Pending,
Err(e) => Ready(Some(Err(e))),
Err(e) => {
self.state = CaState::Done;
Ready(Some(Err(e)))
}
}
};
}

View File

@@ -93,7 +93,7 @@ impl CaIngestOpts {
}
pub fn array_truncate(&self) -> u64 {
self.array_truncate.unwrap_or(1024 * 64)
self.array_truncate.unwrap_or(1024 * 200)
}
pub fn insert_item_queue_cap(&self) -> usize {