diff --git a/daqingest/src/daqingest.rs b/daqingest/src/daqingest.rs index 94bc348..78e4e2c 100644 --- a/daqingest/src/daqingest.rs +++ b/daqingest/src/daqingest.rs @@ -76,14 +76,14 @@ pub struct ChannelAccess { #[clap(long)] pub source: String, #[clap(long)] - pub channel_name: String, + pub channel: Vec, } impl From for CaConnectOpts { fn from(k: ChannelAccess) -> Self { Self { source: k.source, - channel_name: k.channel_name, + channels: k.channel, } } } diff --git a/netfetch/src/ca.rs b/netfetch/src/ca.rs index c3eccb9..6ad55ee 100644 --- a/netfetch/src/ca.rs +++ b/netfetch/src/ca.rs @@ -1,1038 +1,34 @@ -use crate::netbuf::NetBuf; -use async_channel::{bounded, Receiver}; -use bytes::{BufMut, BytesMut}; -use err::{ErrStr, Error}; -use futures_util::{pin_mut, FutureExt, Stream, StreamExt}; +pub mod conn; +pub mod proto; + +use self::conn::CaConn; +use err::Error; +use futures_util::StreamExt; use log::*; -use serde::{Deserialize, Serialize}; -use std::collections::VecDeque; -use std::pin::Pin; -use std::task::{Context, Poll}; -use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadBuf}; use tokio::net::TcpStream; -#[derive(Debug, Serialize, Deserialize)] -pub struct Message { - cmd: u16, - payload_len: u16, - type_type: u16, - data_len: u16, -} - -#[derive(Debug, Serialize, Deserialize)] -pub enum FetchItem { - Log(String), - Message(Message), -} - -pub async fn ca_connect_1() -> Result>, Error> { - let (tx, rx) = bounded(16); - let tx2 = tx.clone(); - tokio::task::spawn( - async move { - let mut conn = tokio::net::TcpStream::connect("S30CB06-CVME-LLRF2.psi.ch:5064").await?; - let (mut inp, mut out) = conn.split(); - tx.send(Ok(FetchItem::Log(format!("connected")))).await.errstr()?; - let mut buf = [0; 64]; - - let mut b2 = BytesMut::with_capacity(128); - b2.put_u16(0x00); - b2.put_u16(0); - b2.put_u16(0); - b2.put_u16(0xb); - b2.put_u32(0); - b2.put_u32(0); - out.write_all(&b2).await?; - tx.send(Ok(FetchItem::Log(format!("written")))).await.errstr()?; - let n1 = inp.read(&mut buf).await?; - tx.send(Ok(FetchItem::Log(format!("received: {} {:?}", n1, buf)))) - .await - .errstr()?; - - // Search to get cid: - let chn = b"SATCB01-DBPM220:Y2"; - b2.clear(); - b2.put_u16(0x06); - b2.put_u16((16 + chn.len()) as u16); - b2.put_u16(0x00); - b2.put_u16(0x0b); - b2.put_u32(0x71803472); - b2.put_u32(0x71803472); - b2.put_slice(chn); - out.write_all(&b2).await?; - tx.send(Ok(FetchItem::Log(format!("written")))).await.errstr()?; - let n1 = inp.read(&mut buf).await?; - tx.send(Ok(FetchItem::Log(format!("received: {} {:?}", n1, buf)))) - .await - .errstr()?; - - Ok::<_, Error>(()) - } - .then({ - move |item| async move { - match item { - Ok(_) => {} - Err(e) => { - tx2.send(Ok(FetchItem::Log(format!("Seeing error: {:?}", e)))) - .await - .errstr()?; - } - } - Ok::<_, Error>(()) - } - }), - ); - Ok(rx) -} - pub struct CaConnectOpts { pub source: String, - pub channel_name: String, + pub channels: Vec, } -pub async fn ca_connect_2(opts: CaConnectOpts) -> Result<(), Error> { - let (tx, rx) = bounded::>(16); - let tx2 = tx.clone(); - tokio::task::spawn( - async move { - let mut conn = tokio::net::TcpStream::connect(&opts.source).await?; - let (mut inp, mut out) = conn.split(); - tx.send(Ok(FetchItem::Log(format!("connected")))).await.errstr()?; - let mut buf = [0; 64]; - - let mut b2 = BytesMut::with_capacity(128); - b2.put_u16(0x00); - b2.put_u16(0); - b2.put_u16(0); - b2.put_u16(0xb); - b2.put_u32(0); - b2.put_u32(0); - out.write_all(&b2).await?; - tx.send(Ok(FetchItem::Log(format!("written")))).await.errstr()?; - let n1 = inp.read(&mut buf).await?; - tx.send(Ok(FetchItem::Log(format!("received: {} {:?}", n1, buf)))) - .await - .errstr()?; - - // Search to get cid: - let chn = opts.channel_name.as_bytes(); - b2.clear(); - b2.put_u16(0x06); - b2.put_u16((16 + chn.len()) as u16); - b2.put_u16(0x00); - b2.put_u16(0x0b); - b2.put_u32(0x71803472); - b2.put_u32(0x71803472); - b2.put_slice(chn); - out.write_all(&b2).await?; - tx.send(Ok(FetchItem::Log(format!("written")))).await.errstr()?; - let n1 = inp.read(&mut buf).await?; - tx.send(Ok(FetchItem::Log(format!("received: {} {:?}", n1, buf)))) - .await - .errstr()?; - - Ok::<_, Error>(()) - } - .then({ - move |item| async move { - match item { - Ok(_) => {} - Err(e) => { - tx2.send(Ok(FetchItem::Log(format!("Seeing error: {:?}", e)))) - .await - .errstr()?; - } - } - Ok::<_, Error>(()) - } - }), - ); - loop { - match rx.recv().await { - Ok(item) => { - info!("got item: {item:?}"); +pub async fn ca_connect_3(opts: CaConnectOpts) -> Result<(), Error> { + debug!("ca_connect_3"); + let tcp = TcpStream::connect(&opts.source).await?; + let mut conn = CaConn::new(tcp); + for c in opts.channels { + conn.channel_add(c); + } + while let Some(item) = conn.next().await { + match item { + Ok(k) => { + info!("CaConn gives item: {k:?}"); } Err(e) => { - error!("can no longer receive from queue: {e:?}"); + error!("CaConn gives error: {e:?}"); break; } } } Ok(()) } - -pub async fn ca_connect_3(opts: CaConnectOpts) -> Result<(), Error> { - let tcp = TcpStream::connect(&opts.source).await?; - let mut conn = CaConn::new(tcp, opts.channel_name); - while let Some(_item) = conn.next().await {} - Ok(()) -} - -enum CaConnState { - Init, - Listen, - Done, -} - -struct CaConn { - state: CaConnState, - proto: CaProto, - ioid: u32, - // tmp: simply try to communicate with a given channel: - channel: String, -} - -impl CaConn { - fn new(tcp: TcpStream, channel: String) -> Self { - Self { - state: CaConnState::Init, - proto: CaProto::new(tcp), - ioid: 0, - channel, - } - } -} - -const CA_PROTO_VERSION: u16 = 13; - -impl Stream for CaConn { - type Item = Result<(), Error>; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - use Poll::*; - loop { - break match &self.state { - CaConnState::Init => { - let msg = CaMsg { - ty: CaMsgTy::Version(CA_PROTO_VERSION), - }; - self.proto.out.push_back(msg); - let msg = CaMsg { - ty: CaMsgTy::ClientName, - }; - self.proto.out.push_back(msg); - let msg = CaMsg { ty: CaMsgTy::HostName }; - self.proto.out.push_back(msg); - self.state = CaConnState::Listen; - continue; - } - CaConnState::Listen => match self.proto.poll_next_unpin(cx) { - Ready(Some(k)) => { - match k { - Ok(k) => match k { - CaItem::Empty => Ready(Some(Ok(()))), - CaItem::Msg(msg) => match msg.ty { - CaMsgTy::Version(n) => { - if n < 12 || n > 13 { - error!("See some unexpected version {n} channel search may not work."); - Ready(Some(Ok(()))) - } else { - info!("Received peer version {n}"); - if true { - let msg = CaMsg { - ty: CaMsgTy::Search(Search { - id: 501, - channel: self.channel.clone(), - }), - }; - self.proto.out.push_back(msg); - } - let msg = CaMsg { - ty: CaMsgTy::CreateChan(CreateChan { - cid: 1700, - channel: self.channel.clone(), - }), - }; - self.proto.out.push_back(msg); - Ready(Some(Ok(()))) - } - } - CaMsgTy::CreateChanRes(k) => { - info!("Channel created"); - let msg = CaMsg { - ty: CaMsgTy::ReadNotify(ReadNotify { - sid: k.sid, - data_type: k.data_type, - data_count: k.data_count, - ioid: self.ioid, - }), - }; - self.ioid += 1; - self.proto.out.push_back(msg); - let msg = CaMsg { - ty: CaMsgTy::EventAdd(EventAdd { - sid: k.sid, - data_type: k.data_type, - data_count: k.data_count, - subid: self.ioid, - }), - }; - self.ioid += 1; - self.proto.out.push_back(msg); - Ready(Some(Ok(()))) - } - CaMsgTy::ReadNotify(_) => { - // TODO create a generic container for the data updates. - Ready(Some(Ok(()))) - } - CaMsgTy::EventAddRes(_) => { - // TODO create a generic container for the data updates. - Ready(Some(Ok(()))) - } - CaMsgTy::SearchRes(k) => { - let a = k.addr.to_be_bytes(); - let addr = format!("{}.{}.{}.{}:{}", a[0], a[1], a[2], a[3], k.tcp_port); - info!("Search result indicates server address: {addr}"); - Ready(Some(Ok(()))) - } - k => { - info!("Got some other unhandled message: {k:?}"); - Ready(Some(Ok(()))) - } - }, - }, - Err(e) => { - error!("got error item from CaProto {e:?}"); - Ready(Some(Ok(()))) - } - } - } - Ready(None) => { - self.state = CaConnState::Done; - continue; - } - Pending => Pending, - }, - CaConnState::Done => Ready(None), - }; - } - } -} - -#[derive(Debug)] -enum CaItem { - Empty, - Msg(CaMsg), -} - -impl CaItem { - fn empty() -> Self { - CaItem::Empty - } -} - -#[derive(Debug)] -struct Search { - id: u32, - channel: String, -} - -#[derive(Debug)] -struct SearchRes { - addr: u32, - tcp_port: u16, - sid: u32, -} - -#[derive(Debug)] -struct ClientNameRes { - name: String, -} - -#[derive(Debug)] -struct CreateChan { - cid: u32, - channel: String, -} - -#[derive(Debug)] -struct CreateChanRes { - data_type: u16, - data_count: u16, - cid: u32, - sid: u32, -} - -#[derive(Debug)] -struct AccessRightsRes { - cid: u32, - rights: u32, -} - -#[derive(Debug)] -struct EventAdd { - data_type: u16, - data_count: u16, - sid: u32, - subid: u32, -} - -#[derive(Debug)] -struct EventAddRes { - data_type: u16, - data_count: u16, - status: u32, - subid: u32, -} - -#[derive(Debug)] -struct ReadNotify { - data_type: u16, - data_count: u16, - sid: u32, - ioid: u32, -} - -#[derive(Debug)] -struct ReadNotifyRes { - data_type: u16, - data_count: u16, - sid: u32, - ioid: u32, -} - -#[derive(Debug)] -enum CaScalarType { - I8, - I16, - I32, - F32, - F64, -} - -impl CaScalarType { - fn from_ca_u16(k: u16) -> Result { - use CaScalarType::*; - let ret = match k { - 4 => I8, - 1 => I16, - 5 => I32, - 2 => F32, - 6 => F64, - k => return Err(Error::with_msg_no_trace(format!("bad dbr type id: {k}"))), - }; - Ok(ret) - } -} - -#[derive(Debug)] -struct CaPayloadType { - #[allow(unused)] - scalar_type: CaScalarType, -} - -#[derive(Debug)] -enum CaMsgTy { - Version(u16), - ClientName, - ClientNameRes(ClientNameRes), - HostName, - Search(Search), - SearchRes(SearchRes), - CreateChan(CreateChan), - CreateChanRes(CreateChanRes), - AccessRightsRes(AccessRightsRes), - EventAdd(EventAdd), - EventAddRes(EventAddRes), - ReadNotify(ReadNotify), - ReadNotifyRes(ReadNotifyRes), -} - -impl CaMsgTy { - fn cmdid(&self) -> u16 { - use CaMsgTy::*; - match self { - Version(_) => 0, - ClientName => 20, - ClientNameRes(_) => 20, - HostName => 21, - Search(_) => 6, - SearchRes(_) => 6, - CreateChan(_) => 18, - CreateChanRes(_) => 18, - AccessRightsRes(_) => 22, - EventAdd(_) => 1, - EventAddRes(_) => 1, - ReadNotify(_) => 15, - ReadNotifyRes(_) => 15, - } - } - - fn len(&self) -> usize { - 16 + self.payload_len() - } - - fn payload_len(&self) -> usize { - use CaMsgTy::*; - match self { - Version(_) => 0, - ClientName => 8, - ClientNameRes(x) => (7 + x.name.len()) / 8 * 8, - HostName => 8, - Search(s) => (7 + s.channel.len()) / 8 * 8, - SearchRes(_) => 8, - CreateChan(x) => (7 + x.channel.len()) / 8 * 8, - CreateChanRes(_) => 0, - AccessRightsRes(_) => 0, - EventAdd(_) => 16, - EventAddRes(_) => { - error!("should not attempt to serialize the response again"); - panic!(); - } - ReadNotify(_) => 0, - ReadNotifyRes(_) => { - error!("should not attempt to serialize the response again"); - panic!(); - } - } - } - - fn data_type(&self) -> u16 { - use CaMsgTy::*; - match self { - Version(n) => *n, - ClientName => 0, - ClientNameRes(_) => 0, - HostName => 0, - Search(_) => { - // Reply-flag - 1 - } - SearchRes(x) => x.tcp_port, - CreateChan(_) => 0, - CreateChanRes(x) => x.data_type, - AccessRightsRes(_) => 0, - EventAdd(x) => x.data_type, - EventAddRes(x) => x.data_type, - ReadNotify(x) => x.data_type, - ReadNotifyRes(x) => x.data_type, - } - } - - fn data_count(&self) -> u16 { - use CaMsgTy::*; - match self { - Version(_) => 0, - ClientName => 0, - ClientNameRes(_) => 0, - HostName => 0, - Search(_) => CA_PROTO_VERSION, - SearchRes(_) => 0, - CreateChan(_) => 0, - CreateChanRes(x) => x.data_count, - AccessRightsRes(_) => 0, - EventAdd(x) => x.data_count, - EventAddRes(x) => x.data_count, - ReadNotify(x) => x.data_count, - ReadNotifyRes(x) => x.data_count, - } - } - - fn param1(&self) -> u32 { - use CaMsgTy::*; - match self { - Version(_) => 0, - ClientName => 0, - ClientNameRes(_) => 0, - HostName => 0, - Search(e) => e.id, - SearchRes(x) => x.addr, - CreateChan(x) => x.cid, - CreateChanRes(x) => x.cid, - AccessRightsRes(x) => x.cid, - EventAdd(x) => x.sid, - EventAddRes(x) => x.status, - ReadNotify(x) => x.sid, - ReadNotifyRes(x) => x.sid, - } - } - - fn param2(&self) -> u32 { - use CaMsgTy::*; - match self { - Version(_) => 0, - ClientName => 0, - ClientNameRes(_) => 0, - HostName => 0, - Search(e) => e.id, - SearchRes(x) => x.sid, - CreateChan(_) => CA_PROTO_VERSION as _, - CreateChanRes(x) => x.sid, - AccessRightsRes(x) => x.rights, - EventAdd(x) => x.subid, - EventAddRes(x) => x.subid, - ReadNotify(x) => x.ioid, - ReadNotifyRes(x) => x.ioid, - } - } - - fn place_payload_into(&self, buf: &mut [u8]) { - use CaMsgTy::*; - match self { - Version(_) => {} - ClientName => { - // TODO allow variable client name. Null-extend always to 8 byte align. - buf.copy_from_slice(b"SA10\0\0\0\0"); - } - ClientNameRes(_) => { - error!("should not attempt to write ClientNameRes"); - panic!(); - } - HostName => { - // TODO allow variable host name. Null-extend always to 8 byte align. - buf.copy_from_slice(b"SA10\0\0\0\0"); - } - Search(e) => { - for x in &mut buf[..] { - *x = 0; - } - let d = e.channel.as_bytes(); - if buf.len() < d.len() + 1 { - error!("bad buffer given"); - panic!(); - } - unsafe { std::ptr::copy(&d[0] as _, &mut buf[0] as _, d.len()) }; - } - SearchRes(_) => { - error!("should not attempt to write SearchRes"); - panic!(); - } - CreateChan(x) => { - for x in &mut buf[..] { - *x = 0; - } - let d = x.channel.as_bytes(); - if buf.len() < d.len() + 1 { - error!("bad buffer given"); - panic!(); - } - unsafe { std::ptr::copy(&d[0] as _, &mut buf[0] as _, d.len()) }; - } - CreateChanRes(_) => {} - AccessRightsRes(_) => {} - EventAdd(_) => { - // TODO allow to customize the mask. Test if it works. - buf.copy_from_slice(&[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 15, 0, 0]); - } - EventAddRes(_) => {} - ReadNotify(_) => {} - ReadNotifyRes(_) => {} - } - } -} - -#[derive(Debug)] -struct CaMsg { - ty: CaMsgTy, -} - -impl CaMsg { - fn len(&self) -> usize { - self.ty.len() - } - - fn place_into(&self, buf: &mut [u8]) { - info!("place_into given {} bytes buffer", buf.len()); - if self.ty.payload_len() > 0x4000 - 16 { - error!("TODO emit for larger payloads"); - panic!(); - } else { - let t = self.ty.cmdid().to_be_bytes(); - buf[0] = t[0]; - buf[1] = t[1]; - let t = (self.ty.payload_len() as u16).to_be_bytes(); - buf[2] = t[0]; - buf[3] = t[1]; - let t = self.ty.data_type().to_be_bytes(); - buf[4] = t[0]; - buf[5] = t[1]; - let t = self.ty.data_count().to_be_bytes(); - buf[6] = t[0]; - buf[7] = t[1]; - let t = self.ty.param1().to_be_bytes(); - buf[8] = t[0]; - buf[9] = t[1]; - buf[10] = t[2]; - buf[11] = t[3]; - let t = self.ty.param2().to_be_bytes(); - buf[12] = t[0]; - buf[13] = t[1]; - buf[14] = t[2]; - buf[15] = t[3]; - self.ty.place_payload_into(&mut buf[16..]); - } - } - - fn from_proto_infos(hi: &HeadInfo, payload: &[u8]) -> Result { - let msg = match hi.cmdid { - 0 => CaMsg { - ty: CaMsgTy::Version(hi.data_count), - }, - 20 => { - let name = std::ffi::CString::new(payload) - .map(|s| s.into_string().unwrap_or_else(|e| format!("{e:?}"))) - .unwrap_or_else(|e| format!("{e:?}")); - CaMsg { - ty: CaMsgTy::ClientNameRes(ClientNameRes { name }), - } - } - // TODO make response type for host name: - 21 => CaMsg { ty: CaMsgTy::HostName }, - 6 => { - if hi.payload_size != 8 { - warn!("protocol error: search result is expected with fixed payload size 8"); - } - if hi.data_count != 0 { - warn!("protocol error: search result is expected with data count 0"); - } - CaMsg { - ty: CaMsgTy::SearchRes(SearchRes { - tcp_port: hi.data_type, - addr: hi.param1, - sid: hi.param2, - }), - } - } - 18 => { - CaMsg { - // TODO use different structs for request and response: - ty: CaMsgTy::CreateChanRes(CreateChanRes { - data_type: hi.data_type, - data_count: hi.data_count, - cid: hi.param1, - sid: hi.param2, - }), - } - } - 22 => { - CaMsg { - // TODO use different structs for request and response: - ty: CaMsgTy::AccessRightsRes(AccessRightsRes { - cid: hi.param1, - rights: hi.param2, - }), - } - } - 1 => { - let ca_st = CaScalarType::from_ca_u16(hi.data_type)?; - match ca_st { - CaScalarType::F64 => { - // TODO handle wrong payload sizer in more distinct way. - let v = f64::from_be_bytes(payload.try_into()?); - info!("Payload as f64: {v}"); - } - _ => { - warn!("TODO handle {ca_st:?}"); - } - } - let d = EventAddRes { - data_type: hi.data_type, - data_count: hi.data_count, - status: hi.param1, - subid: hi.param2, - }; - CaMsg { - ty: CaMsgTy::EventAddRes(d), - } - } - 15 => { - if payload.len() == 8 { - let v = u64::from_be_bytes(payload.try_into()?); - info!("Payload as u64: {v}"); - let v = i64::from_be_bytes(payload.try_into()?); - info!("Payload as i64: {v}"); - let v = f64::from_be_bytes(payload.try_into()?); - info!("Payload as f64: {v}"); - } else { - info!( - "payload string {:?} payload {:?}", - String::from_utf8_lossy(&payload[..payload.len().min(12)]), - &payload[..payload.len().min(12)], - ); - } - CaMsg { - // TODO use different structs for request and response: - ty: CaMsgTy::ReadNotifyRes(ReadNotifyRes { - data_type: hi.data_type, - data_count: hi.data_count, - sid: hi.param1, - ioid: hi.param2, - }), - } - } - x => return Err(Error::with_msg_no_trace(format!("unsupported ca command {}", x))), - }; - Ok(msg) - } -} - -#[derive(Clone, Debug)] -struct HeadInfo { - cmdid: u16, - payload_size: u16, - data_type: u16, - data_count: u16, - param1: u32, - param2: u32, -} - -enum CaState { - StdHead, - ExtHead(HeadInfo), - Payload(HeadInfo), - Done, -} - -impl CaState { - fn need_min(&self) -> usize { - use CaState::*; - match self { - StdHead => 16, - ExtHead(_) => 8, - Payload(k) => k.payload_size as _, - Done => 123, - } - } -} - -struct CaProto { - tcp: TcpStream, - state: CaState, - buf: NetBuf, - outbuf: NetBuf, - out: VecDeque, -} - -impl CaProto { - fn new(tcp: TcpStream) -> Self { - Self { - tcp, - state: CaState::StdHead, - buf: NetBuf::new(1024 * 128), - outbuf: NetBuf::new(1024 * 128), - out: VecDeque::new(), - } - } - - fn inpbuf_conn(&mut self, need_min: usize) -> (&mut TcpStream, ReadBuf) { - (&mut self.tcp, self.buf.read_buf_for_fill(need_min)) - } - - fn outbuf_conn(&mut self) -> (&mut TcpStream, &[u8]) { - (&mut self.tcp, self.outbuf.data()) - } - - fn out_msg_buf(&mut self) -> Option<(&CaMsg, &mut [u8])> { - if let Some(item) = self.out.front() { - info!("attempt to serialize outgoing message msg {:?}", item); - if let Ok(buf) = self.outbuf.write_buf(item.len()) { - Some((item, buf)) - } else { - error!("output buffer too small for message"); - None - } - } else { - None - } - } - - fn attempt_output(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - use Poll::*; - let (w, b) = self.outbuf_conn(); - pin_mut!(w); - match w.poll_write(cx, b) { - Ready(k) => match k { - Ok(k) => { - info!("sent {} bytes {:?}", k, &self.outbuf.data()[..k]); - match self.outbuf.adv(k) { - Ok(()) => Ready(Ok(())), - Err(e) => { - error!("advance error {:?}", e); - Ready(Err(e)) - } - } - } - Err(e) => { - error!("output write error {:?}", e); - Ready(Err(e.into())) - } - }, - Pending => Pending, - } - } - - fn loop_body(mut self: Pin<&mut Self>, cx: &mut Context) -> Result>, Error> { - use Poll::*; - if self.out.len() != 0 || self.outbuf.len() != 0 { - info!("loop_body out {} outbuf {}", self.out.len(), self.outbuf.len()); - } - let output_res_1: Option> = 'll1: loop { - if self.out.len() == 0 { - break None; - } - while let Some((msg, buf)) = self.out_msg_buf() { - msg.place_into(buf); - self.out.pop_front(); - } - while self.outbuf.len() > 0 { - match Self::attempt_output(self.as_mut(), cx)? { - Ready(()) => {} - Pending => { - break 'll1 Some(Pending); - } - } - } - }; - let output_res_2: Option> = if let Some(Pending) = output_res_1 { - Some(Pending) - } else { - loop { - if self.outbuf.len() == 0 { - break None; - } - match Self::attempt_output(self.as_mut(), cx)? { - Ready(()) => {} - Pending => break Some(Pending), - } - } - }; - let need_min = self.state.need_min(); - let read_res = { - if self.buf.cap() < need_min { - self.state = CaState::Done; - let e = Error::with_msg_no_trace(format!( - "buffer too small for need_min {} {}", - self.buf.cap(), - self.state.need_min() - )); - Err(e) - } else if self.buf.len() < need_min { - let (w, mut rbuf) = self.inpbuf_conn(need_min); - pin_mut!(w); - match w.poll_read(cx, &mut rbuf) { - Ready(k) => match k { - Ok(()) => { - let nf = rbuf.filled().len(); - if nf == 0 { - info!("EOF"); - // TODO may need another state, if not yet done when input is EOF. - self.state = CaState::Done; - Ok(Some(Ready(CaItem::empty()))) - } else { - if false { - info!("received {} bytes", rbuf.filled().len()); - let t = rbuf.filled().len().min(32); - info!("received data {:?}", &rbuf.filled()[0..t]); - } - match self.buf.wadv(nf) { - Ok(()) => Ok(Some(Ready(CaItem::empty()))), - Err(e) => { - error!("netbuf wadv fail nf {nf}"); - Err(e) - } - } - } - } - Err(e) => Err(e.into()), - }, - Pending => Ok(Some(Pending)), - } - } else { - Ok(None) - } - }?; - let parse_res: Option = self.parse_item()?; - match (output_res_2, read_res, parse_res) { - (_, _, Some(item)) => Ok(Some(Ready(item))), - (Some(Pending), _, _) => Ok(Some(Pending)), - (_, Some(Pending), _) => Ok(Some(Pending)), - (_, None, None) => { - // TODO constrain how often we can go to this case consecutively. - Ok(None) - } - (_, Some(_), None) => Ok(None), - } - } - - fn parse_item(&mut self) -> Result, Error> { - loop { - if self.buf.len() < self.state.need_min() { - break Ok(None); - } - break match &self.state { - CaState::StdHead => { - let command = self.buf.read_u16_be()?; - let payload_size = self.buf.read_u16_be()?; - let data_type = self.buf.read_u16_be()?; - let data_count = self.buf.read_u16_be()?; - let param1 = self.buf.read_u32_be()?; - let param2 = self.buf.read_u32_be()?; - let hi = HeadInfo { - cmdid: command, - payload_size, - data_type, - data_count, - param1, - param2, - }; - if hi.cmdid == 6 || hi.cmdid > 26 || hi.data_type > 10 || hi.payload_size > 8 { - warn!("StdHead {hi:?}"); - } - if payload_size == 0xffff && data_count == 0 { - self.state = CaState::ExtHead(hi); - Ok(None) - } else { - if payload_size == 0 { - self.state = CaState::StdHead; - let msg = CaMsg::from_proto_infos(&hi, &[])?; - Ok(Some(CaItem::Msg(msg))) - } else { - self.state = CaState::Payload(hi); - Ok(None) - } - } - } - CaState::ExtHead(hi) => { - let payload_size = self.buf.read_u32_be()?; - let data_count = self.buf.read_u32_be()?; - warn!("ExtHead payload_size {payload_size} data_count {data_count}"); - if payload_size == 0 { - let msg = CaMsg::from_proto_infos(hi, &[])?; - self.state = CaState::StdHead; - Ok(Some(CaItem::Msg(msg))) - } else { - self.state = CaState::Payload(hi.clone()); - Ok(None) - } - } - CaState::Payload(hi) => { - let g = self.buf.read_bytes(hi.payload_size as _)?; - let msg = CaMsg::from_proto_infos(hi, g)?; - self.state = CaState::StdHead; - Ok(Some(CaItem::Msg(msg))) - } - CaState::Done => Err(Error::with_msg_no_trace("attempt to parse in Done state")), - }; - } - } -} - -impl Stream for CaProto { - type Item = Result; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - use Poll::*; - if let CaState::Done = self.state { - return Ready(None); - } else { - loop { - break match Self::loop_body(self.as_mut(), cx) { - Ok(Some(Ready(k))) => Ready(Some(Ok(k))), - Ok(Some(Pending)) => Pending, - Ok(None) => continue, - Err(e) => Ready(Some(Err(e))), - }; - } - } - } -} diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs new file mode 100644 index 0000000..fcf8546 --- /dev/null +++ b/netfetch/src/ca/conn.rs @@ -0,0 +1,322 @@ +use super::proto::{CaItem, CaMsg, CaMsgTy, CaProto}; +use crate::ca::proto::{CreateChan, EventAdd, ReadNotify}; +use err::Error; +use futures_util::{Stream, StreamExt}; +use log::*; +use std::collections::BTreeMap; +use std::pin::Pin; +use std::task::{Context, Poll}; +use std::time::Instant; +use tokio::net::TcpStream; + +#[derive(Debug)] +enum ChannelError { + NoSuccess, +} + +#[derive(Debug)] +struct EventedState { + ts_last: Instant, +} + +#[derive(Debug)] +enum MonitoringState { + AddingEvent, + Evented(EventedState), + Reading, + Read, + Muted, +} + +#[derive(Debug)] +struct CreatedState { + cid: u32, + sid: u32, + ts_created: Instant, + state: MonitoringState, +} + +#[derive(Debug)] +enum ChannelState { + NotCreated, + Creating { cid: u32, ts_beg: Instant }, + Created(CreatedState), + Error(ChannelError), +} + +enum CaConnState { + Init, + Listen, + PeerReady, + Done, +} + +struct IdStore { + next: u32, +} + +impl IdStore { + fn new() -> Self { + Self { next: 0 } + } + + fn next(&mut self) -> u32 { + let ret = self.next; + self.next += 1; + ret + } +} + +pub struct CaConn { + state: CaConnState, + proto: CaProto, + cid_store: IdStore, + ioid_store: IdStore, + subid_store: IdStore, + channels: BTreeMap, + cid_by_name: BTreeMap, + cid_by_subid: BTreeMap, + name_by_cid: BTreeMap, + poll_count: usize, +} + +impl CaConn { + pub fn new(tcp: TcpStream) -> Self { + Self { + state: CaConnState::Init, + proto: CaProto::new(tcp), + cid_store: IdStore::new(), + ioid_store: IdStore::new(), + subid_store: IdStore::new(), + channels: BTreeMap::new(), + cid_by_name: BTreeMap::new(), + cid_by_subid: BTreeMap::new(), + name_by_cid: BTreeMap::new(), + poll_count: 0, + } + } + + pub fn channel_add(&mut self, channel: String) { + let cid = self.cid_by_name(&channel); + if self.channels.contains_key(&cid) { + } else { + self.channels.insert(cid, ChannelState::NotCreated); + } + } + + fn cid_by_name(&mut self, name: &str) -> u32 { + if let Some(cid) = self.cid_by_name.get(name) { + *cid + } else { + let cid = self.cid_store.next(); + self.cid_by_name.insert(name.into(), cid); + self.name_by_cid.insert(cid, name.into()); + cid + } + } + + fn name_by_cid(&self, cid: u32) -> Option<&str> { + self.name_by_cid.get(&cid).map(|x| x.as_str()) + } +} + +impl Stream for CaConn { + type Item = Result<(), Error>; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + use Poll::*; + self.poll_count += 1; + if self.poll_count > 30 { + error!("TODO CaConn reached poll_count limit"); + return Ready(None); + } + loop { + break match &self.state { + CaConnState::Init => { + let msg = CaMsg { ty: CaMsgTy::Version }; + self.proto.push_out(msg); + let msg = CaMsg { + ty: CaMsgTy::ClientName, + }; + self.proto.push_out(msg); + let msg = CaMsg { ty: CaMsgTy::HostName }; + self.proto.push_out(msg); + self.state = CaConnState::Listen; + continue; + } + CaConnState::Listen => match self.proto.poll_next_unpin(cx) { + Ready(Some(k)) => match k { + Ok(k) => match k { + CaItem::Empty => { + info!("CaItem::Empty"); + Ready(Some(Ok(()))) + } + CaItem::Msg(msg) => match msg.ty { + CaMsgTy::VersionRes(n) => { + if n < 12 || n > 13 { + error!("See some unexpected version {n} channel search may not work."); + Ready(Some(Ok(()))) + } else { + info!("Received peer version {n}"); + self.state = CaConnState::PeerReady; + continue; + } + } + k => { + warn!("Got some other unhandled message: {k:?}"); + Ready(Some(Ok(()))) + } + }, + }, + Err(e) => { + error!("got error item from CaProto {e:?}"); + Ready(Some(Ok(()))) + } + }, + Ready(None) => { + warn!("CaProto is done"); + self.state = CaConnState::Done; + continue; + } + Pending => Pending, + }, + CaConnState::PeerReady => { + // TODO unify with Listen state where protocol gets polled as well. + let mut msgs_tmp = vec![]; + // TODO profile, efficient enough? + let keys: Vec = self.channels.keys().map(|x| *x).collect(); + for cid in keys { + match self.channels[&cid] { + ChannelState::NotCreated => { + let name = self + .name_by_cid(cid) + .ok_or_else(|| Error::with_msg_no_trace("name for cid not known")); + let name = match name { + Ok(k) => k, + Err(e) => return Ready(Some(Err(e))), + }; + info!("Sending CreateChan for {}", name); + let msg = CaMsg { + ty: CaMsgTy::CreateChan(CreateChan { + cid, + channel: name.into(), + }), + }; + msgs_tmp.push(msg); + // TODO handle not-found error: + let ch_s = self.channels.get_mut(&cid).unwrap(); + *ch_s = ChannelState::Creating { + cid, + ts_beg: Instant::now(), + }; + } + _ => {} + } + } + let mut do_wake_again = false; + if msgs_tmp.len() > 0 { + info!("msgs_tmp.len() {}", msgs_tmp.len()); + do_wake_again = true; + } + // TODO be careful to not overload outgoing message queue. + for msg in msgs_tmp { + self.proto.push_out(msg); + } + let res = match self.proto.poll_next_unpin(cx) { + Ready(Some(Ok(k))) => { + match k { + CaItem::Msg(k) => match k.ty { + CaMsgTy::SearchRes(k) => { + let a = k.addr.to_be_bytes(); + let addr = format!("{}.{}.{}.{}:{}", a[0], a[1], a[2], a[3], k.tcp_port); + info!("Search result indicates server address: {addr}"); + } + CaMsgTy::CreateChanRes(k) => { + // TODO handle cid-not-found which can also indicate peer error. + let cid = k.cid; + let name = self.name_by_cid(cid); + info!("Channel created for {name:?} now register for events"); + let subid = self.subid_store.next(); + self.cid_by_subid.insert(subid, cid); + let msg = CaMsg { + ty: CaMsgTy::EventAdd(EventAdd { + sid: k.sid, + data_type: k.data_type, + data_count: k.data_count, + subid, + }), + }; + self.proto.push_out(msg); + do_wake_again = true; + // TODO handle not-found error: + let ch_s = self.channels.get_mut(&k.cid).unwrap(); + *ch_s = ChannelState::Created(CreatedState { + cid: k.cid, + sid: k.sid, + ts_created: Instant::now(), + state: MonitoringState::AddingEvent, + }); + info!( + "Channel is created cid {} sid {} name {}", + k.cid, k.sid, self.name_by_cid[&k.cid] + ); + } + CaMsgTy::EventAddRes(k) => { + // TODO handle subid-not-found which can also be peer error: + let cid = *self.cid_by_subid.get(&k.subid).unwrap(); + // TODO get rid of the string clone when I don't want the log output any longer: + let name: String = self.name_by_cid(cid).unwrap().into(); + // TODO handle not-found error: + let ch_s = self.channels.get_mut(&cid).unwrap(); + match ch_s { + ChannelState::Created(st) => { + match st.state { + MonitoringState::AddingEvent => { + info!("Confirmation {name} is subscribed."); + // TODO get ts from faster common source: + st.state = MonitoringState::Evented(EventedState { + ts_last: Instant::now(), + }); + } + MonitoringState::Evented(ref mut st) => { + // TODO get ts from faster common source: + st.ts_last = Instant::now(); + } + _ => { + warn!("bad state? not always, could be late message."); + } + } + } + _ => { + warn!("unexpected state: EventAddRes while having {ch_s:?}"); + } + } + } + _ => {} + }, + _ => {} + } + Ready(Some(Ok(()))) + } + Ready(Some(Err(e))) => { + error!("CaProto yields error: {e:?}"); + Ready(Some(Err(e))) + } + Ready(None) => { + warn!("CaProto is done"); + self.state = CaConnState::Done; + Ready(Some(Ok(()))) + } + Pending => Pending, + }; + if do_wake_again { + info!("do_wake_again"); + cx.waker().wake_by_ref(); + } + res + } + CaConnState::Done => Ready(None), + }; + } + } +} diff --git a/netfetch/src/ca/monitor.rs b/netfetch/src/ca/monitor.rs new file mode 100644 index 0000000..e69de29 diff --git a/netfetch/src/ca/proto.rs b/netfetch/src/ca/proto.rs new file mode 100644 index 0000000..2cdc437 --- /dev/null +++ b/netfetch/src/ca/proto.rs @@ -0,0 +1,745 @@ +use crate::netbuf::NetBuf; +use err::Error; +use futures_util::{pin_mut, Stream}; +use log::*; +use std::collections::VecDeque; +use std::pin::Pin; +use std::task::{Context, Poll}; +use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; +use tokio::net::TcpStream; + +const CA_PROTO_VERSION: u16 = 13; + +#[derive(Debug)] +pub struct Search { + pub id: u32, + pub channel: String, +} + +#[derive(Debug)] +pub struct SearchRes { + pub addr: u32, + pub tcp_port: u16, + pub sid: u32, +} + +#[derive(Debug)] +pub struct ClientNameRes { + pub name: String, +} + +#[derive(Debug)] +pub struct CreateChan { + pub cid: u32, + pub channel: String, +} + +#[derive(Debug)] +pub struct CreateChanRes { + pub data_type: u16, + pub data_count: u16, + pub cid: u32, + pub sid: u32, +} + +#[derive(Debug)] +pub struct AccessRightsRes { + pub cid: u32, + pub rights: u32, +} + +#[derive(Debug)] +pub struct EventAdd { + pub data_type: u16, + pub data_count: u16, + pub sid: u32, + pub subid: u32, +} + +#[derive(Debug)] +pub struct EventAddRes { + pub data_type: u16, + pub data_count: u16, + pub status: u32, + pub subid: u32, +} + +#[derive(Debug)] +pub struct ReadNotify { + pub data_type: u16, + pub data_count: u16, + pub sid: u32, + pub ioid: u32, +} + +#[derive(Debug)] +pub struct ReadNotifyRes { + pub data_type: u16, + pub data_count: u16, + pub sid: u32, + pub ioid: u32, +} + +#[derive(Debug)] +enum CaScalarType { + I8, + I16, + I32, + F32, + F64, +} + +impl CaScalarType { + fn from_ca_u16(k: u16) -> Result { + use CaScalarType::*; + let ret = match k { + 4 => I8, + 1 => I16, + 5 => I32, + 2 => F32, + 6 => F64, + k => return Err(Error::with_msg_no_trace(format!("bad dbr type id: {k}"))), + }; + Ok(ret) + } +} + +#[derive(Debug)] +pub enum CaMsgTy { + Version, + VersionRes(u16), + ClientName, + ClientNameRes(ClientNameRes), + HostName, + Search(Search), + SearchRes(SearchRes), + CreateChan(CreateChan), + CreateChanRes(CreateChanRes), + AccessRightsRes(AccessRightsRes), + EventAdd(EventAdd), + EventAddRes(EventAddRes), + ReadNotify(ReadNotify), + ReadNotifyRes(ReadNotifyRes), +} + +impl CaMsgTy { + fn cmdid(&self) -> u16 { + use CaMsgTy::*; + match self { + Version => 0, + VersionRes(_) => 0, + ClientName => 20, + ClientNameRes(_) => 20, + HostName => 21, + Search(_) => 6, + SearchRes(_) => 6, + CreateChan(_) => 18, + CreateChanRes(_) => 18, + AccessRightsRes(_) => 22, + EventAdd(_) => 1, + EventAddRes(_) => 1, + ReadNotify(_) => 15, + ReadNotifyRes(_) => 15, + } + } + + fn len(&self) -> usize { + 16 + self.payload_len() + } + + fn payload_len(&self) -> usize { + use CaMsgTy::*; + match self { + Version => 0, + VersionRes(_) => 0, + ClientName => 8, + ClientNameRes(x) => (7 + x.name.len()) / 8 * 8, + HostName => 8, + Search(s) => (7 + s.channel.len()) / 8 * 8, + SearchRes(_) => 8, + CreateChan(x) => (7 + x.channel.len()) / 8 * 8, + CreateChanRes(_) => 0, + AccessRightsRes(_) => 0, + EventAdd(_) => 16, + EventAddRes(_) => { + error!("should not attempt to serialize the response again"); + panic!(); + } + ReadNotify(_) => 0, + ReadNotifyRes(_) => { + error!("should not attempt to serialize the response again"); + panic!(); + } + } + } + + fn data_type(&self) -> u16 { + use CaMsgTy::*; + match self { + Version => CA_PROTO_VERSION, + VersionRes(n) => *n, + ClientName => 0, + ClientNameRes(_) => 0, + HostName => 0, + Search(_) => { + // Reply-flag + 1 + } + SearchRes(x) => x.tcp_port, + CreateChan(_) => 0, + CreateChanRes(x) => x.data_type, + AccessRightsRes(_) => 0, + EventAdd(x) => x.data_type, + EventAddRes(x) => x.data_type, + ReadNotify(x) => x.data_type, + ReadNotifyRes(x) => x.data_type, + } + } + + fn data_count(&self) -> u16 { + use CaMsgTy::*; + match self { + Version => 0, + VersionRes(_) => 0, + ClientName => 0, + ClientNameRes(_) => 0, + HostName => 0, + Search(_) => CA_PROTO_VERSION, + SearchRes(_) => 0, + CreateChan(_) => 0, + CreateChanRes(x) => x.data_count, + AccessRightsRes(_) => 0, + EventAdd(x) => x.data_count, + EventAddRes(x) => x.data_count, + ReadNotify(x) => x.data_count, + ReadNotifyRes(x) => x.data_count, + } + } + + fn param1(&self) -> u32 { + use CaMsgTy::*; + match self { + Version => 0, + VersionRes(_) => 0, + ClientName => 0, + ClientNameRes(_) => 0, + HostName => 0, + Search(e) => e.id, + SearchRes(x) => x.addr, + CreateChan(x) => x.cid, + CreateChanRes(x) => x.cid, + AccessRightsRes(x) => x.cid, + EventAdd(x) => x.sid, + EventAddRes(x) => x.status, + ReadNotify(x) => x.sid, + ReadNotifyRes(x) => x.sid, + } + } + + fn param2(&self) -> u32 { + use CaMsgTy::*; + match self { + Version => 0, + VersionRes(_) => 0, + ClientName => 0, + ClientNameRes(_) => 0, + HostName => 0, + Search(e) => e.id, + SearchRes(x) => x.sid, + CreateChan(_) => CA_PROTO_VERSION as _, + CreateChanRes(x) => x.sid, + AccessRightsRes(x) => x.rights, + EventAdd(x) => x.subid, + EventAddRes(x) => x.subid, + ReadNotify(x) => x.ioid, + ReadNotifyRes(x) => x.ioid, + } + } + + fn place_payload_into(&self, buf: &mut [u8]) { + use CaMsgTy::*; + match self { + Version => {} + VersionRes(_) => {} + ClientName => { + // TODO allow variable client name. Null-extend always to 8 byte align. + buf.copy_from_slice(b"SA10\0\0\0\0"); + } + ClientNameRes(_) => { + error!("should not attempt to write ClientNameRes"); + panic!(); + } + HostName => { + // TODO allow variable host name. Null-extend always to 8 byte align. + buf.copy_from_slice(b"SA10\0\0\0\0"); + } + Search(e) => { + for x in &mut buf[..] { + *x = 0; + } + let d = e.channel.as_bytes(); + if buf.len() < d.len() + 1 { + error!("bad buffer given"); + panic!(); + } + unsafe { std::ptr::copy(&d[0] as _, &mut buf[0] as _, d.len()) }; + } + SearchRes(_) => { + error!("should not attempt to write SearchRes"); + panic!(); + } + CreateChan(x) => { + for x in &mut buf[..] { + *x = 0; + } + let d = x.channel.as_bytes(); + if buf.len() < d.len() + 1 { + error!("bad buffer given"); + panic!(); + } + unsafe { std::ptr::copy(&d[0] as _, &mut buf[0] as _, d.len()) }; + } + CreateChanRes(_) => {} + AccessRightsRes(_) => {} + EventAdd(_) => { + // TODO allow to customize the mask. Test if it works. + buf.copy_from_slice(&[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 15, 0, 0]); + } + EventAddRes(_) => {} + ReadNotify(_) => {} + ReadNotifyRes(_) => {} + } + } +} + +#[derive(Debug)] +pub struct CaMsg { + pub ty: CaMsgTy, +} + +impl CaMsg { + fn len(&self) -> usize { + self.ty.len() + } + + fn place_into(&self, buf: &mut [u8]) { + info!("place_into given {} bytes buffer", buf.len()); + if self.ty.payload_len() > 0x4000 - 16 { + error!("TODO emit for larger payloads"); + panic!(); + } else { + let t = self.ty.cmdid().to_be_bytes(); + buf[0] = t[0]; + buf[1] = t[1]; + let t = (self.ty.payload_len() as u16).to_be_bytes(); + buf[2] = t[0]; + buf[3] = t[1]; + let t = self.ty.data_type().to_be_bytes(); + buf[4] = t[0]; + buf[5] = t[1]; + let t = self.ty.data_count().to_be_bytes(); + buf[6] = t[0]; + buf[7] = t[1]; + let t = self.ty.param1().to_be_bytes(); + buf[8] = t[0]; + buf[9] = t[1]; + buf[10] = t[2]; + buf[11] = t[3]; + let t = self.ty.param2().to_be_bytes(); + buf[12] = t[0]; + buf[13] = t[1]; + buf[14] = t[2]; + buf[15] = t[3]; + self.ty.place_payload_into(&mut buf[16..]); + } + } + + fn from_proto_infos(hi: &HeadInfo, payload: &[u8]) -> Result { + let msg = match hi.cmdid { + 0 => CaMsg { + ty: CaMsgTy::VersionRes(hi.data_count), + }, + 20 => { + let name = std::ffi::CString::new(payload) + .map(|s| s.into_string().unwrap_or_else(|e| format!("{e:?}"))) + .unwrap_or_else(|e| format!("{e:?}")); + CaMsg { + ty: CaMsgTy::ClientNameRes(ClientNameRes { name }), + } + } + // TODO make response type for host name: + 21 => CaMsg { ty: CaMsgTy::HostName }, + 6 => { + if hi.payload_size != 8 { + warn!("protocol error: search result is expected with fixed payload size 8"); + } + if hi.data_count != 0 { + warn!("protocol error: search result is expected with data count 0"); + } + CaMsg { + ty: CaMsgTy::SearchRes(SearchRes { + tcp_port: hi.data_type, + addr: hi.param1, + sid: hi.param2, + }), + } + } + 18 => { + CaMsg { + // TODO use different structs for request and response: + ty: CaMsgTy::CreateChanRes(CreateChanRes { + data_type: hi.data_type, + data_count: hi.data_count, + cid: hi.param1, + sid: hi.param2, + }), + } + } + 22 => { + CaMsg { + // TODO use different structs for request and response: + ty: CaMsgTy::AccessRightsRes(AccessRightsRes { + cid: hi.param1, + rights: hi.param2, + }), + } + } + 1 => { + let ca_st = CaScalarType::from_ca_u16(hi.data_type)?; + match ca_st { + CaScalarType::F64 => { + // TODO handle wrong payload sizer in more distinct way. + let v = f64::from_be_bytes(payload.try_into()?); + info!("Payload as f64: {v}"); + } + _ => { + warn!("TODO handle {ca_st:?}"); + } + } + let d = EventAddRes { + data_type: hi.data_type, + data_count: hi.data_count, + status: hi.param1, + subid: hi.param2, + }; + CaMsg { + ty: CaMsgTy::EventAddRes(d), + } + } + 15 => { + if payload.len() == 8 { + let v = u64::from_be_bytes(payload.try_into()?); + info!("Payload as u64: {v}"); + let v = i64::from_be_bytes(payload.try_into()?); + info!("Payload as i64: {v}"); + let v = f64::from_be_bytes(payload.try_into()?); + info!("Payload as f64: {v}"); + } else { + info!( + "payload string {:?} payload {:?}", + String::from_utf8_lossy(&payload[..payload.len().min(12)]), + &payload[..payload.len().min(12)], + ); + } + CaMsg { + // TODO use different structs for request and response: + ty: CaMsgTy::ReadNotifyRes(ReadNotifyRes { + data_type: hi.data_type, + data_count: hi.data_count, + sid: hi.param1, + ioid: hi.param2, + }), + } + } + x => return Err(Error::with_msg_no_trace(format!("unsupported ca command {}", x))), + }; + Ok(msg) + } +} + +#[derive(Debug)] +pub enum CaItem { + Empty, + Msg(CaMsg), +} + +impl CaItem { + fn empty() -> Self { + CaItem::Empty + } +} + +#[derive(Clone, Debug)] +struct HeadInfo { + cmdid: u16, + payload_size: u16, + data_type: u16, + data_count: u16, + param1: u32, + param2: u32, +} + +enum CaState { + StdHead, + ExtHead(HeadInfo), + Payload(HeadInfo), + Done, +} + +impl CaState { + fn need_min(&self) -> usize { + use CaState::*; + match self { + StdHead => 16, + ExtHead(_) => 8, + Payload(k) => k.payload_size as _, + Done => 123, + } + } +} + +pub struct CaProto { + tcp: TcpStream, + state: CaState, + buf: NetBuf, + outbuf: NetBuf, + out: VecDeque, +} + +impl CaProto { + pub fn new(tcp: TcpStream) -> Self { + Self { + tcp, + state: CaState::StdHead, + buf: NetBuf::new(1024 * 128), + outbuf: NetBuf::new(1024 * 128), + out: VecDeque::new(), + } + } + + pub fn push_out(&mut self, item: CaMsg) { + self.out.push_back(item); + } + + fn inpbuf_conn(&mut self, need_min: usize) -> (&mut TcpStream, ReadBuf) { + (&mut self.tcp, self.buf.read_buf_for_fill(need_min)) + } + + fn outbuf_conn(&mut self) -> (&mut TcpStream, &[u8]) { + (&mut self.tcp, self.outbuf.data()) + } + + fn out_msg_buf(&mut self) -> Option<(&CaMsg, &mut [u8])> { + if let Some(item) = self.out.front() { + info!("attempt to serialize outgoing message msg {:?}", item); + if let Ok(buf) = self.outbuf.write_buf(item.len()) { + Some((item, buf)) + } else { + error!("output buffer too small for message"); + None + } + } else { + None + } + } + + fn attempt_output(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + use Poll::*; + let (w, b) = self.outbuf_conn(); + pin_mut!(w); + match w.poll_write(cx, b) { + Ready(k) => match k { + Ok(k) => { + info!("sent {} bytes {:?}", k, &self.outbuf.data()[..k]); + match self.outbuf.adv(k) { + Ok(()) => Ready(Ok(())), + Err(e) => { + error!("advance error {:?}", e); + Ready(Err(e)) + } + } + } + Err(e) => { + error!("output write error {:?}", e); + Ready(Err(e.into())) + } + }, + Pending => Pending, + } + } + + fn loop_body(mut self: Pin<&mut Self>, cx: &mut Context) -> Result>, Error> { + use Poll::*; + if self.out.len() != 0 || self.outbuf.len() != 0 { + info!("loop_body out {} outbuf {}", self.out.len(), self.outbuf.len()); + } + let output_res_1: Option> = 'll1: loop { + if self.out.len() == 0 { + break None; + } + while let Some((msg, buf)) = self.out_msg_buf() { + msg.place_into(buf); + self.out.pop_front(); + } + while self.outbuf.len() > 0 { + match Self::attempt_output(self.as_mut(), cx)? { + Ready(()) => {} + Pending => { + break 'll1 Some(Pending); + } + } + } + }; + let output_res_2: Option> = if let Some(Pending) = output_res_1 { + Some(Pending) + } else { + loop { + if self.outbuf.len() == 0 { + break None; + } + match Self::attempt_output(self.as_mut(), cx)? { + Ready(()) => {} + Pending => break Some(Pending), + } + } + }; + let need_min = self.state.need_min(); + let read_res = { + if self.buf.cap() < need_min { + self.state = CaState::Done; + let e = Error::with_msg_no_trace(format!( + "buffer too small for need_min {} {}", + self.buf.cap(), + self.state.need_min() + )); + Err(e) + } else if self.buf.len() < need_min { + let (w, mut rbuf) = self.inpbuf_conn(need_min); + pin_mut!(w); + match w.poll_read(cx, &mut rbuf) { + Ready(k) => match k { + Ok(()) => { + let nf = rbuf.filled().len(); + if nf == 0 { + info!("EOF"); + // TODO may need another state, if not yet done when input is EOF. + self.state = CaState::Done; + Ok(Some(Ready(CaItem::empty()))) + } else { + if false { + info!("received {} bytes", rbuf.filled().len()); + let t = rbuf.filled().len().min(32); + info!("received data {:?}", &rbuf.filled()[0..t]); + } + match self.buf.wadv(nf) { + Ok(()) => Ok(Some(Ready(CaItem::empty()))), + Err(e) => { + error!("netbuf wadv fail nf {nf}"); + Err(e) + } + } + } + } + Err(e) => Err(e.into()), + }, + Pending => Ok(Some(Pending)), + } + } else { + Ok(None) + } + }?; + let parse_res: Option = self.parse_item()?; + match (output_res_2, read_res, parse_res) { + (_, _, Some(item)) => Ok(Some(Ready(item))), + (Some(Pending), _, _) => Ok(Some(Pending)), + (_, Some(Pending), _) => Ok(Some(Pending)), + (_, None, None) => { + // TODO constrain how often we can go to this case consecutively. + Ok(None) + } + (_, Some(_), None) => Ok(None), + } + } + + fn parse_item(&mut self) -> Result, Error> { + loop { + if self.buf.len() < self.state.need_min() { + break Ok(None); + } + break match &self.state { + CaState::StdHead => { + let command = self.buf.read_u16_be()?; + let payload_size = self.buf.read_u16_be()?; + let data_type = self.buf.read_u16_be()?; + let data_count = self.buf.read_u16_be()?; + let param1 = self.buf.read_u32_be()?; + let param2 = self.buf.read_u32_be()?; + let hi = HeadInfo { + cmdid: command, + payload_size, + data_type, + data_count, + param1, + param2, + }; + if hi.cmdid == 6 || hi.cmdid > 26 || hi.data_type > 10 || hi.payload_size > 8 { + warn!("StdHead {hi:?}"); + } + if payload_size == 0xffff && data_count == 0 { + self.state = CaState::ExtHead(hi); + Ok(None) + } else { + if payload_size == 0 { + self.state = CaState::StdHead; + let msg = CaMsg::from_proto_infos(&hi, &[])?; + Ok(Some(CaItem::Msg(msg))) + } else { + self.state = CaState::Payload(hi); + Ok(None) + } + } + } + CaState::ExtHead(hi) => { + let payload_size = self.buf.read_u32_be()?; + let data_count = self.buf.read_u32_be()?; + warn!("ExtHead payload_size {payload_size} data_count {data_count}"); + if payload_size == 0 { + let msg = CaMsg::from_proto_infos(hi, &[])?; + self.state = CaState::StdHead; + Ok(Some(CaItem::Msg(msg))) + } else { + self.state = CaState::Payload(hi.clone()); + Ok(None) + } + } + CaState::Payload(hi) => { + let g = self.buf.read_bytes(hi.payload_size as _)?; + let msg = CaMsg::from_proto_infos(hi, g)?; + self.state = CaState::StdHead; + Ok(Some(CaItem::Msg(msg))) + } + CaState::Done => Err(Error::with_msg_no_trace("attempt to parse in Done state")), + }; + } + } +} + +impl Stream for CaProto { + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + use Poll::*; + if let CaState::Done = self.state { + return Ready(None); + } else { + loop { + break match Self::loop_body(self.as_mut(), cx) { + Ok(Some(Ready(k))) => Ready(Some(Ok(k))), + Ok(Some(Pending)) => Pending, + Ok(None) => continue, + Err(e) => Ready(Some(Err(e))), + }; + } + } + } +}