diff --git a/daqingest/src/bin/daqingest.rs b/daqingest/src/bin/daqingest.rs index da39041..18240fe 100644 --- a/daqingest/src/bin/daqingest.rs +++ b/daqingest/src/bin/daqingest.rs @@ -1,5 +1,5 @@ use clap::Parser; -use daqingest::{DaqIngestOpts, SubCmd}; +use daqingest::{ChannelAccess, DaqIngestOpts, SubCmd}; use err::Error; pub fn main() -> Result<(), Error> { @@ -19,7 +19,10 @@ pub fn main() -> Result<(), Error> { let mut f = netfetch::zmtp::BsreadDumper::new(k.source); f.run().await? } - SubCmd::ChannelAccess(k) => netfetch::ca::ca_connect_3(k.into()).await?, + SubCmd::ChannelAccess(k) => match k { + ChannelAccess::CaChannel(k) => netfetch::ca::ca_connect(k.into()).await?, + ChannelAccess::CaConfig(k) => netfetch::ca::ca_listen_from_file(k.config).await?, + }, } Ok(()) }) diff --git a/daqingest/src/daqingest.rs b/daqingest/src/daqingest.rs index 78e4e2c..400febb 100644 --- a/daqingest/src/daqingest.rs +++ b/daqingest/src/daqingest.rs @@ -23,6 +23,7 @@ pub enum SubCmd { ListPulses, FetchEvents(FetchEvents), BsreadDump(BsreadDump), + #[clap(subcommand)] ChannelAccess(ChannelAccess), } @@ -72,18 +73,25 @@ pub struct BsreadDump { } #[derive(Debug, Parser)] -pub struct ChannelAccess { - #[clap(long)] - pub source: String, +pub enum ChannelAccess { + CaChannel(CaChannel), + CaConfig(CaConfig), +} + +#[derive(Debug, Parser)] +pub struct CaChannel { #[clap(long)] pub channel: Vec, } -impl From for CaConnectOpts { - fn from(k: ChannelAccess) -> Self { - Self { - source: k.source, - channels: k.channel, - } +impl From for CaConnectOpts { + fn from(k: CaChannel) -> Self { + Self { channels: k.channel } } } + +#[derive(Debug, Parser)] +pub struct CaConfig { + #[clap(long)] + pub config: String, +} diff --git a/netfetch/Cargo.toml b/netfetch/Cargo.toml index dc972ef..23363f7 100644 --- a/netfetch/Cargo.toml +++ b/netfetch/Cargo.toml @@ -24,6 +24,7 @@ scylla = "0.4" md-5 = "0.9" hex = "0.4" libc = "0.2" +regex = "1.5.5" log = { path = "../log" } stats = { path = "../stats" } err = { path = "../../daqbuffer/err" } diff --git a/netfetch/src/ca.rs b/netfetch/src/ca.rs index 9cf8f51..91c1c44 100644 --- a/netfetch/src/ca.rs +++ b/netfetch/src/ca.rs @@ -1,36 +1,135 @@ pub mod conn; pub mod proto; -use crate::ca::conn::FindIoc; - -use self::conn::CaConn; +use conn::{CaConn, FindIoc}; use err::Error; -use futures_util::StreamExt; +use futures_util::stream::{FuturesOrdered, FuturesUnordered}; +use futures_util::{StreamExt, TryFutureExt}; use log::*; +use std::collections::{BTreeMap, VecDeque}; +use std::path::PathBuf; +use tokio::fs::OpenOptions; +use tokio::io::{AsyncBufReadExt, BufReader}; use tokio::net::TcpStream; +pub async fn ca_listen_from_file(conf: impl Into) -> Result<(), Error> { + let file = OpenOptions::new().read(true).open(conf.into()).await?; + let mut lines = BufReader::new(file).lines(); + let re = regex::Regex::new(r"^([-:._A-Za-z0-9]+)")?; + let mut channels = vec![]; + while let Some(line) = lines.next_line().await? { + if let Some(cs) = re.captures(&line) { + let m = cs.get(1).unwrap(); + let channel = m.as_str(); + channels.push(channel.to_string()); + } + } + let opts = CaConnectOpts { channels }; + ca_connect(opts).await +} + pub struct CaConnectOpts { - pub source: String, pub channels: Vec, } -pub async fn ca_connect_3(opts: CaConnectOpts) -> Result<(), Error> { - debug!("ca_connect_3"); - let addr = FindIoc::new(opts.channels[0].clone()).await?; - info!("Found IOC address: {addr:?}"); - let tcp = TcpStream::connect(&opts.source).await?; - let mut conn = CaConn::new(tcp); - for c in opts.channels { - conn.channel_add(c); +pub async fn ca_connect(opts: CaConnectOpts) -> Result<(), Error> { + info!("Look up {} channel hosts", opts.channels.len()); + let mut fut_queue = FuturesUnordered::new(); + let mut res2 = vec![]; + let mut chns = VecDeque::from(opts.channels); + 'lo2: loop { + const MAX_SIMUL: usize = 23; + while fut_queue.len() < MAX_SIMUL && chns.len() > 0 { + let ch = chns.pop_front().unwrap(); + let ch2 = ch.clone(); + info!("Start search for {}", ch); + let fut = FindIoc::new(ch.clone()).map_ok(move |x| (ch2, x)); + let jh = tokio::spawn(fut); + fut_queue.push(jh); + if chns.is_empty() { + break 'lo2; + } + } + while fut_queue.len() >= MAX_SIMUL { + match fut_queue.next().await { + Some(item) => { + res2.push(item); + } + None => break, + } + } } - while let Some(item) = conn.next().await { + while fut_queue.len() > 0 { + match fut_queue.next().await { + Some(item) => { + res2.push(item); + } + None => break, + } + } + info!("Collected {} results", res2.len()); + let mut channels_by_host = BTreeMap::new(); + for item in res2 { + // TODO should we continue even if some channel gives an error? + let item = item + .map_err(|e| Error::with_msg_no_trace(format!("{e:?}"))) + .unwrap_or_else(|e| Err(e)); match item { - Ok(k) => { - trace!("CaConn gives item: {k:?}"); + Ok(item) => { + info!("Found address {} {:?}", item.0, item.1); + let key = item.1; + if !channels_by_host.contains_key(&key) { + channels_by_host.insert(key, vec![item.0]); + } else { + channels_by_host.get_mut(&key).unwrap().push(item.0); + } } Err(e) => { - error!("CaConn gives error: {e:?}"); - break; + error!("Got error: {e:?}"); + } + }; + } + for (host, channels) in &channels_by_host { + info!("Have: {:?} {:?}", host, channels); + } + if false { + return Ok(()); + } + let mut conn_jhs = vec![]; + for (host, channels) in channels_by_host { + let conn_block = async move { + info!("Create TCP connection to {:?}", (host.addr, host.port)); + let tcp = TcpStream::connect((host.addr, host.port)).await?; + let mut conn = CaConn::new(tcp); + for c in channels { + conn.channel_add(c); + } + while let Some(item) = conn.next().await { + match item { + Ok(k) => { + trace!("CaConn gives item: {k:?}"); + } + Err(e) => { + error!("CaConn gives error: {e:?}"); + break; + } + } + } + Ok::<_, Error>(()) + }; + let jh = tokio::spawn(conn_block); + conn_jhs.push(jh); + } + for jh in conn_jhs { + match jh.await { + Ok(k) => match k { + Ok(_) => {} + Err(e) => { + error!("{e:?}"); + } + }, + Err(e) => { + error!("{e:?}"); } } } diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index 378d3f4..e15d137 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -9,7 +9,8 @@ use std::net::Ipv4Addr; use std::pin::Pin; use std::task::{Context, Poll}; use std::time::{Duration, Instant}; -use tokio::net::{TcpStream, UdpSocket}; +use tokio::io::unix::AsyncFd; +use tokio::net::TcpStream; #[derive(Debug)] enum ChannelError { @@ -325,17 +326,39 @@ impl Stream for CaConn { enum FindIocState { Init, - WaitWritable(Pin> + Send>>), - WaitReadable(Pin> + Send>>), + WaitWritable, + WaitReadable, +} + +struct SockBox(c_int); + +impl Drop for SockBox { + fn drop(self: &mut Self) { + if self.0 != -1 { + unsafe { + libc::close(self.0); + self.0 = -1; + } + } + } +} + +#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)] +pub struct Tcp4Addr { + pub addr: Ipv4Addr, + pub port: u16, } pub struct FindIoc { state: FindIocState, channel: String, - sock: Option, + search_id: u32, + sock: SockBox, + afd: Option>, addr: libc::sockaddr_in, addr_len: usize, deadline: Pin>, + result: Option, } // Do low-level approach first to make sure it works as specified. @@ -345,24 +368,26 @@ impl FindIoc { Self { state: FindIocState::Init, channel, - sock: None, + search_id: 0x12345678, + sock: SockBox(-1), + afd: None, addr: addr, addr_len: 0, - deadline: Box::pin(tokio::time::sleep(Duration::from_millis(3000))), + deadline: Box::pin(tokio::time::sleep(Duration::from_millis(200))), + result: None, } } unsafe fn create_socket(&mut self) -> Result<(), Error> { - // TODO remember to clean up socket on failure. let ec = libc::socket(libc::AF_INET, libc::SOCK_DGRAM, 0); if ec == -1 { return Err("can not create socket".into()); } - let sock = ec; + let sock = SockBox(ec); { let opt: libc::c_int = 1; let ec = libc::setsockopt( - sock, + sock.0, libc::SOL_SOCKET, libc::SO_BROADCAST, &opt as *const _ as _, @@ -373,16 +398,16 @@ impl FindIoc { } } { - let ec = libc::fcntl(sock, libc::F_SETFL, libc::O_NONBLOCK); + let ec = libc::fcntl(sock.0, libc::F_SETFL, libc::O_NONBLOCK); if ec == -1 { return Err("can not set nonblock".into()); } } - let ip: [u8; 4] = [172, 26, 120, 71]; + //let ip: [u8; 4] = [172, 26, 120, 71]; + let ip: [u8; 4] = [0, 0, 0, 0]; let addr = libc::sockaddr_in { sin_family: libc::AF_INET as u16, - sin_port: u16::from_ne_bytes((13882 as u16).to_be_bytes()), - // 172.26.120.71 + sin_port: 0, sin_addr: libc::in_addr { s_addr: u32::from_ne_bytes(ip), }, @@ -391,26 +416,36 @@ impl FindIoc { let addr_len = std::mem::size_of::(); self.addr = addr.clone(); self.addr_len = addr_len; - let ec = libc::bind(sock, &addr as *const _ as _, addr_len as _); + let ec = libc::bind(sock.0, &addr as *const _ as _, addr_len as _); if ec == -1 { return Err("can not bind socket".into()); } - let sock = ::from_raw_fd(sock); - let sock = match UdpSocket::from_std(sock) { - Ok(k) => k, - Err(e) => { - error!("can not convert raw socket to tokio socket"); + { + let mut addr = libc::sockaddr_in { + sin_family: libc::AF_INET as u16, + sin_port: 0, + sin_addr: libc::in_addr { s_addr: 0 }, + sin_zero: [0; 8], + }; + let mut addr_len = std::mem::size_of::(); + let ec = libc::getsockname(sock.0, &mut addr as *mut _ as _, &mut addr_len as *mut _ as _); + if ec == -1 { + error!("getsockname {ec}"); return Err("can not convert raw socket to tokio socket".into()); + } else { + if false { + let ipv4 = Ipv4Addr::from(addr.sin_addr.s_addr.to_ne_bytes()); + let tcp_port = u16::from_be(addr.sin_port); + info!("bound local socket to {:?} port {}", ipv4, tcp_port); + } } - }; - self.sock = Some(sock); - info!("Ok created socket"); + } + self.sock = sock; Ok(()) } - unsafe fn try_write(&mut self) -> Result, Error> { - use std::os::unix::prelude::AsRawFd; - let sock = self.sock.as_ref().unwrap().as_raw_fd(); + unsafe fn try_write(&mut self) -> Result<(), Error> { + let sock = self.sock.0; let ip: [u8; 4] = [172, 26, 120, 255]; let addr = libc::sockaddr_in { sin_family: libc::AF_INET as u16, @@ -432,26 +467,22 @@ impl FindIoc { 0, 6, 0, 0, // 0, 0, 0, 13, // 0, 0, 0, 0, // - 11, 12, 11, 12, + 0, 0, 0, 0, // // ]; let chb = self.channel.as_bytes(); let npadded = (chb.len() + 1 + 7) / 8 * 8; let npad = npadded - self.channel.len(); - info!( - "string len {} chb len {} npadded {} npad {}", - self.channel.len(), - chb.len(), - npadded, - npad - ); buf.extend_from_slice(chb); buf.extend_from_slice(&vec![0u8; npad]); let npl = (npadded as u16).to_be_bytes(); buf[16 + 2] = npl[0]; buf[16 + 3] = npl[1]; - info!("Sending {} bytes", buf.len()); + let a = self.search_id.to_be_bytes(); + for (x, y) in buf[16 + 12..16 + 16].iter_mut().zip(a.into_iter()) { + *x = y; + } let ec = libc::sendto( sock, &buf[0] as *const _ as _, @@ -463,19 +494,17 @@ impl FindIoc { if ec == -1 { let errno = *libc::__errno_location(); if errno == libc::EAGAIN { - info!("NOT YET READY FOR SENDING..."); - return Ok(Poll::Pending); + error!("NOT YET READY FOR SENDING..."); + return Err("socket not ready for write".into()); } else { return Err("can not send".into()); } } - Ok(Poll::Ready(())) + Ok(()) } - unsafe fn try_read(&mut self) -> Result, Error> { - info!("Receiving..."); - use std::os::unix::prelude::AsRawFd; - let sock = self.sock.as_ref().unwrap().as_raw_fd(); + unsafe fn try_read(&mut self) -> Result<(), Error> { + let sock = self.sock.0; let mut saddr_mem = [0u8; std::mem::size_of::()]; let mut saddr_len: libc::socklen_t = saddr_mem.len() as _; let mut buf = vec![0u8; 1024]; @@ -490,14 +519,16 @@ impl FindIoc { if ec == -1 { let errno = *libc::__errno_location(); if errno == libc::EAGAIN { - info!("try_read BUT NOT YET READY FOR READING..."); - return Ok(Poll::Pending); + error!("try_read BUT NOT YET READY FOR READING..."); + return Err("socket not ready for read".into()); } else { return Err("can not read".into()); } - } - info!("received ec {ec}"); - if ec > 0 { + } else if ec < 0 { + error!("unexpected received {ec}"); + } else if ec == 0 { + error!("received zero bytes"); + } else { let saddr2: libc::sockaddr_in = std::mem::transmute_copy(&saddr_mem); let src_addr = Ipv4Addr::from(saddr2.sin_addr.s_addr.to_ne_bytes()); let src_port = u16::from_be(saddr2.sin_port); @@ -507,18 +538,14 @@ impl FindIoc { for i in 0..(ec as usize) { s1.extend(format!(" {:02x}", buf[i]).chars()); } - info!("received answer {s1}"); - info!( + debug!("received answer {s1}"); + debug!( "received answer string {}", String::from_utf8_lossy(buf[..ec as usize].into()) ); } - // TODO handle that the remote should send its protocol version in the payload. // TODO handle if we get a too large answer. - // TODO - // Parse the contents of the received datagram... - // Reuse the existing logic for that. - let mut nb = crate::netbuf::NetBuf::new(1024); + let mut nb = crate::netbuf::NetBuf::new(2048); nb.put_slice(&buf[..ec as usize])?; let mut msgs = vec![]; loop { @@ -539,14 +566,33 @@ impl FindIoc { nb.adv(hi.payload())?; msgs.push(msg); } - info!("got {} messages", msgs.len()); - for msg in &msgs { + info!("received {} msgs", msgs.len()); + for (msg_ix, msg) in msgs.iter().enumerate() { match &msg.ty { CaMsgTy::SearchRes(k) => { - // TODO make sure that search identifier is correct. - let addr = Ipv4Addr::from(k.addr.to_be_bytes()); - info!("ADDRESS: {addr:?}"); - info!("PORT: {}", k.tcp_port); + if k.id != self.search_id { + warn!("id mismatch {} vs {}", k.id, self.search_id); + } + if false { + let addr = Ipv4Addr::from(k.addr.to_be_bytes()); + info!("Converted address: {addr:?}"); + } + info!( + "Received: {}/{} {:?} {:?} {}", + msg_ix, + msgs.len(), + self.channel, + src_addr, + k.tcp_port + ); + if self.result.is_none() { + self.result = Some(Tcp4Addr { + addr: src_addr, + port: k.tcp_port, + }); + } else { + warn!("Result already populated for {}", self.channel); + } } _ => { info!("{msg:?}"); @@ -554,23 +600,32 @@ impl FindIoc { } } } - Ok(Poll::Ready(())) + Ok(()) } } impl Future for FindIoc { - type Output = Result; + type Output = Result; fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { use Poll::*; loop { + match self.deadline.poll_unpin(cx) { + Ready(()) => { + break Ready( + self.result + .clone() + .ok_or_else(|| Error::with_msg_no_trace(format!("can not find host for {}", self.channel))), + ); + } + Pending => {} + } break match &mut self.state { FindIocState::Init => match unsafe { Self::create_socket(&mut self) } { Ok(()) => { - let tmp1 = self.sock.as_mut().unwrap(); - let tmp2 = unsafe { &mut *(tmp1 as *mut UdpSocket) }; - let fut = tmp2.writable(); - self.state = FindIocState::WaitWritable(Box::pin(fut)); + let afd = tokio::io::unix::AsyncFd::new(self.sock.0).expect("can not create AsyncFd"); + self.afd = Some(afd); + self.state = FindIocState::WaitWritable; continue; } Err(e) => { @@ -578,40 +633,35 @@ impl Future for FindIoc { Ready(Err(e)) } }, - FindIocState::WaitWritable(ref mut fut) => match fut.poll_unpin(cx) { - Ready(Ok(())) => match unsafe { Self::try_write(&mut self) } { - Ok(Ready(())) => { - info!("Writing done..."); - let tmp1 = self.sock.as_mut().unwrap(); - let tmp2 = unsafe { &mut *(tmp1 as *mut UdpSocket) }; - let fut = tmp2.readable(); - self.state = FindIocState::WaitReadable(Box::pin(fut)); - continue; + FindIocState::WaitWritable => match self.afd.as_mut().unwrap().poll_write_ready(cx) { + Ready(Ok(ref mut g)) => { + g.clear_ready(); + match unsafe { Self::try_write(&mut self) } { + Ok(()) => { + self.state = FindIocState::WaitReadable; + continue; + } + Err(e) => Ready(Err(e)), } - Ok(Pending) => Pending, - Err(e) => Ready(Err(e)), - }, + } Ready(Err(e)) => Ready(Err(e.into())), Pending => Pending, }, - FindIocState::WaitReadable(ref mut fut) => match fut.poll_unpin(cx) { - Ready(Ok(())) => match unsafe { Self::try_read(&mut self) } { - Ok(Ready(())) => { - info!("Reading done..."); - let addr = Ipv4Addr::new(127, 0, 0, 10); - Ready(Ok(addr)) + FindIocState::WaitReadable => match self.afd.as_mut().unwrap().poll_read_ready(cx) { + Ready(Ok(ref mut g)) => { + g.clear_ready(); + match unsafe { Self::try_read(&mut self) } { + Ok(()) => { + continue; + } + Err(e) => Ready(Err(e)), } - Ok(Pending) => Pending, - Err(e) => Ready(Err(e)), - }, - Ready(Err(e)) => Ready(Err(e.into())), - Pending => match self.deadline.poll_unpin(cx) { - Ready(()) => { - info!("FindIoc deadline reached"); - Ready(Ok(Ipv4Addr::new(127, 0, 0, 10))) - } - Pending => Pending, - }, + } + Ready(Err(e)) => { + error!("WaitReadable Err"); + Ready(Err(e.into())) + } + Pending => Pending, }, }; } diff --git a/netfetch/src/ca/proto.rs b/netfetch/src/ca/proto.rs index be6a159..5c6928f 100644 --- a/netfetch/src/ca/proto.rs +++ b/netfetch/src/ca/proto.rs @@ -20,7 +20,8 @@ pub struct Search { pub struct SearchRes { pub addr: u32, pub tcp_port: u16, - pub sid: u32, + pub id: u32, + pub proto_version: u16, } #[derive(Debug)] @@ -87,6 +88,8 @@ enum CaScalarType { I32, F32, F64, + Enum, + String, } impl CaScalarType { @@ -98,6 +101,8 @@ impl CaScalarType { 5 => I32, 2 => F32, 6 => F64, + 3 => Enum, + 0 => String, k => return Err(Error::with_msg_no_trace(format!("bad dbr type id: {k}"))), }; Ok(ret) @@ -153,11 +158,11 @@ impl CaMsgTy { Version => 0, VersionRes(_) => 0, ClientName => 8, - ClientNameRes(x) => (7 + x.name.len()) / 8 * 8, + ClientNameRes(x) => (x.name.len() + 1 + 7) / 8 * 8, HostName => 8, - Search(s) => (7 + s.channel.len()) / 8 * 8, + Search(x) => (x.channel.len() + 1 + 7) / 8 * 8, SearchRes(_) => 8, - CreateChan(x) => (7 + x.channel.len()) / 8 * 8, + CreateChan(x) => (x.channel.len() + 1 + 7) / 8 * 8, CreateChanRes(_) => 0, AccessRightsRes(_) => 0, EventAdd(_) => 16, @@ -245,7 +250,7 @@ impl CaMsgTy { ClientNameRes(_) => 0, HostName => 0, Search(e) => e.id, - SearchRes(x) => x.sid, + SearchRes(x) => x.id, CreateChan(_) => CA_PROTO_VERSION as _, CreateChanRes(x) => x.sid, AccessRightsRes(x) => x.rights, @@ -279,7 +284,7 @@ impl CaMsgTy { } let d = e.channel.as_bytes(); if buf.len() < d.len() + 1 { - error!("bad buffer given"); + error!("bad buffer given for search payload {} vs {}", buf.len(), d.len()); panic!(); } unsafe { std::ptr::copy(&d[0] as _, &mut buf[0] as _, d.len()) }; @@ -294,7 +299,7 @@ impl CaMsgTy { } let d = x.channel.as_bytes(); if buf.len() < d.len() + 1 { - error!("bad buffer given"); + error!("bad buffer given for create chan payload {} vs {}", buf.len(), d.len()); panic!(); } unsafe { std::ptr::copy(&d[0] as _, &mut buf[0] as _, d.len()) }; @@ -376,11 +381,16 @@ impl CaMsg { if hi.data_count != 0 { warn!("protocol error: search result is expected with data count 0"); } + if payload.len() < 2 { + return Err(Error::with_msg_no_trace("server did not include protocol version")); + } + let proto_version = u16::from_be_bytes(payload[0..2].try_into()?); CaMsg { ty: CaMsgTy::SearchRes(SearchRes { tcp_port: hi.data_type, addr: hi.param1, - sid: hi.param2, + id: hi.param2, + proto_version, }), } } @@ -408,9 +418,36 @@ impl CaMsg { let ca_st = CaScalarType::from_ca_u16(hi.data_type)?; match ca_st { CaScalarType::F64 => { - // TODO handle wrong payload sizer in more distinct way. + if payload.len() < 2 { + return Err(Error::with_msg_no_trace(format!( + "not enough payload for enum {}", + payload.len() + ))); + } let v = f64::from_be_bytes(payload.try_into()?); - info!("Payload as f64: {v}"); + info!("f64: {v}"); + } + CaScalarType::Enum => { + if payload.len() < 2 { + return Err(Error::with_msg_no_trace(format!( + "not enough payload for enum {}", + payload.len() + ))); + } + let v = u16::from_be_bytes(payload[..2].try_into()?); + info!("enum payload: {v}"); + } + CaScalarType::String => { + let mut ixn = payload.len(); + for (i, &c) in payload.iter().enumerate() { + if c == 0 { + ixn = i; + break; + } + } + info!("try to read string from payload len {} ixn {}", payload.len(), ixn); + let v = String::from_utf8_lossy(&payload[..ixn]); + info!("String payload: {v}"); } _ => { warn!("TODO handle {ca_st:?}");