WIP locate host for channel

This commit is contained in:
Dominik Werder
2022-04-30 15:44:15 +02:00
parent 4f607ce823
commit 0800914e10
3 changed files with 346 additions and 28 deletions

View File

@@ -1,6 +1,8 @@
pub mod conn;
pub mod proto;
use crate::ca::conn::FindIoc;
use self::conn::CaConn;
use err::Error;
use futures_util::StreamExt;
@@ -14,6 +16,8 @@ pub struct CaConnectOpts {
pub async fn ca_connect_3(opts: CaConnectOpts) -> Result<(), Error> {
debug!("ca_connect_3");
let addr = FindIoc::new(opts.channels[0].clone()).await?;
info!("Found IOC address: {addr:?}");
let tcp = TcpStream::connect(&opts.source).await?;
let mut conn = CaConn::new(tcp);
for c in opts.channels {
@@ -22,7 +26,7 @@ pub async fn ca_connect_3(opts: CaConnectOpts) -> Result<(), Error> {
while let Some(item) = conn.next().await {
match item {
Ok(k) => {
info!("CaConn gives item: {k:?}");
trace!("CaConn gives item: {k:?}");
}
Err(e) => {
error!("CaConn gives error: {e:?}");

View File

@@ -1,13 +1,15 @@
use super::proto::{CaItem, CaMsg, CaMsgTy, CaProto};
use crate::ca::proto::{CreateChan, EventAdd, ReadNotify};
use crate::ca::proto::{CreateChan, EventAdd, HeadInfo, ReadNotify};
use err::Error;
use futures_util::{Stream, StreamExt};
use futures_util::{Future, FutureExt, Stream, StreamExt};
use libc::c_int;
use log::*;
use std::collections::BTreeMap;
use std::net::Ipv4Addr;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Instant;
use tokio::net::TcpStream;
use std::time::{Duration, Instant};
use tokio::net::{TcpStream, UdpSocket};
#[derive(Debug)]
enum ChannelError {
@@ -38,7 +40,7 @@ struct CreatedState {
#[derive(Debug)]
enum ChannelState {
NotCreated,
Init,
Creating { cid: u32, ts_beg: Instant },
Created(CreatedState),
Error(ChannelError),
@@ -100,7 +102,7 @@ impl CaConn {
let cid = self.cid_by_name(&channel);
if self.channels.contains_key(&cid) {
} else {
self.channels.insert(cid, ChannelState::NotCreated);
self.channels.insert(cid, ChannelState::Init);
}
}
@@ -126,7 +128,7 @@ impl Stream for CaConn {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
self.poll_count += 1;
if self.poll_count > 30 {
if self.poll_count > 3000 {
error!("TODO CaConn reached poll_count limit");
return Ready(None);
}
@@ -186,8 +188,8 @@ impl Stream for CaConn {
// TODO profile, efficient enough?
let keys: Vec<u32> = self.channels.keys().map(|x| *x).collect();
for cid in keys {
match self.channels[&cid] {
ChannelState::NotCreated => {
match self.channels.get_mut(&cid).unwrap() {
ChannelState::Init => {
let name = self
.name_by_cid(cid)
.ok_or_else(|| Error::with_msg_no_trace("name for cid not known"));
@@ -320,3 +322,304 @@ impl Stream for CaConn {
}
}
}
enum FindIocState {
Init,
WaitWritable(Pin<Box<dyn Future<Output = Result<(), std::io::Error>> + Send>>),
WaitReadable(Pin<Box<dyn Future<Output = Result<(), std::io::Error>> + Send>>),
}
pub struct FindIoc {
state: FindIocState,
channel: String,
sock: Option<UdpSocket>,
addr: libc::sockaddr_in,
addr_len: usize,
deadline: Pin<Box<tokio::time::Sleep>>,
}
// Do low-level approach first to make sure it works as specified.
impl FindIoc {
pub fn new(channel: String) -> Self {
let addr = unsafe { std::mem::transmute_copy(&[0u8; std::mem::size_of::<libc::sockaddr_in>()]) };
Self {
state: FindIocState::Init,
channel,
sock: None,
addr: addr,
addr_len: 0,
deadline: Box::pin(tokio::time::sleep(Duration::from_millis(3000))),
}
}
unsafe fn create_socket(&mut self) -> Result<(), Error> {
// TODO remember to clean up socket on failure.
let ec = libc::socket(libc::AF_INET, libc::SOCK_DGRAM, 0);
if ec == -1 {
return Err("can not create socket".into());
}
let sock = ec;
{
let opt: libc::c_int = 1;
let ec = libc::setsockopt(
sock,
libc::SOL_SOCKET,
libc::SO_BROADCAST,
&opt as *const _ as _,
std::mem::size_of::<libc::c_int>() as _,
);
if ec == -1 {
return Err("can not enable broadcast".into());
}
}
{
let ec = libc::fcntl(sock, libc::F_SETFL, libc::O_NONBLOCK);
if ec == -1 {
return Err("can not set nonblock".into());
}
}
let ip: [u8; 4] = [172, 26, 120, 71];
let addr = libc::sockaddr_in {
sin_family: libc::AF_INET as u16,
sin_port: u16::from_ne_bytes((13882 as u16).to_be_bytes()),
// 172.26.120.71
sin_addr: libc::in_addr {
s_addr: u32::from_ne_bytes(ip),
},
sin_zero: [0; 8],
};
let addr_len = std::mem::size_of::<libc::sockaddr_in>();
self.addr = addr.clone();
self.addr_len = addr_len;
let ec = libc::bind(sock, &addr as *const _ as _, addr_len as _);
if ec == -1 {
return Err("can not bind socket".into());
}
let sock = <std::net::UdpSocket as std::os::unix::prelude::FromRawFd>::from_raw_fd(sock);
let sock = match UdpSocket::from_std(sock) {
Ok(k) => k,
Err(e) => {
error!("can not convert raw socket to tokio socket");
return Err("can not convert raw socket to tokio socket".into());
}
};
self.sock = Some(sock);
info!("Ok created socket");
Ok(())
}
unsafe fn try_write(&mut self) -> Result<Poll<()>, Error> {
use std::os::unix::prelude::AsRawFd;
let sock = self.sock.as_ref().unwrap().as_raw_fd();
let ip: [u8; 4] = [172, 26, 120, 255];
let addr = libc::sockaddr_in {
sin_family: libc::AF_INET as u16,
sin_port: (5064 as u16).to_be(),
sin_addr: libc::in_addr {
s_addr: u32::from_ne_bytes(ip),
},
sin_zero: [0; 8],
};
let addr_len = std::mem::size_of::<libc::sockaddr_in>();
let mut buf = vec![
//
0u8, 0, 0, 0, //
0, 0, 0, 13, //
0, 0, 0, 0, //
0, 0, 0, 0, //
//
//
0, 6, 0, 0, //
0, 0, 0, 13, //
0, 0, 0, 0, //
11, 12, 11, 12,
//
//
];
let chb = self.channel.as_bytes();
let npadded = (chb.len() + 1 + 7) / 8 * 8;
let npad = npadded - self.channel.len();
info!(
"string len {} chb len {} npadded {} npad {}",
self.channel.len(),
chb.len(),
npadded,
npad
);
buf.extend_from_slice(chb);
buf.extend_from_slice(&vec![0u8; npad]);
let npl = (npadded as u16).to_be_bytes();
buf[16 + 2] = npl[0];
buf[16 + 3] = npl[1];
info!("Sending {} bytes", buf.len());
let ec = libc::sendto(
sock,
&buf[0] as *const _ as _,
buf.len() as _,
0,
&addr as *const _ as _,
addr_len as _,
);
if ec == -1 {
let errno = *libc::__errno_location();
if errno == libc::EAGAIN {
info!("NOT YET READY FOR SENDING...");
return Ok(Poll::Pending);
} else {
return Err("can not send".into());
}
}
Ok(Poll::Ready(()))
}
unsafe fn try_read(&mut self) -> Result<Poll<()>, Error> {
info!("Receiving...");
use std::os::unix::prelude::AsRawFd;
let sock = self.sock.as_ref().unwrap().as_raw_fd();
let mut saddr_mem = [0u8; std::mem::size_of::<libc::sockaddr>()];
let mut saddr_len: libc::socklen_t = saddr_mem.len() as _;
let mut buf = vec![0u8; 1024];
let ec = libc::recvfrom(
sock,
buf.as_mut_ptr() as _,
buf.len() as _,
libc::O_NONBLOCK,
&mut saddr_mem as *mut _ as _,
&mut saddr_len as *mut _ as _,
);
if ec == -1 {
let errno = *libc::__errno_location();
if errno == libc::EAGAIN {
info!("try_read BUT NOT YET READY FOR READING...");
return Ok(Poll::Pending);
} else {
return Err("can not read".into());
}
}
info!("received ec {ec}");
if ec > 0 {
let saddr2: libc::sockaddr_in = std::mem::transmute_copy(&saddr_mem);
let src_addr = Ipv4Addr::from(saddr2.sin_addr.s_addr.to_ne_bytes());
let src_port = u16::from_be(saddr2.sin_port);
info!("received from {:?} port {}", src_addr, src_port);
if false {
let mut s1 = String::new();
for i in 0..(ec as usize) {
s1.extend(format!(" {:02x}", buf[i]).chars());
}
info!("received answer {s1}");
info!(
"received answer string {}",
String::from_utf8_lossy(buf[..ec as usize].into())
);
}
// TODO handle that the remote should send its protocol version in the payload.
// TODO handle if we get a too large answer.
// TODO
// Parse the contents of the received datagram...
// Reuse the existing logic for that.
let mut nb = crate::netbuf::NetBuf::new(1024);
nb.put_slice(&buf[..ec as usize])?;
let mut msgs = vec![];
loop {
let n = nb.data().len();
if n == 0 {
break;
}
if n < 16 {
error!("incomplete message, not enough for header");
break;
}
let hi = HeadInfo::from_netbuf(&mut nb)?;
if nb.data().len() < hi.payload() {
error!("incomplete message, missing payload");
break;
}
let msg = CaMsg::from_proto_infos(&hi, nb.data())?;
nb.adv(hi.payload())?;
msgs.push(msg);
}
info!("got {} messages", msgs.len());
for msg in &msgs {
match &msg.ty {
CaMsgTy::SearchRes(k) => {
// TODO make sure that search identifier is correct.
let addr = Ipv4Addr::from(k.addr.to_be_bytes());
info!("ADDRESS: {addr:?}");
info!("PORT: {}", k.tcp_port);
}
_ => {
info!("{msg:?}");
}
}
}
}
Ok(Poll::Ready(()))
}
}
impl Future for FindIoc {
type Output = Result<Ipv4Addr, Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
use Poll::*;
loop {
break match &mut self.state {
FindIocState::Init => match unsafe { Self::create_socket(&mut self) } {
Ok(()) => {
let tmp1 = self.sock.as_mut().unwrap();
let tmp2 = unsafe { &mut *(tmp1 as *mut UdpSocket) };
let fut = tmp2.writable();
self.state = FindIocState::WaitWritable(Box::pin(fut));
continue;
}
Err(e) => {
error!("can not create socket {e:?}");
Ready(Err(e))
}
},
FindIocState::WaitWritable(ref mut fut) => match fut.poll_unpin(cx) {
Ready(Ok(())) => match unsafe { Self::try_write(&mut self) } {
Ok(Ready(())) => {
info!("Writing done...");
let tmp1 = self.sock.as_mut().unwrap();
let tmp2 = unsafe { &mut *(tmp1 as *mut UdpSocket) };
let fut = tmp2.readable();
self.state = FindIocState::WaitReadable(Box::pin(fut));
continue;
}
Ok(Pending) => Pending,
Err(e) => Ready(Err(e)),
},
Ready(Err(e)) => Ready(Err(e.into())),
Pending => Pending,
},
FindIocState::WaitReadable(ref mut fut) => match fut.poll_unpin(cx) {
Ready(Ok(())) => match unsafe { Self::try_read(&mut self) } {
Ok(Ready(())) => {
info!("Reading done...");
let addr = Ipv4Addr::new(127, 0, 0, 10);
Ready(Ok(addr))
}
Ok(Pending) => Pending,
Err(e) => Ready(Err(e)),
},
Ready(Err(e)) => Ready(Err(e.into())),
Pending => match self.deadline.poll_unpin(cx) {
Ready(()) => {
info!("FindIoc deadline reached");
Ready(Ok(Ipv4Addr::new(127, 0, 0, 10)))
}
Pending => Pending,
},
},
};
}
}
}
impl std::fmt::Debug for FindIoc {
fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
fmt.debug_struct("FindIoc").finish()
}
}

View File

@@ -354,7 +354,7 @@ impl CaMsg {
}
}
fn from_proto_infos(hi: &HeadInfo, payload: &[u8]) -> Result<Self, Error> {
pub fn from_proto_infos(hi: &HeadInfo, payload: &[u8]) -> Result<Self, Error> {
let msg = match hi.cmdid {
0 => CaMsg {
ty: CaMsgTy::VersionRes(hi.data_count),
@@ -470,7 +470,7 @@ impl CaItem {
}
#[derive(Clone, Debug)]
struct HeadInfo {
pub struct HeadInfo {
cmdid: u16,
payload_size: u16,
data_type: u16,
@@ -479,6 +479,30 @@ struct HeadInfo {
param2: u32,
}
impl HeadInfo {
pub fn from_netbuf(buf: &mut NetBuf) -> Result<Self, Error> {
let command = buf.read_u16_be()?;
let payload_size = buf.read_u16_be()?;
let data_type = buf.read_u16_be()?;
let data_count = buf.read_u16_be()?;
let param1 = buf.read_u32_be()?;
let param2 = buf.read_u32_be()?;
let hi = HeadInfo {
cmdid: command,
payload_size,
data_type,
data_count,
param1,
param2,
};
Ok(hi)
}
pub fn payload(&self) -> usize {
self.payload_size as _
}
}
enum CaState {
StdHead,
ExtHead(HeadInfo),
@@ -668,28 +692,15 @@ impl CaProto {
}
break match &self.state {
CaState::StdHead => {
let command = self.buf.read_u16_be()?;
let payload_size = self.buf.read_u16_be()?;
let data_type = self.buf.read_u16_be()?;
let data_count = self.buf.read_u16_be()?;
let param1 = self.buf.read_u32_be()?;
let param2 = self.buf.read_u32_be()?;
let hi = HeadInfo {
cmdid: command,
payload_size,
data_type,
data_count,
param1,
param2,
};
let hi = HeadInfo::from_netbuf(&mut self.buf)?;
if hi.cmdid == 6 || hi.cmdid > 26 || hi.data_type > 10 || hi.payload_size > 8 {
warn!("StdHead {hi:?}");
}
if payload_size == 0xffff && data_count == 0 {
if hi.payload_size == 0xffff && hi.data_count == 0 {
self.state = CaState::ExtHead(hi);
Ok(None)
} else {
if payload_size == 0 {
if hi.payload_size == 0 {
self.state = CaState::StdHead;
let msg = CaMsg::from_proto_infos(&hi, &[])?;
Ok(Some(CaItem::Msg(msg)))