WIP
This commit is contained in:
@@ -1,5 +1,5 @@
|
||||
use clap::Parser;
|
||||
use daqingest::{DaqIngestOpts, SubCmd};
|
||||
use daqingest::{ChannelAccess, DaqIngestOpts, SubCmd};
|
||||
use err::Error;
|
||||
|
||||
pub fn main() -> Result<(), Error> {
|
||||
@@ -19,7 +19,10 @@ pub fn main() -> Result<(), Error> {
|
||||
let mut f = netfetch::zmtp::BsreadDumper::new(k.source);
|
||||
f.run().await?
|
||||
}
|
||||
SubCmd::ChannelAccess(k) => netfetch::ca::ca_connect_3(k.into()).await?,
|
||||
SubCmd::ChannelAccess(k) => match k {
|
||||
ChannelAccess::CaChannel(k) => netfetch::ca::ca_connect(k.into()).await?,
|
||||
ChannelAccess::CaConfig(k) => netfetch::ca::ca_listen_from_file(k.config).await?,
|
||||
},
|
||||
}
|
||||
Ok(())
|
||||
})
|
||||
|
||||
@@ -23,6 +23,7 @@ pub enum SubCmd {
|
||||
ListPulses,
|
||||
FetchEvents(FetchEvents),
|
||||
BsreadDump(BsreadDump),
|
||||
#[clap(subcommand)]
|
||||
ChannelAccess(ChannelAccess),
|
||||
}
|
||||
|
||||
@@ -72,18 +73,25 @@ pub struct BsreadDump {
|
||||
}
|
||||
|
||||
#[derive(Debug, Parser)]
|
||||
pub struct ChannelAccess {
|
||||
#[clap(long)]
|
||||
pub source: String,
|
||||
pub enum ChannelAccess {
|
||||
CaChannel(CaChannel),
|
||||
CaConfig(CaConfig),
|
||||
}
|
||||
|
||||
#[derive(Debug, Parser)]
|
||||
pub struct CaChannel {
|
||||
#[clap(long)]
|
||||
pub channel: Vec<String>,
|
||||
}
|
||||
|
||||
impl From<ChannelAccess> for CaConnectOpts {
|
||||
fn from(k: ChannelAccess) -> Self {
|
||||
Self {
|
||||
source: k.source,
|
||||
channels: k.channel,
|
||||
}
|
||||
impl From<CaChannel> for CaConnectOpts {
|
||||
fn from(k: CaChannel) -> Self {
|
||||
Self { channels: k.channel }
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Parser)]
|
||||
pub struct CaConfig {
|
||||
#[clap(long)]
|
||||
pub config: String,
|
||||
}
|
||||
|
||||
@@ -24,6 +24,7 @@ scylla = "0.4"
|
||||
md-5 = "0.9"
|
||||
hex = "0.4"
|
||||
libc = "0.2"
|
||||
regex = "1.5.5"
|
||||
log = { path = "../log" }
|
||||
stats = { path = "../stats" }
|
||||
err = { path = "../../daqbuffer/err" }
|
||||
|
||||
@@ -1,36 +1,135 @@
|
||||
pub mod conn;
|
||||
pub mod proto;
|
||||
|
||||
use crate::ca::conn::FindIoc;
|
||||
|
||||
use self::conn::CaConn;
|
||||
use conn::{CaConn, FindIoc};
|
||||
use err::Error;
|
||||
use futures_util::StreamExt;
|
||||
use futures_util::stream::{FuturesOrdered, FuturesUnordered};
|
||||
use futures_util::{StreamExt, TryFutureExt};
|
||||
use log::*;
|
||||
use std::collections::{BTreeMap, VecDeque};
|
||||
use std::path::PathBuf;
|
||||
use tokio::fs::OpenOptions;
|
||||
use tokio::io::{AsyncBufReadExt, BufReader};
|
||||
use tokio::net::TcpStream;
|
||||
|
||||
pub async fn ca_listen_from_file(conf: impl Into<PathBuf>) -> Result<(), Error> {
|
||||
let file = OpenOptions::new().read(true).open(conf.into()).await?;
|
||||
let mut lines = BufReader::new(file).lines();
|
||||
let re = regex::Regex::new(r"^([-:._A-Za-z0-9]+)")?;
|
||||
let mut channels = vec![];
|
||||
while let Some(line) = lines.next_line().await? {
|
||||
if let Some(cs) = re.captures(&line) {
|
||||
let m = cs.get(1).unwrap();
|
||||
let channel = m.as_str();
|
||||
channels.push(channel.to_string());
|
||||
}
|
||||
}
|
||||
let opts = CaConnectOpts { channels };
|
||||
ca_connect(opts).await
|
||||
}
|
||||
|
||||
pub struct CaConnectOpts {
|
||||
pub source: String,
|
||||
pub channels: Vec<String>,
|
||||
}
|
||||
|
||||
pub async fn ca_connect_3(opts: CaConnectOpts) -> Result<(), Error> {
|
||||
debug!("ca_connect_3");
|
||||
let addr = FindIoc::new(opts.channels[0].clone()).await?;
|
||||
info!("Found IOC address: {addr:?}");
|
||||
let tcp = TcpStream::connect(&opts.source).await?;
|
||||
let mut conn = CaConn::new(tcp);
|
||||
for c in opts.channels {
|
||||
conn.channel_add(c);
|
||||
pub async fn ca_connect(opts: CaConnectOpts) -> Result<(), Error> {
|
||||
info!("Look up {} channel hosts", opts.channels.len());
|
||||
let mut fut_queue = FuturesUnordered::new();
|
||||
let mut res2 = vec![];
|
||||
let mut chns = VecDeque::from(opts.channels);
|
||||
'lo2: loop {
|
||||
const MAX_SIMUL: usize = 23;
|
||||
while fut_queue.len() < MAX_SIMUL && chns.len() > 0 {
|
||||
let ch = chns.pop_front().unwrap();
|
||||
let ch2 = ch.clone();
|
||||
info!("Start search for {}", ch);
|
||||
let fut = FindIoc::new(ch.clone()).map_ok(move |x| (ch2, x));
|
||||
let jh = tokio::spawn(fut);
|
||||
fut_queue.push(jh);
|
||||
if chns.is_empty() {
|
||||
break 'lo2;
|
||||
}
|
||||
}
|
||||
while fut_queue.len() >= MAX_SIMUL {
|
||||
match fut_queue.next().await {
|
||||
Some(item) => {
|
||||
res2.push(item);
|
||||
}
|
||||
None => break,
|
||||
}
|
||||
}
|
||||
}
|
||||
while let Some(item) = conn.next().await {
|
||||
while fut_queue.len() > 0 {
|
||||
match fut_queue.next().await {
|
||||
Some(item) => {
|
||||
res2.push(item);
|
||||
}
|
||||
None => break,
|
||||
}
|
||||
}
|
||||
info!("Collected {} results", res2.len());
|
||||
let mut channels_by_host = BTreeMap::new();
|
||||
for item in res2 {
|
||||
// TODO should we continue even if some channel gives an error?
|
||||
let item = item
|
||||
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))
|
||||
.unwrap_or_else(|e| Err(e));
|
||||
match item {
|
||||
Ok(k) => {
|
||||
trace!("CaConn gives item: {k:?}");
|
||||
Ok(item) => {
|
||||
info!("Found address {} {:?}", item.0, item.1);
|
||||
let key = item.1;
|
||||
if !channels_by_host.contains_key(&key) {
|
||||
channels_by_host.insert(key, vec![item.0]);
|
||||
} else {
|
||||
channels_by_host.get_mut(&key).unwrap().push(item.0);
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
error!("CaConn gives error: {e:?}");
|
||||
break;
|
||||
error!("Got error: {e:?}");
|
||||
}
|
||||
};
|
||||
}
|
||||
for (host, channels) in &channels_by_host {
|
||||
info!("Have: {:?} {:?}", host, channels);
|
||||
}
|
||||
if false {
|
||||
return Ok(());
|
||||
}
|
||||
let mut conn_jhs = vec![];
|
||||
for (host, channels) in channels_by_host {
|
||||
let conn_block = async move {
|
||||
info!("Create TCP connection to {:?}", (host.addr, host.port));
|
||||
let tcp = TcpStream::connect((host.addr, host.port)).await?;
|
||||
let mut conn = CaConn::new(tcp);
|
||||
for c in channels {
|
||||
conn.channel_add(c);
|
||||
}
|
||||
while let Some(item) = conn.next().await {
|
||||
match item {
|
||||
Ok(k) => {
|
||||
trace!("CaConn gives item: {k:?}");
|
||||
}
|
||||
Err(e) => {
|
||||
error!("CaConn gives error: {e:?}");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok::<_, Error>(())
|
||||
};
|
||||
let jh = tokio::spawn(conn_block);
|
||||
conn_jhs.push(jh);
|
||||
}
|
||||
for jh in conn_jhs {
|
||||
match jh.await {
|
||||
Ok(k) => match k {
|
||||
Ok(_) => {}
|
||||
Err(e) => {
|
||||
error!("{e:?}");
|
||||
}
|
||||
},
|
||||
Err(e) => {
|
||||
error!("{e:?}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -9,7 +9,8 @@ use std::net::Ipv4Addr;
|
||||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
use std::time::{Duration, Instant};
|
||||
use tokio::net::{TcpStream, UdpSocket};
|
||||
use tokio::io::unix::AsyncFd;
|
||||
use tokio::net::TcpStream;
|
||||
|
||||
#[derive(Debug)]
|
||||
enum ChannelError {
|
||||
@@ -325,17 +326,39 @@ impl Stream for CaConn {
|
||||
|
||||
enum FindIocState {
|
||||
Init,
|
||||
WaitWritable(Pin<Box<dyn Future<Output = Result<(), std::io::Error>> + Send>>),
|
||||
WaitReadable(Pin<Box<dyn Future<Output = Result<(), std::io::Error>> + Send>>),
|
||||
WaitWritable,
|
||||
WaitReadable,
|
||||
}
|
||||
|
||||
struct SockBox(c_int);
|
||||
|
||||
impl Drop for SockBox {
|
||||
fn drop(self: &mut Self) {
|
||||
if self.0 != -1 {
|
||||
unsafe {
|
||||
libc::close(self.0);
|
||||
self.0 = -1;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
|
||||
pub struct Tcp4Addr {
|
||||
pub addr: Ipv4Addr,
|
||||
pub port: u16,
|
||||
}
|
||||
|
||||
pub struct FindIoc {
|
||||
state: FindIocState,
|
||||
channel: String,
|
||||
sock: Option<UdpSocket>,
|
||||
search_id: u32,
|
||||
sock: SockBox,
|
||||
afd: Option<AsyncFd<i32>>,
|
||||
addr: libc::sockaddr_in,
|
||||
addr_len: usize,
|
||||
deadline: Pin<Box<tokio::time::Sleep>>,
|
||||
result: Option<Tcp4Addr>,
|
||||
}
|
||||
|
||||
// Do low-level approach first to make sure it works as specified.
|
||||
@@ -345,24 +368,26 @@ impl FindIoc {
|
||||
Self {
|
||||
state: FindIocState::Init,
|
||||
channel,
|
||||
sock: None,
|
||||
search_id: 0x12345678,
|
||||
sock: SockBox(-1),
|
||||
afd: None,
|
||||
addr: addr,
|
||||
addr_len: 0,
|
||||
deadline: Box::pin(tokio::time::sleep(Duration::from_millis(3000))),
|
||||
deadline: Box::pin(tokio::time::sleep(Duration::from_millis(200))),
|
||||
result: None,
|
||||
}
|
||||
}
|
||||
|
||||
unsafe fn create_socket(&mut self) -> Result<(), Error> {
|
||||
// TODO remember to clean up socket on failure.
|
||||
let ec = libc::socket(libc::AF_INET, libc::SOCK_DGRAM, 0);
|
||||
if ec == -1 {
|
||||
return Err("can not create socket".into());
|
||||
}
|
||||
let sock = ec;
|
||||
let sock = SockBox(ec);
|
||||
{
|
||||
let opt: libc::c_int = 1;
|
||||
let ec = libc::setsockopt(
|
||||
sock,
|
||||
sock.0,
|
||||
libc::SOL_SOCKET,
|
||||
libc::SO_BROADCAST,
|
||||
&opt as *const _ as _,
|
||||
@@ -373,16 +398,16 @@ impl FindIoc {
|
||||
}
|
||||
}
|
||||
{
|
||||
let ec = libc::fcntl(sock, libc::F_SETFL, libc::O_NONBLOCK);
|
||||
let ec = libc::fcntl(sock.0, libc::F_SETFL, libc::O_NONBLOCK);
|
||||
if ec == -1 {
|
||||
return Err("can not set nonblock".into());
|
||||
}
|
||||
}
|
||||
let ip: [u8; 4] = [172, 26, 120, 71];
|
||||
//let ip: [u8; 4] = [172, 26, 120, 71];
|
||||
let ip: [u8; 4] = [0, 0, 0, 0];
|
||||
let addr = libc::sockaddr_in {
|
||||
sin_family: libc::AF_INET as u16,
|
||||
sin_port: u16::from_ne_bytes((13882 as u16).to_be_bytes()),
|
||||
// 172.26.120.71
|
||||
sin_port: 0,
|
||||
sin_addr: libc::in_addr {
|
||||
s_addr: u32::from_ne_bytes(ip),
|
||||
},
|
||||
@@ -391,26 +416,36 @@ impl FindIoc {
|
||||
let addr_len = std::mem::size_of::<libc::sockaddr_in>();
|
||||
self.addr = addr.clone();
|
||||
self.addr_len = addr_len;
|
||||
let ec = libc::bind(sock, &addr as *const _ as _, addr_len as _);
|
||||
let ec = libc::bind(sock.0, &addr as *const _ as _, addr_len as _);
|
||||
if ec == -1 {
|
||||
return Err("can not bind socket".into());
|
||||
}
|
||||
let sock = <std::net::UdpSocket as std::os::unix::prelude::FromRawFd>::from_raw_fd(sock);
|
||||
let sock = match UdpSocket::from_std(sock) {
|
||||
Ok(k) => k,
|
||||
Err(e) => {
|
||||
error!("can not convert raw socket to tokio socket");
|
||||
{
|
||||
let mut addr = libc::sockaddr_in {
|
||||
sin_family: libc::AF_INET as u16,
|
||||
sin_port: 0,
|
||||
sin_addr: libc::in_addr { s_addr: 0 },
|
||||
sin_zero: [0; 8],
|
||||
};
|
||||
let mut addr_len = std::mem::size_of::<libc::sockaddr_in>();
|
||||
let ec = libc::getsockname(sock.0, &mut addr as *mut _ as _, &mut addr_len as *mut _ as _);
|
||||
if ec == -1 {
|
||||
error!("getsockname {ec}");
|
||||
return Err("can not convert raw socket to tokio socket".into());
|
||||
} else {
|
||||
if false {
|
||||
let ipv4 = Ipv4Addr::from(addr.sin_addr.s_addr.to_ne_bytes());
|
||||
let tcp_port = u16::from_be(addr.sin_port);
|
||||
info!("bound local socket to {:?} port {}", ipv4, tcp_port);
|
||||
}
|
||||
}
|
||||
};
|
||||
self.sock = Some(sock);
|
||||
info!("Ok created socket");
|
||||
}
|
||||
self.sock = sock;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
unsafe fn try_write(&mut self) -> Result<Poll<()>, Error> {
|
||||
use std::os::unix::prelude::AsRawFd;
|
||||
let sock = self.sock.as_ref().unwrap().as_raw_fd();
|
||||
unsafe fn try_write(&mut self) -> Result<(), Error> {
|
||||
let sock = self.sock.0;
|
||||
let ip: [u8; 4] = [172, 26, 120, 255];
|
||||
let addr = libc::sockaddr_in {
|
||||
sin_family: libc::AF_INET as u16,
|
||||
@@ -432,26 +467,22 @@ impl FindIoc {
|
||||
0, 6, 0, 0, //
|
||||
0, 0, 0, 13, //
|
||||
0, 0, 0, 0, //
|
||||
11, 12, 11, 12,
|
||||
0, 0, 0, 0,
|
||||
//
|
||||
//
|
||||
];
|
||||
let chb = self.channel.as_bytes();
|
||||
let npadded = (chb.len() + 1 + 7) / 8 * 8;
|
||||
let npad = npadded - self.channel.len();
|
||||
info!(
|
||||
"string len {} chb len {} npadded {} npad {}",
|
||||
self.channel.len(),
|
||||
chb.len(),
|
||||
npadded,
|
||||
npad
|
||||
);
|
||||
buf.extend_from_slice(chb);
|
||||
buf.extend_from_slice(&vec![0u8; npad]);
|
||||
let npl = (npadded as u16).to_be_bytes();
|
||||
buf[16 + 2] = npl[0];
|
||||
buf[16 + 3] = npl[1];
|
||||
info!("Sending {} bytes", buf.len());
|
||||
let a = self.search_id.to_be_bytes();
|
||||
for (x, y) in buf[16 + 12..16 + 16].iter_mut().zip(a.into_iter()) {
|
||||
*x = y;
|
||||
}
|
||||
let ec = libc::sendto(
|
||||
sock,
|
||||
&buf[0] as *const _ as _,
|
||||
@@ -463,19 +494,17 @@ impl FindIoc {
|
||||
if ec == -1 {
|
||||
let errno = *libc::__errno_location();
|
||||
if errno == libc::EAGAIN {
|
||||
info!("NOT YET READY FOR SENDING...");
|
||||
return Ok(Poll::Pending);
|
||||
error!("NOT YET READY FOR SENDING...");
|
||||
return Err("socket not ready for write".into());
|
||||
} else {
|
||||
return Err("can not send".into());
|
||||
}
|
||||
}
|
||||
Ok(Poll::Ready(()))
|
||||
Ok(())
|
||||
}
|
||||
|
||||
unsafe fn try_read(&mut self) -> Result<Poll<()>, Error> {
|
||||
info!("Receiving...");
|
||||
use std::os::unix::prelude::AsRawFd;
|
||||
let sock = self.sock.as_ref().unwrap().as_raw_fd();
|
||||
unsafe fn try_read(&mut self) -> Result<(), Error> {
|
||||
let sock = self.sock.0;
|
||||
let mut saddr_mem = [0u8; std::mem::size_of::<libc::sockaddr>()];
|
||||
let mut saddr_len: libc::socklen_t = saddr_mem.len() as _;
|
||||
let mut buf = vec![0u8; 1024];
|
||||
@@ -490,14 +519,16 @@ impl FindIoc {
|
||||
if ec == -1 {
|
||||
let errno = *libc::__errno_location();
|
||||
if errno == libc::EAGAIN {
|
||||
info!("try_read BUT NOT YET READY FOR READING...");
|
||||
return Ok(Poll::Pending);
|
||||
error!("try_read BUT NOT YET READY FOR READING...");
|
||||
return Err("socket not ready for read".into());
|
||||
} else {
|
||||
return Err("can not read".into());
|
||||
}
|
||||
}
|
||||
info!("received ec {ec}");
|
||||
if ec > 0 {
|
||||
} else if ec < 0 {
|
||||
error!("unexpected received {ec}");
|
||||
} else if ec == 0 {
|
||||
error!("received zero bytes");
|
||||
} else {
|
||||
let saddr2: libc::sockaddr_in = std::mem::transmute_copy(&saddr_mem);
|
||||
let src_addr = Ipv4Addr::from(saddr2.sin_addr.s_addr.to_ne_bytes());
|
||||
let src_port = u16::from_be(saddr2.sin_port);
|
||||
@@ -507,18 +538,14 @@ impl FindIoc {
|
||||
for i in 0..(ec as usize) {
|
||||
s1.extend(format!(" {:02x}", buf[i]).chars());
|
||||
}
|
||||
info!("received answer {s1}");
|
||||
info!(
|
||||
debug!("received answer {s1}");
|
||||
debug!(
|
||||
"received answer string {}",
|
||||
String::from_utf8_lossy(buf[..ec as usize].into())
|
||||
);
|
||||
}
|
||||
// TODO handle that the remote should send its protocol version in the payload.
|
||||
// TODO handle if we get a too large answer.
|
||||
// TODO
|
||||
// Parse the contents of the received datagram...
|
||||
// Reuse the existing logic for that.
|
||||
let mut nb = crate::netbuf::NetBuf::new(1024);
|
||||
let mut nb = crate::netbuf::NetBuf::new(2048);
|
||||
nb.put_slice(&buf[..ec as usize])?;
|
||||
let mut msgs = vec![];
|
||||
loop {
|
||||
@@ -539,14 +566,33 @@ impl FindIoc {
|
||||
nb.adv(hi.payload())?;
|
||||
msgs.push(msg);
|
||||
}
|
||||
info!("got {} messages", msgs.len());
|
||||
for msg in &msgs {
|
||||
info!("received {} msgs", msgs.len());
|
||||
for (msg_ix, msg) in msgs.iter().enumerate() {
|
||||
match &msg.ty {
|
||||
CaMsgTy::SearchRes(k) => {
|
||||
// TODO make sure that search identifier is correct.
|
||||
let addr = Ipv4Addr::from(k.addr.to_be_bytes());
|
||||
info!("ADDRESS: {addr:?}");
|
||||
info!("PORT: {}", k.tcp_port);
|
||||
if k.id != self.search_id {
|
||||
warn!("id mismatch {} vs {}", k.id, self.search_id);
|
||||
}
|
||||
if false {
|
||||
let addr = Ipv4Addr::from(k.addr.to_be_bytes());
|
||||
info!("Converted address: {addr:?}");
|
||||
}
|
||||
info!(
|
||||
"Received: {}/{} {:?} {:?} {}",
|
||||
msg_ix,
|
||||
msgs.len(),
|
||||
self.channel,
|
||||
src_addr,
|
||||
k.tcp_port
|
||||
);
|
||||
if self.result.is_none() {
|
||||
self.result = Some(Tcp4Addr {
|
||||
addr: src_addr,
|
||||
port: k.tcp_port,
|
||||
});
|
||||
} else {
|
||||
warn!("Result already populated for {}", self.channel);
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
info!("{msg:?}");
|
||||
@@ -554,23 +600,32 @@ impl FindIoc {
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(Poll::Ready(()))
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl Future for FindIoc {
|
||||
type Output = Result<Ipv4Addr, Error>;
|
||||
type Output = Result<Tcp4Addr, Error>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
|
||||
use Poll::*;
|
||||
loop {
|
||||
match self.deadline.poll_unpin(cx) {
|
||||
Ready(()) => {
|
||||
break Ready(
|
||||
self.result
|
||||
.clone()
|
||||
.ok_or_else(|| Error::with_msg_no_trace(format!("can not find host for {}", self.channel))),
|
||||
);
|
||||
}
|
||||
Pending => {}
|
||||
}
|
||||
break match &mut self.state {
|
||||
FindIocState::Init => match unsafe { Self::create_socket(&mut self) } {
|
||||
Ok(()) => {
|
||||
let tmp1 = self.sock.as_mut().unwrap();
|
||||
let tmp2 = unsafe { &mut *(tmp1 as *mut UdpSocket) };
|
||||
let fut = tmp2.writable();
|
||||
self.state = FindIocState::WaitWritable(Box::pin(fut));
|
||||
let afd = tokio::io::unix::AsyncFd::new(self.sock.0).expect("can not create AsyncFd");
|
||||
self.afd = Some(afd);
|
||||
self.state = FindIocState::WaitWritable;
|
||||
continue;
|
||||
}
|
||||
Err(e) => {
|
||||
@@ -578,40 +633,35 @@ impl Future for FindIoc {
|
||||
Ready(Err(e))
|
||||
}
|
||||
},
|
||||
FindIocState::WaitWritable(ref mut fut) => match fut.poll_unpin(cx) {
|
||||
Ready(Ok(())) => match unsafe { Self::try_write(&mut self) } {
|
||||
Ok(Ready(())) => {
|
||||
info!("Writing done...");
|
||||
let tmp1 = self.sock.as_mut().unwrap();
|
||||
let tmp2 = unsafe { &mut *(tmp1 as *mut UdpSocket) };
|
||||
let fut = tmp2.readable();
|
||||
self.state = FindIocState::WaitReadable(Box::pin(fut));
|
||||
continue;
|
||||
FindIocState::WaitWritable => match self.afd.as_mut().unwrap().poll_write_ready(cx) {
|
||||
Ready(Ok(ref mut g)) => {
|
||||
g.clear_ready();
|
||||
match unsafe { Self::try_write(&mut self) } {
|
||||
Ok(()) => {
|
||||
self.state = FindIocState::WaitReadable;
|
||||
continue;
|
||||
}
|
||||
Err(e) => Ready(Err(e)),
|
||||
}
|
||||
Ok(Pending) => Pending,
|
||||
Err(e) => Ready(Err(e)),
|
||||
},
|
||||
}
|
||||
Ready(Err(e)) => Ready(Err(e.into())),
|
||||
Pending => Pending,
|
||||
},
|
||||
FindIocState::WaitReadable(ref mut fut) => match fut.poll_unpin(cx) {
|
||||
Ready(Ok(())) => match unsafe { Self::try_read(&mut self) } {
|
||||
Ok(Ready(())) => {
|
||||
info!("Reading done...");
|
||||
let addr = Ipv4Addr::new(127, 0, 0, 10);
|
||||
Ready(Ok(addr))
|
||||
FindIocState::WaitReadable => match self.afd.as_mut().unwrap().poll_read_ready(cx) {
|
||||
Ready(Ok(ref mut g)) => {
|
||||
g.clear_ready();
|
||||
match unsafe { Self::try_read(&mut self) } {
|
||||
Ok(()) => {
|
||||
continue;
|
||||
}
|
||||
Err(e) => Ready(Err(e)),
|
||||
}
|
||||
Ok(Pending) => Pending,
|
||||
Err(e) => Ready(Err(e)),
|
||||
},
|
||||
Ready(Err(e)) => Ready(Err(e.into())),
|
||||
Pending => match self.deadline.poll_unpin(cx) {
|
||||
Ready(()) => {
|
||||
info!("FindIoc deadline reached");
|
||||
Ready(Ok(Ipv4Addr::new(127, 0, 0, 10)))
|
||||
}
|
||||
Pending => Pending,
|
||||
},
|
||||
}
|
||||
Ready(Err(e)) => {
|
||||
error!("WaitReadable Err");
|
||||
Ready(Err(e.into()))
|
||||
}
|
||||
Pending => Pending,
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
@@ -20,7 +20,8 @@ pub struct Search {
|
||||
pub struct SearchRes {
|
||||
pub addr: u32,
|
||||
pub tcp_port: u16,
|
||||
pub sid: u32,
|
||||
pub id: u32,
|
||||
pub proto_version: u16,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -87,6 +88,8 @@ enum CaScalarType {
|
||||
I32,
|
||||
F32,
|
||||
F64,
|
||||
Enum,
|
||||
String,
|
||||
}
|
||||
|
||||
impl CaScalarType {
|
||||
@@ -98,6 +101,8 @@ impl CaScalarType {
|
||||
5 => I32,
|
||||
2 => F32,
|
||||
6 => F64,
|
||||
3 => Enum,
|
||||
0 => String,
|
||||
k => return Err(Error::with_msg_no_trace(format!("bad dbr type id: {k}"))),
|
||||
};
|
||||
Ok(ret)
|
||||
@@ -153,11 +158,11 @@ impl CaMsgTy {
|
||||
Version => 0,
|
||||
VersionRes(_) => 0,
|
||||
ClientName => 8,
|
||||
ClientNameRes(x) => (7 + x.name.len()) / 8 * 8,
|
||||
ClientNameRes(x) => (x.name.len() + 1 + 7) / 8 * 8,
|
||||
HostName => 8,
|
||||
Search(s) => (7 + s.channel.len()) / 8 * 8,
|
||||
Search(x) => (x.channel.len() + 1 + 7) / 8 * 8,
|
||||
SearchRes(_) => 8,
|
||||
CreateChan(x) => (7 + x.channel.len()) / 8 * 8,
|
||||
CreateChan(x) => (x.channel.len() + 1 + 7) / 8 * 8,
|
||||
CreateChanRes(_) => 0,
|
||||
AccessRightsRes(_) => 0,
|
||||
EventAdd(_) => 16,
|
||||
@@ -245,7 +250,7 @@ impl CaMsgTy {
|
||||
ClientNameRes(_) => 0,
|
||||
HostName => 0,
|
||||
Search(e) => e.id,
|
||||
SearchRes(x) => x.sid,
|
||||
SearchRes(x) => x.id,
|
||||
CreateChan(_) => CA_PROTO_VERSION as _,
|
||||
CreateChanRes(x) => x.sid,
|
||||
AccessRightsRes(x) => x.rights,
|
||||
@@ -279,7 +284,7 @@ impl CaMsgTy {
|
||||
}
|
||||
let d = e.channel.as_bytes();
|
||||
if buf.len() < d.len() + 1 {
|
||||
error!("bad buffer given");
|
||||
error!("bad buffer given for search payload {} vs {}", buf.len(), d.len());
|
||||
panic!();
|
||||
}
|
||||
unsafe { std::ptr::copy(&d[0] as _, &mut buf[0] as _, d.len()) };
|
||||
@@ -294,7 +299,7 @@ impl CaMsgTy {
|
||||
}
|
||||
let d = x.channel.as_bytes();
|
||||
if buf.len() < d.len() + 1 {
|
||||
error!("bad buffer given");
|
||||
error!("bad buffer given for create chan payload {} vs {}", buf.len(), d.len());
|
||||
panic!();
|
||||
}
|
||||
unsafe { std::ptr::copy(&d[0] as _, &mut buf[0] as _, d.len()) };
|
||||
@@ -376,11 +381,16 @@ impl CaMsg {
|
||||
if hi.data_count != 0 {
|
||||
warn!("protocol error: search result is expected with data count 0");
|
||||
}
|
||||
if payload.len() < 2 {
|
||||
return Err(Error::with_msg_no_trace("server did not include protocol version"));
|
||||
}
|
||||
let proto_version = u16::from_be_bytes(payload[0..2].try_into()?);
|
||||
CaMsg {
|
||||
ty: CaMsgTy::SearchRes(SearchRes {
|
||||
tcp_port: hi.data_type,
|
||||
addr: hi.param1,
|
||||
sid: hi.param2,
|
||||
id: hi.param2,
|
||||
proto_version,
|
||||
}),
|
||||
}
|
||||
}
|
||||
@@ -408,9 +418,36 @@ impl CaMsg {
|
||||
let ca_st = CaScalarType::from_ca_u16(hi.data_type)?;
|
||||
match ca_st {
|
||||
CaScalarType::F64 => {
|
||||
// TODO handle wrong payload sizer in more distinct way.
|
||||
if payload.len() < 2 {
|
||||
return Err(Error::with_msg_no_trace(format!(
|
||||
"not enough payload for enum {}",
|
||||
payload.len()
|
||||
)));
|
||||
}
|
||||
let v = f64::from_be_bytes(payload.try_into()?);
|
||||
info!("Payload as f64: {v}");
|
||||
info!("f64: {v}");
|
||||
}
|
||||
CaScalarType::Enum => {
|
||||
if payload.len() < 2 {
|
||||
return Err(Error::with_msg_no_trace(format!(
|
||||
"not enough payload for enum {}",
|
||||
payload.len()
|
||||
)));
|
||||
}
|
||||
let v = u16::from_be_bytes(payload[..2].try_into()?);
|
||||
info!("enum payload: {v}");
|
||||
}
|
||||
CaScalarType::String => {
|
||||
let mut ixn = payload.len();
|
||||
for (i, &c) in payload.iter().enumerate() {
|
||||
if c == 0 {
|
||||
ixn = i;
|
||||
break;
|
||||
}
|
||||
}
|
||||
info!("try to read string from payload len {} ixn {}", payload.len(), ixn);
|
||||
let v = String::from_utf8_lossy(&payload[..ixn]);
|
||||
info!("String payload: {v}");
|
||||
}
|
||||
_ => {
|
||||
warn!("TODO handle {ca_st:?}");
|
||||
|
||||
Reference in New Issue
Block a user