From 2d7d8f0bbd2b77218b32f87262cca86285b7885e Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Tue, 3 May 2022 17:10:48 +0200 Subject: [PATCH] Search more channels and store in db --- daqingest/src/bin/daqingest.rs | 5 +- daqingest/src/daqingest.rs | 26 +++- netfetch/Cargo.toml | 1 + netfetch/src/ca.rs | 255 ++++++++++++++++++++++++++++----- netfetch/src/ca/conn.rs | 87 +++++------ netfetch/src/ca/proto.rs | 4 +- 6 files changed, 284 insertions(+), 94 deletions(-) diff --git a/daqingest/src/bin/daqingest.rs b/daqingest/src/bin/daqingest.rs index 18240fe..6056d90 100644 --- a/daqingest/src/bin/daqingest.rs +++ b/daqingest/src/bin/daqingest.rs @@ -20,8 +20,9 @@ pub fn main() -> Result<(), Error> { f.run().await? } SubCmd::ChannelAccess(k) => match k { - ChannelAccess::CaChannel(k) => netfetch::ca::ca_connect(k.into()).await?, - ChannelAccess::CaConfig(k) => netfetch::ca::ca_listen_from_file(k.config).await?, + ChannelAccess::CaChannel(_) => todo!(), + ChannelAccess::CaSearch(k) => netfetch::ca::ca_search(k.into()).await?, + ChannelAccess::CaConfig(k) => netfetch::ca::ca_connect(k.into()).await?, }, } Ok(()) diff --git a/daqingest/src/daqingest.rs b/daqingest/src/daqingest.rs index 400febb..3fb5f1e 100644 --- a/daqingest/src/daqingest.rs +++ b/daqingest/src/daqingest.rs @@ -1,7 +1,7 @@ pub mod query; use clap::Parser; -use netfetch::ca::CaConnectOpts; +use netfetch::ca::{CaConnectOpts, ListenFromFileOpts}; use netfetch::zmtp::ZmtpClientOpts; #[derive(Debug, Parser)] @@ -76,22 +76,42 @@ pub struct BsreadDump { pub enum ChannelAccess { CaChannel(CaChannel), CaConfig(CaConfig), + CaSearch(CaConfig), } #[derive(Debug, Parser)] pub struct CaChannel { #[clap(long)] pub channel: Vec, + #[clap(long)] + pub addr_bind: String, + #[clap(long)] + pub addr_conn: String, } impl From for CaConnectOpts { fn from(k: CaChannel) -> Self { - Self { channels: k.channel } + Self { + channels: k.channel, + search: vec!["255.255.255.255".into()], + addr_bind: k.addr_bind.parse().expect("can not parse address"), + addr_conn: k.addr_conn.parse().expect("can not parse address"), + max_simul: 113, + timeout: 2000, + abort_after_search: 0, + } } } #[derive(Debug, Parser)] pub struct CaConfig { - #[clap(long)] pub config: String, } + +impl From for ListenFromFileOpts { + fn from(k: CaConfig) -> Self { + Self { + config: k.config.into(), + } + } +} diff --git a/netfetch/Cargo.toml b/netfetch/Cargo.toml index 23363f7..1407505 100644 --- a/netfetch/Cargo.toml +++ b/netfetch/Cargo.toml @@ -11,6 +11,7 @@ path = "src/netfetch.rs" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" serde_cbor = "0.11" +serde_yaml = "0.8.23" tokio = { version = "1.7", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] } tokio-stream = { version = "0.1", features = ["fs"]} async-channel = "1.6" diff --git a/netfetch/src/ca.rs b/netfetch/src/ca.rs index 91c1c44..de92af2 100644 --- a/netfetch/src/ca.rs +++ b/netfetch/src/ca.rs @@ -3,56 +3,192 @@ pub mod proto; use conn::{CaConn, FindIoc}; use err::Error; -use futures_util::stream::{FuturesOrdered, FuturesUnordered}; +use futures_util::stream::FuturesUnordered; use futures_util::{StreamExt, TryFutureExt}; use log::*; +use scylla::batch::Consistency; +use scylla::prepared_statement::PreparedStatement; +use scylla::Session as ScySession; +use serde::{Deserialize, Serialize}; use std::collections::{BTreeMap, VecDeque}; +use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; use std::path::PathBuf; +use std::time::Duration; use tokio::fs::OpenOptions; -use tokio::io::{AsyncBufReadExt, BufReader}; +use tokio::io::AsyncReadExt; use tokio::net::TcpStream; +use tokio::task::JoinError; +use tokio::time::error::Elapsed; -pub async fn ca_listen_from_file(conf: impl Into) -> Result<(), Error> { - let file = OpenOptions::new().read(true).open(conf.into()).await?; - let mut lines = BufReader::new(file).lines(); - let re = regex::Regex::new(r"^([-:._A-Za-z0-9]+)")?; - let mut channels = vec![]; - while let Some(line) = lines.next_line().await? { - if let Some(cs) = re.captures(&line) { - let m = cs.get(1).unwrap(); - let channel = m.as_str(); - channels.push(channel.to_string()); - } - } - let opts = CaConnectOpts { channels }; - ca_connect(opts).await +#[derive(Debug, Serialize, Deserialize)] +struct ChannelConfig { + channels: Vec, + search: Vec, + addr_bind: Ipv4Addr, + addr_conn: Ipv4Addr, + whitelist: String, + blacklist: String, + max_simul: Option, + timeout: Option, + #[serde(default)] + abort_after_search: u32, +} + +pub struct ListenFromFileOpts { + pub config: PathBuf, +} + +pub async fn parse_config(config: PathBuf) -> Result { + let mut file = OpenOptions::new().read(true).open(config).await?; + let mut buf = vec![]; + file.read_to_end(&mut buf).await?; + let mut conf: ChannelConfig = + serde_yaml::from_slice(&buf).map_err(|e| Error::with_msg_no_trace(format!("{:?}", e)))?; + let re1 = regex::Regex::new(&conf.whitelist)?; + let re2 = regex::Regex::new(&conf.blacklist)?; + conf.channels = conf + .channels + .into_iter() + .filter(|ch| { + if let Some(_cs) = re1.captures(&ch) { + //let m = cs.get(1).unwrap(); + true + } else if re2.is_match(&ch) { + false + } else { + true + } + }) + .collect(); + Ok(CaConnectOpts { + channels: conf.channels, + search: conf.search, + addr_bind: conf.addr_bind, + addr_conn: conf.addr_conn, + max_simul: conf.max_simul.unwrap_or(113), + timeout: conf.timeout.unwrap_or(2000), + abort_after_search: conf.abort_after_search, + }) } pub struct CaConnectOpts { pub channels: Vec, + pub search: Vec, + pub addr_bind: Ipv4Addr, + pub addr_conn: Ipv4Addr, + pub max_simul: usize, + pub timeout: u64, + pub abort_after_search: u32, } -pub async fn ca_connect(opts: CaConnectOpts) -> Result<(), Error> { +async fn unwrap_search_result( + item: Result), Error>, Elapsed>, JoinError>, + scy: &ScySession, + qu: &PreparedStatement, +) -> Result<(String, SocketAddrV4, Option), Error> { + match item { + Ok(k) => match k { + Ok(k) => match k { + Ok(h) => match h.2 { + Some(k) => { + scy.execute(qu, (&h.0, format!("{:?}", h.1), format!("{:?}", k))) + .await + .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; + Ok(h) + } + None => Ok(h), + }, + Err(e) => { + error!("bad search {e:?}"); + Err(e) + } + }, + Err(e) => { + error!("Elapsed"); + Err(Error::with_msg_no_trace(format!("{e:?}"))) + } + }, + Err(e) => { + error!("JoinError"); + Err(Error::with_msg_no_trace(format!("{e:?}"))) + } + } +} + +pub async fn ca_search(opts: ListenFromFileOpts) -> Result<(), Error> { + let opts = parse_config(opts.config).await?; + let scy = scylla::SessionBuilder::new() + .known_node("sf-nube-11:19042") + .default_consistency(Consistency::Quorum) + .use_keyspace("ks1", true) + .build() + .await + .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; + let qu = scy + .prepare("insert into ioc_by_channel (channel, searchaddr, addr) values (?, ?, ?)") + .await + .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; + const PORT_DEFAULT: u16 = 5064; info!("Look up {} channel hosts", opts.channels.len()); let mut fut_queue = FuturesUnordered::new(); let mut res2 = vec![]; - let mut chns = VecDeque::from(opts.channels); + let mut chns = VecDeque::new(); + for ch in &opts.channels { + for ac in &opts.search { + chns.push_back((ch.clone(), ac.clone())); + } + } + let max_simul = opts.max_simul; + let timeout = opts.timeout; + let mut ix1 = 0; 'lo2: loop { - const MAX_SIMUL: usize = 23; - while fut_queue.len() < MAX_SIMUL && chns.len() > 0 { - let ch = chns.pop_front().unwrap(); + while fut_queue.len() < max_simul && chns.len() > 0 { + let (ch, ac) = chns.pop_front().unwrap(); let ch2 = ch.clone(); - info!("Start search for {}", ch); - let fut = FindIoc::new(ch.clone()).map_ok(move |x| (ch2, x)); + let ac = match ac.parse::() { + Ok(k) => k, + Err(_) => match ac.parse::() { + Ok(k) => SocketAddrV4::new(k, PORT_DEFAULT), + Err(e) => match tokio::net::lookup_host(&ac).await { + Ok(k) => { + let vs: Vec<_> = k + .filter_map(|x| match x { + SocketAddr::V4(k) => Some(k), + SocketAddr::V6(_) => None, + }) + .collect(); + if let Some(k) = vs.first() { + *k + } else { + error!("Can not understand name for {:?} {:?}", ac, vs); + return Err(e.into()); + } + } + Err(e) => { + error!("{e:?}"); + return Err(e.into()); + } + }, + }, + }; + ix1 += 1; + if ix1 >= 500 { + info!("Start search for {} {}", ch, ac); + ix1 = 0; + } + let fut = FindIoc::new(ch.clone(), Ipv4Addr::UNSPECIFIED, ac.clone(), timeout) + .map_ok(move |x| (ch2, ac.clone(), x)); + let fut = tokio::time::timeout(Duration::from_millis(timeout + 1000), fut); let jh = tokio::spawn(fut); fut_queue.push(jh); if chns.is_empty() { break 'lo2; } } - while fut_queue.len() >= MAX_SIMUL { + while fut_queue.len() >= max_simul { match fut_queue.next().await { Some(item) => { + let item = unwrap_search_result(item, &scy, &qu).await; res2.push(item); } None => break, @@ -62,44 +198,87 @@ pub async fn ca_connect(opts: CaConnectOpts) -> Result<(), Error> { while fut_queue.len() > 0 { match fut_queue.next().await { Some(item) => { + let item = unwrap_search_result(item, &scy, &qu).await; res2.push(item); } None => break, } } info!("Collected {} results", res2.len()); + let mut channels_set = BTreeMap::new(); let mut channels_by_host = BTreeMap::new(); for item in res2 { - // TODO should we continue even if some channel gives an error? - let item = item - .map_err(|e| Error::with_msg_no_trace(format!("{e:?}"))) - .unwrap_or_else(|e| Err(e)); + // TODO should we continue even if some channel gives an error or can not be located? match item { - Ok(item) => { - info!("Found address {} {:?}", item.0, item.1); - let key = item.1; + Ok((ch, ac, Some(addr))) => { + info!("Found address {} {:?} {:?}", ch, ac, addr); + channels_set.insert(ch.clone(), true); + let key = addr; if !channels_by_host.contains_key(&key) { - channels_by_host.insert(key, vec![item.0]); + channels_by_host.insert(key, vec![ch]); } else { - channels_by_host.get_mut(&key).unwrap().push(item.0); + channels_by_host.get_mut(&key).unwrap().push(ch); } } + Ok((_, _, None)) => {} Err(e) => { - error!("Got error: {e:?}"); + error!("Error in res2 list: {e:?}"); } }; } for (host, channels) in &channels_by_host { - info!("Have: {:?} {:?}", host, channels); + info!("Have: {:?} {:?}", host, channels.len()); } - if false { + for ch in &opts.channels { + if !channels_set.contains_key(ch) { + error!("Could not locate {ch:?}"); + } + } + Ok(()) +} + +pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> { + let opts = parse_config(opts.config).await?; + let scy = scylla::SessionBuilder::new() + .known_node("sf-nube-11:19042") + .default_consistency(Consistency::Quorum) + .use_keyspace("ks1", true) + .build() + .await + .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; + let qu_find_addr = scy + .prepare("select addr from ioc_by_channel where channel = ?") + .await + .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; + let mut channels_by_host = BTreeMap::new(); + for (ix, ch) in opts.channels.iter().enumerate() { + let res = scy + .execute(&qu_find_addr, (ch,)) + .await + .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; + if res.rows_num().unwrap() == 0 { + error!("can not find address of channel {}", ch); + } else { + let (addr,) = res.first_row_typed::<(String,)>().unwrap(); + let addr: SocketAddrV4 = addr.parse().unwrap(); + if ix % 500 == 0 { + info!("{} {} {:?}", ix, ch, addr); + } + if !channels_by_host.contains_key(&addr) { + channels_by_host.insert(addr, vec![ch.to_string()]); + } else { + channels_by_host.get_mut(&addr).unwrap().push(ch.to_string()); + } + } + } + if opts.abort_after_search == 1 { return Ok(()); } let mut conn_jhs = vec![]; for (host, channels) in channels_by_host { let conn_block = async move { - info!("Create TCP connection to {:?}", (host.addr, host.port)); - let tcp = TcpStream::connect((host.addr, host.port)).await?; + info!("Create TCP connection to {:?}", (host.ip(), host.port())); + let tcp = TcpStream::connect((host.ip().clone(), host.port())).await?; let mut conn = CaConn::new(tcp); for c in channels { conn.channel_add(c); diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index e15d137..59fd68e 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -5,8 +5,9 @@ use futures_util::{Future, FutureExt, Stream, StreamExt}; use libc::c_int; use log::*; use std::collections::BTreeMap; -use std::net::Ipv4Addr; +use std::net::{Ipv4Addr, SocketAddrV4}; use std::pin::Pin; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::task::{Context, Poll}; use std::time::{Duration, Instant}; use tokio::io::unix::AsyncFd; @@ -129,7 +130,7 @@ impl Stream for CaConn { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; self.poll_count += 1; - if self.poll_count > 3000 { + if false && self.poll_count > 3000 { error!("TODO CaConn reached poll_count limit"); return Ready(None); } @@ -343,11 +344,7 @@ impl Drop for SockBox { } } -#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)] -pub struct Tcp4Addr { - pub addr: Ipv4Addr, - pub port: u16, -} +const SEARCH_ID: AtomicUsize = AtomicUsize::new(0); pub struct FindIoc { state: FindIocState, @@ -358,23 +355,28 @@ pub struct FindIoc { addr: libc::sockaddr_in, addr_len: usize, deadline: Pin>, - result: Option, + result: Option, + addr_bind: Ipv4Addr, + addr_conn: SocketAddrV4, } // Do low-level approach first to make sure it works as specified. impl FindIoc { - pub fn new(channel: String) -> Self { + pub fn new(channel: String, addr_bind: Ipv4Addr, addr_conn: SocketAddrV4, timeout: u64) -> Self { let addr = unsafe { std::mem::transmute_copy(&[0u8; std::mem::size_of::()]) }; + let search_id = SEARCH_ID.fetch_add(1, Ordering::AcqRel) as u32; Self { state: FindIocState::Init, channel, - search_id: 0x12345678, + search_id, sock: SockBox(-1), afd: None, addr: addr, addr_len: 0, - deadline: Box::pin(tokio::time::sleep(Duration::from_millis(200))), + deadline: Box::pin(tokio::time::sleep(Duration::from_millis(timeout))), result: None, + addr_bind, + addr_conn, } } @@ -403,8 +405,7 @@ impl FindIoc { return Err("can not set nonblock".into()); } } - //let ip: [u8; 4] = [172, 26, 120, 71]; - let ip: [u8; 4] = [0, 0, 0, 0]; + let ip: [u8; 4] = self.addr_bind.octets(); let addr = libc::sockaddr_in { sin_family: libc::AF_INET as u16, sin_port: 0, @@ -446,43 +447,34 @@ impl FindIoc { unsafe fn try_write(&mut self) -> Result<(), Error> { let sock = self.sock.0; - let ip: [u8; 4] = [172, 26, 120, 255]; + let ip = self.addr_conn.ip().octets(); let addr = libc::sockaddr_in { sin_family: libc::AF_INET as u16, - sin_port: (5064 as u16).to_be(), + sin_port: (self.addr_conn.port() as u16).to_be(), sin_addr: libc::in_addr { s_addr: u32::from_ne_bytes(ip), }, sin_zero: [0; 8], }; let addr_len = std::mem::size_of::(); + let chb = self.channel.as_bytes(); + let npadded = (chb.len() + 1 + 7) / 8 * 8; + let npad = npadded - self.channel.len(); let mut buf = vec![ // 0u8, 0, 0, 0, // 0, 0, 0, 13, // 0, 0, 0, 0, // 0, 0, 0, 0, // - // - // - 0, 6, 0, 0, // - 0, 0, 0, 13, // - 0, 0, 0, 0, // - 0, 0, 0, 0, - // - // ]; - let chb = self.channel.as_bytes(); - let npadded = (chb.len() + 1 + 7) / 8 * 8; - let npad = npadded - self.channel.len(); + buf.extend_from_slice(&[0, 6]); + buf.extend_from_slice(&(npadded as u16).to_be_bytes()); + buf.extend_from_slice(&[0, 0, 0, 13]); + buf.extend_from_slice(&[0, 0, 0, 0]); + buf.extend_from_slice(&self.search_id.to_be_bytes()); buf.extend_from_slice(chb); buf.extend_from_slice(&vec![0u8; npad]); - let npl = (npadded as u16).to_be_bytes(); - buf[16 + 2] = npl[0]; - buf[16 + 3] = npl[1]; - let a = self.search_id.to_be_bytes(); - for (x, y) in buf[16 + 12..16 + 16].iter_mut().zip(a.into_iter()) { - *x = y; - } + //info!("sendto {ip:?} n {}", buf.len()); let ec = libc::sendto( sock, &buf[0] as *const _ as _, @@ -532,7 +524,12 @@ impl FindIoc { let saddr2: libc::sockaddr_in = std::mem::transmute_copy(&saddr_mem); let src_addr = Ipv4Addr::from(saddr2.sin_addr.s_addr.to_ne_bytes()); let src_port = u16::from_be(saddr2.sin_port); - info!("received from {:?} port {}", src_addr, src_port); + trace!( + "received from src_addr {:?} src_port {} ec {}", + src_addr, + src_port, + ec + ); if false { let mut s1 = String::new(); for i in 0..(ec as usize) { @@ -566,7 +563,7 @@ impl FindIoc { nb.adv(hi.payload())?; msgs.push(msg); } - info!("received {} msgs", msgs.len()); + //info!("received {} msgs {:?}", msgs.len(), msgs); for (msg_ix, msg) in msgs.iter().enumerate() { match &msg.ty { CaMsgTy::SearchRes(k) => { @@ -578,7 +575,7 @@ impl FindIoc { info!("Converted address: {addr:?}"); } info!( - "Received: {}/{} {:?} {:?} {}", + "SearchRes: {}/{} {:?} {:?} {}", msg_ix, msgs.len(), self.channel, @@ -586,17 +583,12 @@ impl FindIoc { k.tcp_port ); if self.result.is_none() { - self.result = Some(Tcp4Addr { - addr: src_addr, - port: k.tcp_port, - }); + self.result = Some(SocketAddrV4::new(src_addr, k.tcp_port)); } else { warn!("Result already populated for {}", self.channel); } } - _ => { - info!("{msg:?}"); - } + _ => {} } } } @@ -605,18 +597,15 @@ impl FindIoc { } impl Future for FindIoc { - type Output = Result; + // TODO use a dedicated type to indicate timeout. + type Output = Result, Error>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { use Poll::*; loop { match self.deadline.poll_unpin(cx) { Ready(()) => { - break Ready( - self.result - .clone() - .ok_or_else(|| Error::with_msg_no_trace(format!("can not find host for {}", self.channel))), - ); + break Ready(Ok(self.result.clone())); } Pending => {} } diff --git a/netfetch/src/ca/proto.rs b/netfetch/src/ca/proto.rs index 5c6928f..463d19d 100644 --- a/netfetch/src/ca/proto.rs +++ b/netfetch/src/ca/proto.rs @@ -445,7 +445,7 @@ impl CaMsg { break; } } - info!("try to read string from payload len {} ixn {}", payload.len(), ixn); + //info!("try to read string from payload len {} ixn {}", payload.len(), ixn); let v = String::from_utf8_lossy(&payload[..ixn]); info!("String payload: {v}"); } @@ -730,7 +730,7 @@ impl CaProto { break match &self.state { CaState::StdHead => { let hi = HeadInfo::from_netbuf(&mut self.buf)?; - if hi.cmdid == 6 || hi.cmdid > 26 || hi.data_type > 10 || hi.payload_size > 8 { + if hi.cmdid == 6 || hi.cmdid > 26 || hi.data_type > 10 || hi.payload_size > 40 { warn!("StdHead {hi:?}"); } if hi.payload_size == 0xffff && hi.data_count == 0 {