diff --git a/netfetch/src/ca.rs b/netfetch/src/ca.rs index 6ad55ee..9cf8f51 100644 --- a/netfetch/src/ca.rs +++ b/netfetch/src/ca.rs @@ -1,6 +1,8 @@ pub mod conn; pub mod proto; +use crate::ca::conn::FindIoc; + use self::conn::CaConn; use err::Error; use futures_util::StreamExt; @@ -14,6 +16,8 @@ pub struct CaConnectOpts { pub async fn ca_connect_3(opts: CaConnectOpts) -> Result<(), Error> { debug!("ca_connect_3"); + let addr = FindIoc::new(opts.channels[0].clone()).await?; + info!("Found IOC address: {addr:?}"); let tcp = TcpStream::connect(&opts.source).await?; let mut conn = CaConn::new(tcp); for c in opts.channels { @@ -22,7 +26,7 @@ pub async fn ca_connect_3(opts: CaConnectOpts) -> Result<(), Error> { while let Some(item) = conn.next().await { match item { Ok(k) => { - info!("CaConn gives item: {k:?}"); + trace!("CaConn gives item: {k:?}"); } Err(e) => { error!("CaConn gives error: {e:?}"); diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index fcf8546..378d3f4 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -1,13 +1,15 @@ use super::proto::{CaItem, CaMsg, CaMsgTy, CaProto}; -use crate::ca::proto::{CreateChan, EventAdd, ReadNotify}; +use crate::ca::proto::{CreateChan, EventAdd, HeadInfo, ReadNotify}; use err::Error; -use futures_util::{Stream, StreamExt}; +use futures_util::{Future, FutureExt, Stream, StreamExt}; +use libc::c_int; use log::*; use std::collections::BTreeMap; +use std::net::Ipv4Addr; use std::pin::Pin; use std::task::{Context, Poll}; -use std::time::Instant; -use tokio::net::TcpStream; +use std::time::{Duration, Instant}; +use tokio::net::{TcpStream, UdpSocket}; #[derive(Debug)] enum ChannelError { @@ -38,7 +40,7 @@ struct CreatedState { #[derive(Debug)] enum ChannelState { - NotCreated, + Init, Creating { cid: u32, ts_beg: Instant }, Created(CreatedState), Error(ChannelError), @@ -100,7 +102,7 @@ impl CaConn { let cid = self.cid_by_name(&channel); if self.channels.contains_key(&cid) { } else { - self.channels.insert(cid, ChannelState::NotCreated); + self.channels.insert(cid, ChannelState::Init); } } @@ -126,7 +128,7 @@ impl Stream for CaConn { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; self.poll_count += 1; - if self.poll_count > 30 { + if self.poll_count > 3000 { error!("TODO CaConn reached poll_count limit"); return Ready(None); } @@ -186,8 +188,8 @@ impl Stream for CaConn { // TODO profile, efficient enough? let keys: Vec = self.channels.keys().map(|x| *x).collect(); for cid in keys { - match self.channels[&cid] { - ChannelState::NotCreated => { + match self.channels.get_mut(&cid).unwrap() { + ChannelState::Init => { let name = self .name_by_cid(cid) .ok_or_else(|| Error::with_msg_no_trace("name for cid not known")); @@ -320,3 +322,304 @@ impl Stream for CaConn { } } } + +enum FindIocState { + Init, + WaitWritable(Pin> + Send>>), + WaitReadable(Pin> + Send>>), +} + +pub struct FindIoc { + state: FindIocState, + channel: String, + sock: Option, + addr: libc::sockaddr_in, + addr_len: usize, + deadline: Pin>, +} + +// Do low-level approach first to make sure it works as specified. +impl FindIoc { + pub fn new(channel: String) -> Self { + let addr = unsafe { std::mem::transmute_copy(&[0u8; std::mem::size_of::()]) }; + Self { + state: FindIocState::Init, + channel, + sock: None, + addr: addr, + addr_len: 0, + deadline: Box::pin(tokio::time::sleep(Duration::from_millis(3000))), + } + } + + unsafe fn create_socket(&mut self) -> Result<(), Error> { + // TODO remember to clean up socket on failure. + let ec = libc::socket(libc::AF_INET, libc::SOCK_DGRAM, 0); + if ec == -1 { + return Err("can not create socket".into()); + } + let sock = ec; + { + let opt: libc::c_int = 1; + let ec = libc::setsockopt( + sock, + libc::SOL_SOCKET, + libc::SO_BROADCAST, + &opt as *const _ as _, + std::mem::size_of::() as _, + ); + if ec == -1 { + return Err("can not enable broadcast".into()); + } + } + { + let ec = libc::fcntl(sock, libc::F_SETFL, libc::O_NONBLOCK); + if ec == -1 { + return Err("can not set nonblock".into()); + } + } + let ip: [u8; 4] = [172, 26, 120, 71]; + let addr = libc::sockaddr_in { + sin_family: libc::AF_INET as u16, + sin_port: u16::from_ne_bytes((13882 as u16).to_be_bytes()), + // 172.26.120.71 + sin_addr: libc::in_addr { + s_addr: u32::from_ne_bytes(ip), + }, + sin_zero: [0; 8], + }; + let addr_len = std::mem::size_of::(); + self.addr = addr.clone(); + self.addr_len = addr_len; + let ec = libc::bind(sock, &addr as *const _ as _, addr_len as _); + if ec == -1 { + return Err("can not bind socket".into()); + } + let sock = ::from_raw_fd(sock); + let sock = match UdpSocket::from_std(sock) { + Ok(k) => k, + Err(e) => { + error!("can not convert raw socket to tokio socket"); + return Err("can not convert raw socket to tokio socket".into()); + } + }; + self.sock = Some(sock); + info!("Ok created socket"); + Ok(()) + } + + unsafe fn try_write(&mut self) -> Result, Error> { + use std::os::unix::prelude::AsRawFd; + let sock = self.sock.as_ref().unwrap().as_raw_fd(); + let ip: [u8; 4] = [172, 26, 120, 255]; + let addr = libc::sockaddr_in { + sin_family: libc::AF_INET as u16, + sin_port: (5064 as u16).to_be(), + sin_addr: libc::in_addr { + s_addr: u32::from_ne_bytes(ip), + }, + sin_zero: [0; 8], + }; + let addr_len = std::mem::size_of::(); + let mut buf = vec![ + // + 0u8, 0, 0, 0, // + 0, 0, 0, 13, // + 0, 0, 0, 0, // + 0, 0, 0, 0, // + // + // + 0, 6, 0, 0, // + 0, 0, 0, 13, // + 0, 0, 0, 0, // + 11, 12, 11, 12, + // + // + ]; + let chb = self.channel.as_bytes(); + let npadded = (chb.len() + 1 + 7) / 8 * 8; + let npad = npadded - self.channel.len(); + info!( + "string len {} chb len {} npadded {} npad {}", + self.channel.len(), + chb.len(), + npadded, + npad + ); + buf.extend_from_slice(chb); + buf.extend_from_slice(&vec![0u8; npad]); + let npl = (npadded as u16).to_be_bytes(); + buf[16 + 2] = npl[0]; + buf[16 + 3] = npl[1]; + info!("Sending {} bytes", buf.len()); + let ec = libc::sendto( + sock, + &buf[0] as *const _ as _, + buf.len() as _, + 0, + &addr as *const _ as _, + addr_len as _, + ); + if ec == -1 { + let errno = *libc::__errno_location(); + if errno == libc::EAGAIN { + info!("NOT YET READY FOR SENDING..."); + return Ok(Poll::Pending); + } else { + return Err("can not send".into()); + } + } + Ok(Poll::Ready(())) + } + + unsafe fn try_read(&mut self) -> Result, Error> { + info!("Receiving..."); + use std::os::unix::prelude::AsRawFd; + let sock = self.sock.as_ref().unwrap().as_raw_fd(); + let mut saddr_mem = [0u8; std::mem::size_of::()]; + let mut saddr_len: libc::socklen_t = saddr_mem.len() as _; + let mut buf = vec![0u8; 1024]; + let ec = libc::recvfrom( + sock, + buf.as_mut_ptr() as _, + buf.len() as _, + libc::O_NONBLOCK, + &mut saddr_mem as *mut _ as _, + &mut saddr_len as *mut _ as _, + ); + if ec == -1 { + let errno = *libc::__errno_location(); + if errno == libc::EAGAIN { + info!("try_read BUT NOT YET READY FOR READING..."); + return Ok(Poll::Pending); + } else { + return Err("can not read".into()); + } + } + info!("received ec {ec}"); + if ec > 0 { + let saddr2: libc::sockaddr_in = std::mem::transmute_copy(&saddr_mem); + let src_addr = Ipv4Addr::from(saddr2.sin_addr.s_addr.to_ne_bytes()); + let src_port = u16::from_be(saddr2.sin_port); + info!("received from {:?} port {}", src_addr, src_port); + if false { + let mut s1 = String::new(); + for i in 0..(ec as usize) { + s1.extend(format!(" {:02x}", buf[i]).chars()); + } + info!("received answer {s1}"); + info!( + "received answer string {}", + String::from_utf8_lossy(buf[..ec as usize].into()) + ); + } + // TODO handle that the remote should send its protocol version in the payload. + // TODO handle if we get a too large answer. + // TODO + // Parse the contents of the received datagram... + // Reuse the existing logic for that. + let mut nb = crate::netbuf::NetBuf::new(1024); + nb.put_slice(&buf[..ec as usize])?; + let mut msgs = vec![]; + loop { + let n = nb.data().len(); + if n == 0 { + break; + } + if n < 16 { + error!("incomplete message, not enough for header"); + break; + } + let hi = HeadInfo::from_netbuf(&mut nb)?; + if nb.data().len() < hi.payload() { + error!("incomplete message, missing payload"); + break; + } + let msg = CaMsg::from_proto_infos(&hi, nb.data())?; + nb.adv(hi.payload())?; + msgs.push(msg); + } + info!("got {} messages", msgs.len()); + for msg in &msgs { + match &msg.ty { + CaMsgTy::SearchRes(k) => { + // TODO make sure that search identifier is correct. + let addr = Ipv4Addr::from(k.addr.to_be_bytes()); + info!("ADDRESS: {addr:?}"); + info!("PORT: {}", k.tcp_port); + } + _ => { + info!("{msg:?}"); + } + } + } + } + Ok(Poll::Ready(())) + } +} + +impl Future for FindIoc { + type Output = Result; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { + use Poll::*; + loop { + break match &mut self.state { + FindIocState::Init => match unsafe { Self::create_socket(&mut self) } { + Ok(()) => { + let tmp1 = self.sock.as_mut().unwrap(); + let tmp2 = unsafe { &mut *(tmp1 as *mut UdpSocket) }; + let fut = tmp2.writable(); + self.state = FindIocState::WaitWritable(Box::pin(fut)); + continue; + } + Err(e) => { + error!("can not create socket {e:?}"); + Ready(Err(e)) + } + }, + FindIocState::WaitWritable(ref mut fut) => match fut.poll_unpin(cx) { + Ready(Ok(())) => match unsafe { Self::try_write(&mut self) } { + Ok(Ready(())) => { + info!("Writing done..."); + let tmp1 = self.sock.as_mut().unwrap(); + let tmp2 = unsafe { &mut *(tmp1 as *mut UdpSocket) }; + let fut = tmp2.readable(); + self.state = FindIocState::WaitReadable(Box::pin(fut)); + continue; + } + Ok(Pending) => Pending, + Err(e) => Ready(Err(e)), + }, + Ready(Err(e)) => Ready(Err(e.into())), + Pending => Pending, + }, + FindIocState::WaitReadable(ref mut fut) => match fut.poll_unpin(cx) { + Ready(Ok(())) => match unsafe { Self::try_read(&mut self) } { + Ok(Ready(())) => { + info!("Reading done..."); + let addr = Ipv4Addr::new(127, 0, 0, 10); + Ready(Ok(addr)) + } + Ok(Pending) => Pending, + Err(e) => Ready(Err(e)), + }, + Ready(Err(e)) => Ready(Err(e.into())), + Pending => match self.deadline.poll_unpin(cx) { + Ready(()) => { + info!("FindIoc deadline reached"); + Ready(Ok(Ipv4Addr::new(127, 0, 0, 10))) + } + Pending => Pending, + }, + }, + }; + } + } +} + +impl std::fmt::Debug for FindIoc { + fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result { + fmt.debug_struct("FindIoc").finish() + } +} diff --git a/netfetch/src/ca/proto.rs b/netfetch/src/ca/proto.rs index 2cdc437..be6a159 100644 --- a/netfetch/src/ca/proto.rs +++ b/netfetch/src/ca/proto.rs @@ -354,7 +354,7 @@ impl CaMsg { } } - fn from_proto_infos(hi: &HeadInfo, payload: &[u8]) -> Result { + pub fn from_proto_infos(hi: &HeadInfo, payload: &[u8]) -> Result { let msg = match hi.cmdid { 0 => CaMsg { ty: CaMsgTy::VersionRes(hi.data_count), @@ -470,7 +470,7 @@ impl CaItem { } #[derive(Clone, Debug)] -struct HeadInfo { +pub struct HeadInfo { cmdid: u16, payload_size: u16, data_type: u16, @@ -479,6 +479,30 @@ struct HeadInfo { param2: u32, } +impl HeadInfo { + pub fn from_netbuf(buf: &mut NetBuf) -> Result { + let command = buf.read_u16_be()?; + let payload_size = buf.read_u16_be()?; + let data_type = buf.read_u16_be()?; + let data_count = buf.read_u16_be()?; + let param1 = buf.read_u32_be()?; + let param2 = buf.read_u32_be()?; + let hi = HeadInfo { + cmdid: command, + payload_size, + data_type, + data_count, + param1, + param2, + }; + Ok(hi) + } + + pub fn payload(&self) -> usize { + self.payload_size as _ + } +} + enum CaState { StdHead, ExtHead(HeadInfo), @@ -668,28 +692,15 @@ impl CaProto { } break match &self.state { CaState::StdHead => { - let command = self.buf.read_u16_be()?; - let payload_size = self.buf.read_u16_be()?; - let data_type = self.buf.read_u16_be()?; - let data_count = self.buf.read_u16_be()?; - let param1 = self.buf.read_u32_be()?; - let param2 = self.buf.read_u32_be()?; - let hi = HeadInfo { - cmdid: command, - payload_size, - data_type, - data_count, - param1, - param2, - }; + let hi = HeadInfo::from_netbuf(&mut self.buf)?; if hi.cmdid == 6 || hi.cmdid > 26 || hi.data_type > 10 || hi.payload_size > 8 { warn!("StdHead {hi:?}"); } - if payload_size == 0xffff && data_count == 0 { + if hi.payload_size == 0xffff && hi.data_count == 0 { self.state = CaState::ExtHead(hi); Ok(None) } else { - if payload_size == 0 { + if hi.payload_size == 0 { self.state = CaState::StdHead; let msg = CaMsg::from_proto_infos(&hi, &[])?; Ok(Some(CaItem::Msg(msg)))