Register for multiple channels

This commit is contained in:
Dominik Werder
2022-04-29 16:16:20 +02:00
parent 5d3e8d6dc5
commit 4f607ce823
5 changed files with 1088 additions and 1025 deletions

View File

@@ -76,14 +76,14 @@ pub struct ChannelAccess {
#[clap(long)]
pub source: String,
#[clap(long)]
pub channel_name: String,
pub channel: Vec<String>,
}
impl From<ChannelAccess> for CaConnectOpts {
fn from(k: ChannelAccess) -> Self {
Self {
source: k.source,
channel_name: k.channel_name,
channels: k.channel,
}
}
}

File diff suppressed because it is too large Load Diff

322
netfetch/src/ca/conn.rs Normal file
View File

@@ -0,0 +1,322 @@
use super::proto::{CaItem, CaMsg, CaMsgTy, CaProto};
use crate::ca::proto::{CreateChan, EventAdd, ReadNotify};
use err::Error;
use futures_util::{Stream, StreamExt};
use log::*;
use std::collections::BTreeMap;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Instant;
use tokio::net::TcpStream;
#[derive(Debug)]
enum ChannelError {
NoSuccess,
}
#[derive(Debug)]
struct EventedState {
ts_last: Instant,
}
#[derive(Debug)]
enum MonitoringState {
AddingEvent,
Evented(EventedState),
Reading,
Read,
Muted,
}
#[derive(Debug)]
struct CreatedState {
cid: u32,
sid: u32,
ts_created: Instant,
state: MonitoringState,
}
#[derive(Debug)]
enum ChannelState {
NotCreated,
Creating { cid: u32, ts_beg: Instant },
Created(CreatedState),
Error(ChannelError),
}
enum CaConnState {
Init,
Listen,
PeerReady,
Done,
}
struct IdStore {
next: u32,
}
impl IdStore {
fn new() -> Self {
Self { next: 0 }
}
fn next(&mut self) -> u32 {
let ret = self.next;
self.next += 1;
ret
}
}
pub struct CaConn {
state: CaConnState,
proto: CaProto,
cid_store: IdStore,
ioid_store: IdStore,
subid_store: IdStore,
channels: BTreeMap<u32, ChannelState>,
cid_by_name: BTreeMap<String, u32>,
cid_by_subid: BTreeMap<u32, u32>,
name_by_cid: BTreeMap<u32, String>,
poll_count: usize,
}
impl CaConn {
pub fn new(tcp: TcpStream) -> Self {
Self {
state: CaConnState::Init,
proto: CaProto::new(tcp),
cid_store: IdStore::new(),
ioid_store: IdStore::new(),
subid_store: IdStore::new(),
channels: BTreeMap::new(),
cid_by_name: BTreeMap::new(),
cid_by_subid: BTreeMap::new(),
name_by_cid: BTreeMap::new(),
poll_count: 0,
}
}
pub fn channel_add(&mut self, channel: String) {
let cid = self.cid_by_name(&channel);
if self.channels.contains_key(&cid) {
} else {
self.channels.insert(cid, ChannelState::NotCreated);
}
}
fn cid_by_name(&mut self, name: &str) -> u32 {
if let Some(cid) = self.cid_by_name.get(name) {
*cid
} else {
let cid = self.cid_store.next();
self.cid_by_name.insert(name.into(), cid);
self.name_by_cid.insert(cid, name.into());
cid
}
}
fn name_by_cid(&self, cid: u32) -> Option<&str> {
self.name_by_cid.get(&cid).map(|x| x.as_str())
}
}
impl Stream for CaConn {
type Item = Result<(), Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
self.poll_count += 1;
if self.poll_count > 30 {
error!("TODO CaConn reached poll_count limit");
return Ready(None);
}
loop {
break match &self.state {
CaConnState::Init => {
let msg = CaMsg { ty: CaMsgTy::Version };
self.proto.push_out(msg);
let msg = CaMsg {
ty: CaMsgTy::ClientName,
};
self.proto.push_out(msg);
let msg = CaMsg { ty: CaMsgTy::HostName };
self.proto.push_out(msg);
self.state = CaConnState::Listen;
continue;
}
CaConnState::Listen => match self.proto.poll_next_unpin(cx) {
Ready(Some(k)) => match k {
Ok(k) => match k {
CaItem::Empty => {
info!("CaItem::Empty");
Ready(Some(Ok(())))
}
CaItem::Msg(msg) => match msg.ty {
CaMsgTy::VersionRes(n) => {
if n < 12 || n > 13 {
error!("See some unexpected version {n} channel search may not work.");
Ready(Some(Ok(())))
} else {
info!("Received peer version {n}");
self.state = CaConnState::PeerReady;
continue;
}
}
k => {
warn!("Got some other unhandled message: {k:?}");
Ready(Some(Ok(())))
}
},
},
Err(e) => {
error!("got error item from CaProto {e:?}");
Ready(Some(Ok(())))
}
},
Ready(None) => {
warn!("CaProto is done");
self.state = CaConnState::Done;
continue;
}
Pending => Pending,
},
CaConnState::PeerReady => {
// TODO unify with Listen state where protocol gets polled as well.
let mut msgs_tmp = vec![];
// TODO profile, efficient enough?
let keys: Vec<u32> = self.channels.keys().map(|x| *x).collect();
for cid in keys {
match self.channels[&cid] {
ChannelState::NotCreated => {
let name = self
.name_by_cid(cid)
.ok_or_else(|| Error::with_msg_no_trace("name for cid not known"));
let name = match name {
Ok(k) => k,
Err(e) => return Ready(Some(Err(e))),
};
info!("Sending CreateChan for {}", name);
let msg = CaMsg {
ty: CaMsgTy::CreateChan(CreateChan {
cid,
channel: name.into(),
}),
};
msgs_tmp.push(msg);
// TODO handle not-found error:
let ch_s = self.channels.get_mut(&cid).unwrap();
*ch_s = ChannelState::Creating {
cid,
ts_beg: Instant::now(),
};
}
_ => {}
}
}
let mut do_wake_again = false;
if msgs_tmp.len() > 0 {
info!("msgs_tmp.len() {}", msgs_tmp.len());
do_wake_again = true;
}
// TODO be careful to not overload outgoing message queue.
for msg in msgs_tmp {
self.proto.push_out(msg);
}
let res = match self.proto.poll_next_unpin(cx) {
Ready(Some(Ok(k))) => {
match k {
CaItem::Msg(k) => match k.ty {
CaMsgTy::SearchRes(k) => {
let a = k.addr.to_be_bytes();
let addr = format!("{}.{}.{}.{}:{}", a[0], a[1], a[2], a[3], k.tcp_port);
info!("Search result indicates server address: {addr}");
}
CaMsgTy::CreateChanRes(k) => {
// TODO handle cid-not-found which can also indicate peer error.
let cid = k.cid;
let name = self.name_by_cid(cid);
info!("Channel created for {name:?} now register for events");
let subid = self.subid_store.next();
self.cid_by_subid.insert(subid, cid);
let msg = CaMsg {
ty: CaMsgTy::EventAdd(EventAdd {
sid: k.sid,
data_type: k.data_type,
data_count: k.data_count,
subid,
}),
};
self.proto.push_out(msg);
do_wake_again = true;
// TODO handle not-found error:
let ch_s = self.channels.get_mut(&k.cid).unwrap();
*ch_s = ChannelState::Created(CreatedState {
cid: k.cid,
sid: k.sid,
ts_created: Instant::now(),
state: MonitoringState::AddingEvent,
});
info!(
"Channel is created cid {} sid {} name {}",
k.cid, k.sid, self.name_by_cid[&k.cid]
);
}
CaMsgTy::EventAddRes(k) => {
// TODO handle subid-not-found which can also be peer error:
let cid = *self.cid_by_subid.get(&k.subid).unwrap();
// TODO get rid of the string clone when I don't want the log output any longer:
let name: String = self.name_by_cid(cid).unwrap().into();
// TODO handle not-found error:
let ch_s = self.channels.get_mut(&cid).unwrap();
match ch_s {
ChannelState::Created(st) => {
match st.state {
MonitoringState::AddingEvent => {
info!("Confirmation {name} is subscribed.");
// TODO get ts from faster common source:
st.state = MonitoringState::Evented(EventedState {
ts_last: Instant::now(),
});
}
MonitoringState::Evented(ref mut st) => {
// TODO get ts from faster common source:
st.ts_last = Instant::now();
}
_ => {
warn!("bad state? not always, could be late message.");
}
}
}
_ => {
warn!("unexpected state: EventAddRes while having {ch_s:?}");
}
}
}
_ => {}
},
_ => {}
}
Ready(Some(Ok(())))
}
Ready(Some(Err(e))) => {
error!("CaProto yields error: {e:?}");
Ready(Some(Err(e)))
}
Ready(None) => {
warn!("CaProto is done");
self.state = CaConnState::Done;
Ready(Some(Ok(())))
}
Pending => Pending,
};
if do_wake_again {
info!("do_wake_again");
cx.waker().wake_by_ref();
}
res
}
CaConnState::Done => Ready(None),
};
}
}
}

View File

745
netfetch/src/ca/proto.rs Normal file
View File

@@ -0,0 +1,745 @@
use crate::netbuf::NetBuf;
use err::Error;
use futures_util::{pin_mut, Stream};
use log::*;
use std::collections::VecDeque;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tokio::net::TcpStream;
const CA_PROTO_VERSION: u16 = 13;
#[derive(Debug)]
pub struct Search {
pub id: u32,
pub channel: String,
}
#[derive(Debug)]
pub struct SearchRes {
pub addr: u32,
pub tcp_port: u16,
pub sid: u32,
}
#[derive(Debug)]
pub struct ClientNameRes {
pub name: String,
}
#[derive(Debug)]
pub struct CreateChan {
pub cid: u32,
pub channel: String,
}
#[derive(Debug)]
pub struct CreateChanRes {
pub data_type: u16,
pub data_count: u16,
pub cid: u32,
pub sid: u32,
}
#[derive(Debug)]
pub struct AccessRightsRes {
pub cid: u32,
pub rights: u32,
}
#[derive(Debug)]
pub struct EventAdd {
pub data_type: u16,
pub data_count: u16,
pub sid: u32,
pub subid: u32,
}
#[derive(Debug)]
pub struct EventAddRes {
pub data_type: u16,
pub data_count: u16,
pub status: u32,
pub subid: u32,
}
#[derive(Debug)]
pub struct ReadNotify {
pub data_type: u16,
pub data_count: u16,
pub sid: u32,
pub ioid: u32,
}
#[derive(Debug)]
pub struct ReadNotifyRes {
pub data_type: u16,
pub data_count: u16,
pub sid: u32,
pub ioid: u32,
}
#[derive(Debug)]
enum CaScalarType {
I8,
I16,
I32,
F32,
F64,
}
impl CaScalarType {
fn from_ca_u16(k: u16) -> Result<Self, Error> {
use CaScalarType::*;
let ret = match k {
4 => I8,
1 => I16,
5 => I32,
2 => F32,
6 => F64,
k => return Err(Error::with_msg_no_trace(format!("bad dbr type id: {k}"))),
};
Ok(ret)
}
}
#[derive(Debug)]
pub enum CaMsgTy {
Version,
VersionRes(u16),
ClientName,
ClientNameRes(ClientNameRes),
HostName,
Search(Search),
SearchRes(SearchRes),
CreateChan(CreateChan),
CreateChanRes(CreateChanRes),
AccessRightsRes(AccessRightsRes),
EventAdd(EventAdd),
EventAddRes(EventAddRes),
ReadNotify(ReadNotify),
ReadNotifyRes(ReadNotifyRes),
}
impl CaMsgTy {
fn cmdid(&self) -> u16 {
use CaMsgTy::*;
match self {
Version => 0,
VersionRes(_) => 0,
ClientName => 20,
ClientNameRes(_) => 20,
HostName => 21,
Search(_) => 6,
SearchRes(_) => 6,
CreateChan(_) => 18,
CreateChanRes(_) => 18,
AccessRightsRes(_) => 22,
EventAdd(_) => 1,
EventAddRes(_) => 1,
ReadNotify(_) => 15,
ReadNotifyRes(_) => 15,
}
}
fn len(&self) -> usize {
16 + self.payload_len()
}
fn payload_len(&self) -> usize {
use CaMsgTy::*;
match self {
Version => 0,
VersionRes(_) => 0,
ClientName => 8,
ClientNameRes(x) => (7 + x.name.len()) / 8 * 8,
HostName => 8,
Search(s) => (7 + s.channel.len()) / 8 * 8,
SearchRes(_) => 8,
CreateChan(x) => (7 + x.channel.len()) / 8 * 8,
CreateChanRes(_) => 0,
AccessRightsRes(_) => 0,
EventAdd(_) => 16,
EventAddRes(_) => {
error!("should not attempt to serialize the response again");
panic!();
}
ReadNotify(_) => 0,
ReadNotifyRes(_) => {
error!("should not attempt to serialize the response again");
panic!();
}
}
}
fn data_type(&self) -> u16 {
use CaMsgTy::*;
match self {
Version => CA_PROTO_VERSION,
VersionRes(n) => *n,
ClientName => 0,
ClientNameRes(_) => 0,
HostName => 0,
Search(_) => {
// Reply-flag
1
}
SearchRes(x) => x.tcp_port,
CreateChan(_) => 0,
CreateChanRes(x) => x.data_type,
AccessRightsRes(_) => 0,
EventAdd(x) => x.data_type,
EventAddRes(x) => x.data_type,
ReadNotify(x) => x.data_type,
ReadNotifyRes(x) => x.data_type,
}
}
fn data_count(&self) -> u16 {
use CaMsgTy::*;
match self {
Version => 0,
VersionRes(_) => 0,
ClientName => 0,
ClientNameRes(_) => 0,
HostName => 0,
Search(_) => CA_PROTO_VERSION,
SearchRes(_) => 0,
CreateChan(_) => 0,
CreateChanRes(x) => x.data_count,
AccessRightsRes(_) => 0,
EventAdd(x) => x.data_count,
EventAddRes(x) => x.data_count,
ReadNotify(x) => x.data_count,
ReadNotifyRes(x) => x.data_count,
}
}
fn param1(&self) -> u32 {
use CaMsgTy::*;
match self {
Version => 0,
VersionRes(_) => 0,
ClientName => 0,
ClientNameRes(_) => 0,
HostName => 0,
Search(e) => e.id,
SearchRes(x) => x.addr,
CreateChan(x) => x.cid,
CreateChanRes(x) => x.cid,
AccessRightsRes(x) => x.cid,
EventAdd(x) => x.sid,
EventAddRes(x) => x.status,
ReadNotify(x) => x.sid,
ReadNotifyRes(x) => x.sid,
}
}
fn param2(&self) -> u32 {
use CaMsgTy::*;
match self {
Version => 0,
VersionRes(_) => 0,
ClientName => 0,
ClientNameRes(_) => 0,
HostName => 0,
Search(e) => e.id,
SearchRes(x) => x.sid,
CreateChan(_) => CA_PROTO_VERSION as _,
CreateChanRes(x) => x.sid,
AccessRightsRes(x) => x.rights,
EventAdd(x) => x.subid,
EventAddRes(x) => x.subid,
ReadNotify(x) => x.ioid,
ReadNotifyRes(x) => x.ioid,
}
}
fn place_payload_into(&self, buf: &mut [u8]) {
use CaMsgTy::*;
match self {
Version => {}
VersionRes(_) => {}
ClientName => {
// TODO allow variable client name. Null-extend always to 8 byte align.
buf.copy_from_slice(b"SA10\0\0\0\0");
}
ClientNameRes(_) => {
error!("should not attempt to write ClientNameRes");
panic!();
}
HostName => {
// TODO allow variable host name. Null-extend always to 8 byte align.
buf.copy_from_slice(b"SA10\0\0\0\0");
}
Search(e) => {
for x in &mut buf[..] {
*x = 0;
}
let d = e.channel.as_bytes();
if buf.len() < d.len() + 1 {
error!("bad buffer given");
panic!();
}
unsafe { std::ptr::copy(&d[0] as _, &mut buf[0] as _, d.len()) };
}
SearchRes(_) => {
error!("should not attempt to write SearchRes");
panic!();
}
CreateChan(x) => {
for x in &mut buf[..] {
*x = 0;
}
let d = x.channel.as_bytes();
if buf.len() < d.len() + 1 {
error!("bad buffer given");
panic!();
}
unsafe { std::ptr::copy(&d[0] as _, &mut buf[0] as _, d.len()) };
}
CreateChanRes(_) => {}
AccessRightsRes(_) => {}
EventAdd(_) => {
// TODO allow to customize the mask. Test if it works.
buf.copy_from_slice(&[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 15, 0, 0]);
}
EventAddRes(_) => {}
ReadNotify(_) => {}
ReadNotifyRes(_) => {}
}
}
}
#[derive(Debug)]
pub struct CaMsg {
pub ty: CaMsgTy,
}
impl CaMsg {
fn len(&self) -> usize {
self.ty.len()
}
fn place_into(&self, buf: &mut [u8]) {
info!("place_into given {} bytes buffer", buf.len());
if self.ty.payload_len() > 0x4000 - 16 {
error!("TODO emit for larger payloads");
panic!();
} else {
let t = self.ty.cmdid().to_be_bytes();
buf[0] = t[0];
buf[1] = t[1];
let t = (self.ty.payload_len() as u16).to_be_bytes();
buf[2] = t[0];
buf[3] = t[1];
let t = self.ty.data_type().to_be_bytes();
buf[4] = t[0];
buf[5] = t[1];
let t = self.ty.data_count().to_be_bytes();
buf[6] = t[0];
buf[7] = t[1];
let t = self.ty.param1().to_be_bytes();
buf[8] = t[0];
buf[9] = t[1];
buf[10] = t[2];
buf[11] = t[3];
let t = self.ty.param2().to_be_bytes();
buf[12] = t[0];
buf[13] = t[1];
buf[14] = t[2];
buf[15] = t[3];
self.ty.place_payload_into(&mut buf[16..]);
}
}
fn from_proto_infos(hi: &HeadInfo, payload: &[u8]) -> Result<Self, Error> {
let msg = match hi.cmdid {
0 => CaMsg {
ty: CaMsgTy::VersionRes(hi.data_count),
},
20 => {
let name = std::ffi::CString::new(payload)
.map(|s| s.into_string().unwrap_or_else(|e| format!("{e:?}")))
.unwrap_or_else(|e| format!("{e:?}"));
CaMsg {
ty: CaMsgTy::ClientNameRes(ClientNameRes { name }),
}
}
// TODO make response type for host name:
21 => CaMsg { ty: CaMsgTy::HostName },
6 => {
if hi.payload_size != 8 {
warn!("protocol error: search result is expected with fixed payload size 8");
}
if hi.data_count != 0 {
warn!("protocol error: search result is expected with data count 0");
}
CaMsg {
ty: CaMsgTy::SearchRes(SearchRes {
tcp_port: hi.data_type,
addr: hi.param1,
sid: hi.param2,
}),
}
}
18 => {
CaMsg {
// TODO use different structs for request and response:
ty: CaMsgTy::CreateChanRes(CreateChanRes {
data_type: hi.data_type,
data_count: hi.data_count,
cid: hi.param1,
sid: hi.param2,
}),
}
}
22 => {
CaMsg {
// TODO use different structs for request and response:
ty: CaMsgTy::AccessRightsRes(AccessRightsRes {
cid: hi.param1,
rights: hi.param2,
}),
}
}
1 => {
let ca_st = CaScalarType::from_ca_u16(hi.data_type)?;
match ca_st {
CaScalarType::F64 => {
// TODO handle wrong payload sizer in more distinct way.
let v = f64::from_be_bytes(payload.try_into()?);
info!("Payload as f64: {v}");
}
_ => {
warn!("TODO handle {ca_st:?}");
}
}
let d = EventAddRes {
data_type: hi.data_type,
data_count: hi.data_count,
status: hi.param1,
subid: hi.param2,
};
CaMsg {
ty: CaMsgTy::EventAddRes(d),
}
}
15 => {
if payload.len() == 8 {
let v = u64::from_be_bytes(payload.try_into()?);
info!("Payload as u64: {v}");
let v = i64::from_be_bytes(payload.try_into()?);
info!("Payload as i64: {v}");
let v = f64::from_be_bytes(payload.try_into()?);
info!("Payload as f64: {v}");
} else {
info!(
"payload string {:?} payload {:?}",
String::from_utf8_lossy(&payload[..payload.len().min(12)]),
&payload[..payload.len().min(12)],
);
}
CaMsg {
// TODO use different structs for request and response:
ty: CaMsgTy::ReadNotifyRes(ReadNotifyRes {
data_type: hi.data_type,
data_count: hi.data_count,
sid: hi.param1,
ioid: hi.param2,
}),
}
}
x => return Err(Error::with_msg_no_trace(format!("unsupported ca command {}", x))),
};
Ok(msg)
}
}
#[derive(Debug)]
pub enum CaItem {
Empty,
Msg(CaMsg),
}
impl CaItem {
fn empty() -> Self {
CaItem::Empty
}
}
#[derive(Clone, Debug)]
struct HeadInfo {
cmdid: u16,
payload_size: u16,
data_type: u16,
data_count: u16,
param1: u32,
param2: u32,
}
enum CaState {
StdHead,
ExtHead(HeadInfo),
Payload(HeadInfo),
Done,
}
impl CaState {
fn need_min(&self) -> usize {
use CaState::*;
match self {
StdHead => 16,
ExtHead(_) => 8,
Payload(k) => k.payload_size as _,
Done => 123,
}
}
}
pub struct CaProto {
tcp: TcpStream,
state: CaState,
buf: NetBuf,
outbuf: NetBuf,
out: VecDeque<CaMsg>,
}
impl CaProto {
pub fn new(tcp: TcpStream) -> Self {
Self {
tcp,
state: CaState::StdHead,
buf: NetBuf::new(1024 * 128),
outbuf: NetBuf::new(1024 * 128),
out: VecDeque::new(),
}
}
pub fn push_out(&mut self, item: CaMsg) {
self.out.push_back(item);
}
fn inpbuf_conn(&mut self, need_min: usize) -> (&mut TcpStream, ReadBuf) {
(&mut self.tcp, self.buf.read_buf_for_fill(need_min))
}
fn outbuf_conn(&mut self) -> (&mut TcpStream, &[u8]) {
(&mut self.tcp, self.outbuf.data())
}
fn out_msg_buf(&mut self) -> Option<(&CaMsg, &mut [u8])> {
if let Some(item) = self.out.front() {
info!("attempt to serialize outgoing message msg {:?}", item);
if let Ok(buf) = self.outbuf.write_buf(item.len()) {
Some((item, buf))
} else {
error!("output buffer too small for message");
None
}
} else {
None
}
}
fn attempt_output(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Error>> {
use Poll::*;
let (w, b) = self.outbuf_conn();
pin_mut!(w);
match w.poll_write(cx, b) {
Ready(k) => match k {
Ok(k) => {
info!("sent {} bytes {:?}", k, &self.outbuf.data()[..k]);
match self.outbuf.adv(k) {
Ok(()) => Ready(Ok(())),
Err(e) => {
error!("advance error {:?}", e);
Ready(Err(e))
}
}
}
Err(e) => {
error!("output write error {:?}", e);
Ready(Err(e.into()))
}
},
Pending => Pending,
}
}
fn loop_body(mut self: Pin<&mut Self>, cx: &mut Context) -> Result<Option<Poll<CaItem>>, Error> {
use Poll::*;
if self.out.len() != 0 || self.outbuf.len() != 0 {
info!("loop_body out {} outbuf {}", self.out.len(), self.outbuf.len());
}
let output_res_1: Option<Poll<()>> = 'll1: loop {
if self.out.len() == 0 {
break None;
}
while let Some((msg, buf)) = self.out_msg_buf() {
msg.place_into(buf);
self.out.pop_front();
}
while self.outbuf.len() > 0 {
match Self::attempt_output(self.as_mut(), cx)? {
Ready(()) => {}
Pending => {
break 'll1 Some(Pending);
}
}
}
};
let output_res_2: Option<Poll<()>> = if let Some(Pending) = output_res_1 {
Some(Pending)
} else {
loop {
if self.outbuf.len() == 0 {
break None;
}
match Self::attempt_output(self.as_mut(), cx)? {
Ready(()) => {}
Pending => break Some(Pending),
}
}
};
let need_min = self.state.need_min();
let read_res = {
if self.buf.cap() < need_min {
self.state = CaState::Done;
let e = Error::with_msg_no_trace(format!(
"buffer too small for need_min {} {}",
self.buf.cap(),
self.state.need_min()
));
Err(e)
} else if self.buf.len() < need_min {
let (w, mut rbuf) = self.inpbuf_conn(need_min);
pin_mut!(w);
match w.poll_read(cx, &mut rbuf) {
Ready(k) => match k {
Ok(()) => {
let nf = rbuf.filled().len();
if nf == 0 {
info!("EOF");
// TODO may need another state, if not yet done when input is EOF.
self.state = CaState::Done;
Ok(Some(Ready(CaItem::empty())))
} else {
if false {
info!("received {} bytes", rbuf.filled().len());
let t = rbuf.filled().len().min(32);
info!("received data {:?}", &rbuf.filled()[0..t]);
}
match self.buf.wadv(nf) {
Ok(()) => Ok(Some(Ready(CaItem::empty()))),
Err(e) => {
error!("netbuf wadv fail nf {nf}");
Err(e)
}
}
}
}
Err(e) => Err(e.into()),
},
Pending => Ok(Some(Pending)),
}
} else {
Ok(None)
}
}?;
let parse_res: Option<CaItem> = self.parse_item()?;
match (output_res_2, read_res, parse_res) {
(_, _, Some(item)) => Ok(Some(Ready(item))),
(Some(Pending), _, _) => Ok(Some(Pending)),
(_, Some(Pending), _) => Ok(Some(Pending)),
(_, None, None) => {
// TODO constrain how often we can go to this case consecutively.
Ok(None)
}
(_, Some(_), None) => Ok(None),
}
}
fn parse_item(&mut self) -> Result<Option<CaItem>, Error> {
loop {
if self.buf.len() < self.state.need_min() {
break Ok(None);
}
break match &self.state {
CaState::StdHead => {
let command = self.buf.read_u16_be()?;
let payload_size = self.buf.read_u16_be()?;
let data_type = self.buf.read_u16_be()?;
let data_count = self.buf.read_u16_be()?;
let param1 = self.buf.read_u32_be()?;
let param2 = self.buf.read_u32_be()?;
let hi = HeadInfo {
cmdid: command,
payload_size,
data_type,
data_count,
param1,
param2,
};
if hi.cmdid == 6 || hi.cmdid > 26 || hi.data_type > 10 || hi.payload_size > 8 {
warn!("StdHead {hi:?}");
}
if payload_size == 0xffff && data_count == 0 {
self.state = CaState::ExtHead(hi);
Ok(None)
} else {
if payload_size == 0 {
self.state = CaState::StdHead;
let msg = CaMsg::from_proto_infos(&hi, &[])?;
Ok(Some(CaItem::Msg(msg)))
} else {
self.state = CaState::Payload(hi);
Ok(None)
}
}
}
CaState::ExtHead(hi) => {
let payload_size = self.buf.read_u32_be()?;
let data_count = self.buf.read_u32_be()?;
warn!("ExtHead payload_size {payload_size} data_count {data_count}");
if payload_size == 0 {
let msg = CaMsg::from_proto_infos(hi, &[])?;
self.state = CaState::StdHead;
Ok(Some(CaItem::Msg(msg)))
} else {
self.state = CaState::Payload(hi.clone());
Ok(None)
}
}
CaState::Payload(hi) => {
let g = self.buf.read_bytes(hi.payload_size as _)?;
let msg = CaMsg::from_proto_infos(hi, g)?;
self.state = CaState::StdHead;
Ok(Some(CaItem::Msg(msg)))
}
CaState::Done => Err(Error::with_msg_no_trace("attempt to parse in Done state")),
};
}
}
}
impl Stream for CaProto {
type Item = Result<CaItem, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
if let CaState::Done = self.state {
return Ready(None);
} else {
loop {
break match Self::loop_body(self.as_mut(), cx) {
Ok(Some(Ready(k))) => Ready(Some(Ok(k))),
Ok(Some(Pending)) => Pending,
Ok(None) => continue,
Err(e) => Ready(Some(Err(e))),
};
}
}
}
}