Search more channels and store in db

This commit is contained in:
Dominik Werder
2022-05-03 17:10:48 +02:00
parent beaf520d75
commit 2d7d8f0bbd
6 changed files with 284 additions and 94 deletions

View File

@@ -5,8 +5,9 @@ use futures_util::{Future, FutureExt, Stream, StreamExt};
use libc::c_int;
use log::*;
use std::collections::BTreeMap;
use std::net::Ipv4Addr;
use std::net::{Ipv4Addr, SocketAddrV4};
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use tokio::io::unix::AsyncFd;
@@ -129,7 +130,7 @@ impl Stream for CaConn {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
self.poll_count += 1;
if self.poll_count > 3000 {
if false && self.poll_count > 3000 {
error!("TODO CaConn reached poll_count limit");
return Ready(None);
}
@@ -343,11 +344,7 @@ impl Drop for SockBox {
}
}
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct Tcp4Addr {
pub addr: Ipv4Addr,
pub port: u16,
}
const SEARCH_ID: AtomicUsize = AtomicUsize::new(0);
pub struct FindIoc {
state: FindIocState,
@@ -358,23 +355,28 @@ pub struct FindIoc {
addr: libc::sockaddr_in,
addr_len: usize,
deadline: Pin<Box<tokio::time::Sleep>>,
result: Option<Tcp4Addr>,
result: Option<SocketAddrV4>,
addr_bind: Ipv4Addr,
addr_conn: SocketAddrV4,
}
// Do low-level approach first to make sure it works as specified.
impl FindIoc {
pub fn new(channel: String) -> Self {
pub fn new(channel: String, addr_bind: Ipv4Addr, addr_conn: SocketAddrV4, timeout: u64) -> Self {
let addr = unsafe { std::mem::transmute_copy(&[0u8; std::mem::size_of::<libc::sockaddr_in>()]) };
let search_id = SEARCH_ID.fetch_add(1, Ordering::AcqRel) as u32;
Self {
state: FindIocState::Init,
channel,
search_id: 0x12345678,
search_id,
sock: SockBox(-1),
afd: None,
addr: addr,
addr_len: 0,
deadline: Box::pin(tokio::time::sleep(Duration::from_millis(200))),
deadline: Box::pin(tokio::time::sleep(Duration::from_millis(timeout))),
result: None,
addr_bind,
addr_conn,
}
}
@@ -403,8 +405,7 @@ impl FindIoc {
return Err("can not set nonblock".into());
}
}
//let ip: [u8; 4] = [172, 26, 120, 71];
let ip: [u8; 4] = [0, 0, 0, 0];
let ip: [u8; 4] = self.addr_bind.octets();
let addr = libc::sockaddr_in {
sin_family: libc::AF_INET as u16,
sin_port: 0,
@@ -446,43 +447,34 @@ impl FindIoc {
unsafe fn try_write(&mut self) -> Result<(), Error> {
let sock = self.sock.0;
let ip: [u8; 4] = [172, 26, 120, 255];
let ip = self.addr_conn.ip().octets();
let addr = libc::sockaddr_in {
sin_family: libc::AF_INET as u16,
sin_port: (5064 as u16).to_be(),
sin_port: (self.addr_conn.port() as u16).to_be(),
sin_addr: libc::in_addr {
s_addr: u32::from_ne_bytes(ip),
},
sin_zero: [0; 8],
};
let addr_len = std::mem::size_of::<libc::sockaddr_in>();
let chb = self.channel.as_bytes();
let npadded = (chb.len() + 1 + 7) / 8 * 8;
let npad = npadded - self.channel.len();
let mut buf = vec![
//
0u8, 0, 0, 0, //
0, 0, 0, 13, //
0, 0, 0, 0, //
0, 0, 0, 0, //
//
//
0, 6, 0, 0, //
0, 0, 0, 13, //
0, 0, 0, 0, //
0, 0, 0, 0,
//
//
];
let chb = self.channel.as_bytes();
let npadded = (chb.len() + 1 + 7) / 8 * 8;
let npad = npadded - self.channel.len();
buf.extend_from_slice(&[0, 6]);
buf.extend_from_slice(&(npadded as u16).to_be_bytes());
buf.extend_from_slice(&[0, 0, 0, 13]);
buf.extend_from_slice(&[0, 0, 0, 0]);
buf.extend_from_slice(&self.search_id.to_be_bytes());
buf.extend_from_slice(chb);
buf.extend_from_slice(&vec![0u8; npad]);
let npl = (npadded as u16).to_be_bytes();
buf[16 + 2] = npl[0];
buf[16 + 3] = npl[1];
let a = self.search_id.to_be_bytes();
for (x, y) in buf[16 + 12..16 + 16].iter_mut().zip(a.into_iter()) {
*x = y;
}
//info!("sendto {ip:?} n {}", buf.len());
let ec = libc::sendto(
sock,
&buf[0] as *const _ as _,
@@ -532,7 +524,12 @@ impl FindIoc {
let saddr2: libc::sockaddr_in = std::mem::transmute_copy(&saddr_mem);
let src_addr = Ipv4Addr::from(saddr2.sin_addr.s_addr.to_ne_bytes());
let src_port = u16::from_be(saddr2.sin_port);
info!("received from {:?} port {}", src_addr, src_port);
trace!(
"received from src_addr {:?} src_port {} ec {}",
src_addr,
src_port,
ec
);
if false {
let mut s1 = String::new();
for i in 0..(ec as usize) {
@@ -566,7 +563,7 @@ impl FindIoc {
nb.adv(hi.payload())?;
msgs.push(msg);
}
info!("received {} msgs", msgs.len());
//info!("received {} msgs {:?}", msgs.len(), msgs);
for (msg_ix, msg) in msgs.iter().enumerate() {
match &msg.ty {
CaMsgTy::SearchRes(k) => {
@@ -578,7 +575,7 @@ impl FindIoc {
info!("Converted address: {addr:?}");
}
info!(
"Received: {}/{} {:?} {:?} {}",
"SearchRes: {}/{} {:?} {:?} {}",
msg_ix,
msgs.len(),
self.channel,
@@ -586,17 +583,12 @@ impl FindIoc {
k.tcp_port
);
if self.result.is_none() {
self.result = Some(Tcp4Addr {
addr: src_addr,
port: k.tcp_port,
});
self.result = Some(SocketAddrV4::new(src_addr, k.tcp_port));
} else {
warn!("Result already populated for {}", self.channel);
}
}
_ => {
info!("{msg:?}");
}
_ => {}
}
}
}
@@ -605,18 +597,15 @@ impl FindIoc {
}
impl Future for FindIoc {
type Output = Result<Tcp4Addr, Error>;
// TODO use a dedicated type to indicate timeout.
type Output = Result<Option<SocketAddrV4>, Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
use Poll::*;
loop {
match self.deadline.poll_unpin(cx) {
Ready(()) => {
break Ready(
self.result
.clone()
.ok_or_else(|| Error::with_msg_no_trace(format!("can not find host for {}", self.channel))),
);
break Ready(Ok(self.result.clone()));
}
Pending => {}
}

View File

@@ -445,7 +445,7 @@ impl CaMsg {
break;
}
}
info!("try to read string from payload len {} ixn {}", payload.len(), ixn);
//info!("try to read string from payload len {} ixn {}", payload.len(), ixn);
let v = String::from_utf8_lossy(&payload[..ixn]);
info!("String payload: {v}");
}
@@ -730,7 +730,7 @@ impl CaProto {
break match &self.state {
CaState::StdHead => {
let hi = HeadInfo::from_netbuf(&mut self.buf)?;
if hi.cmdid == 6 || hi.cmdid > 26 || hi.data_type > 10 || hi.payload_size > 8 {
if hi.cmdid == 6 || hi.cmdid > 26 || hi.data_type > 10 || hi.payload_size > 40 {
warn!("StdHead {hi:?}");
}
if hi.payload_size == 0xffff && hi.data_count == 0 {