diff --git a/daqingest/src/bin/daqingest.rs b/daqingest/src/bin/daqingest.rs index 51c8e6d..62f406e 100644 --- a/daqingest/src/bin/daqingest.rs +++ b/daqingest/src/bin/daqingest.rs @@ -135,6 +135,17 @@ async fn main_run_inner(opts: DaqIngestOpts) -> Result<(), Error> { series::log_test(); }) } + SubCmd::Ca(subcmd) => { + use daqingest::opts::CaSubcmds::*; + match subcmd.subcmds { + Find(cmd) => { + daqingest::tools::catools::find(cmd, subcmd.broadcast.unwrap_or(String::new())) + .await + .map_err(|e| Error::from_string(e))?; + } + Get(cmd) => todo!(), + } + } } Ok(()) } diff --git a/daqingest/src/opts.rs b/daqingest/src/opts.rs index f518ae9..6dedaa4 100644 --- a/daqingest/src/opts.rs +++ b/daqingest/src/opts.rs @@ -33,6 +33,7 @@ pub enum SubCmd { BsreadDump(BsreadDump), Version, LogTest, + Ca(Ca), } #[derive(Debug, clap::Parser)] @@ -167,3 +168,24 @@ pub struct FindOlder { #[arg(long)] pub slices: u32, } + +#[derive(Debug, clap::Parser)] +pub struct Ca { + pub broadcast: Option, + #[command(subcommand)] + pub subcmds: CaSubcmds, +} + +#[derive(Debug, clap::Parser)] +pub enum CaSubcmds { + Find(CaFind), + Get(CaGet), +} + +#[derive(Debug, clap::Parser)] +pub struct CaFind { + pub channel: String, +} + +#[derive(Debug, clap::Parser)] +pub struct CaGet {} diff --git a/daqingest/src/tools.rs b/daqingest/src/tools.rs index cdd2cb8..44d80f7 100644 --- a/daqingest/src/tools.rs +++ b/daqingest/src/tools.rs @@ -1,3 +1,5 @@ +pub mod catools; + use crate::opts::FindOlder; use crate::opts::RemoveOlder; use crate::opts::RemoveOlderAll; diff --git a/daqingest/src/tools/catools.rs b/daqingest/src/tools/catools.rs new file mode 100644 index 0000000..136ef2b --- /dev/null +++ b/daqingest/src/tools/catools.rs @@ -0,0 +1,37 @@ +use crate::opts::CaFind; +use err::thiserror; +use err::ThisError; +use futures_util::StreamExt; +use stats::IocFinderStats; +use std::sync::Arc; +use std::time::Duration; + +#[derive(Debug, ThisError)] +#[cstm(name = "CaTools")] +pub enum Error {} + +pub async fn find(cmd: CaFind, broadcast: String) -> Result<(), Error> { + let brd = broadcast.split(","); + let (channels_input_tx, channels_input_rx) = async_channel::bounded(10); + let tgts = brd.map(|x| x.parse().unwrap()).collect(); + let blacklist = Vec::new(); + let batch_run_max = Duration::from_millis(1200); + let in_flight_max = 1; + let batch_size = 1; + let stats = Arc::new(IocFinderStats::new()); + channels_input_tx.send(cmd.channel).await.unwrap(); + let mut stream = netfetch::ca::findioc::FindIocStream::new( + channels_input_rx, + tgts, + blacklist, + batch_run_max, + in_flight_max, + batch_size, + stats, + ); + while let Some(e) = stream.next().await { + eprintln!("{e:?}"); + } + eprintln!("done"); + Ok(()) +} diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index f7f5c70..3c8f99e 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -2202,18 +2202,26 @@ impl CaConn { CaMsgTy::VersionRes(n) => { // debug!("see incoming {:?} {:?}", self.remote_addr_dbg, msg); if n < 12 || n > 13 { - error!("See some unexpected version {n} channel search may not work."); + error!("see some unexpected version {n} channel search may not work."); Ready(Some(Ok(()))) } else { if n != 13 { - warn!("Received peer version {n}"); + warn!("received peer version {n}"); } self.state = CaConnState::PeerReady; Ready(Some(Ok(()))) } } + CaMsgTy::CreateChanRes(k) => { + warn!("got unexpected {k:?}",); + Ready(Some(Ok(()))) + } + CaMsgTy::AccessRightsRes(k) => { + warn!("got unexpected {k:?}",); + Ready(Some(Ok(()))) + } k => { - warn!("Got some other unhandled message: {k:?}"); + warn!("got some other unhandled message: {k:?}"); Ready(Some(Ok(()))) } }, @@ -2585,7 +2593,7 @@ impl CaConn { debug!("VersionRes({x})"); self.weird_count += 1; if self.weird_count > 200 { - std::process::exit(13); + // std::process::exit(13); } } CaMsgTy::ChannelCloseRes(x) => { @@ -2824,14 +2832,20 @@ impl CaConn { Ok(Ready(Some(()))) } CaConnState::Handshake => { - match { - let res = self.handle_handshake(cx); - res - } { - Ready(Some(Ok(()))) => Ok(Ready(Some(()))), - Ready(Some(Err(e))) => Err(e), - Ready(None) => Ok(Ready(Some(()))), - Pending => Ok(Pending), + if true { + // because of bad java clients which do not send a version, skip the handshake. + self.state = CaConnState::PeerReady; + self.handle_conn_state(tsnow, cx) + } else { + match { + let res = self.handle_handshake(cx); + res + } { + Ready(Some(Ok(()))) => Ok(Ready(Some(()))), + Ready(Some(Err(e))) => Err(e), + Ready(None) => Ok(Ready(Some(()))), + Pending => Ok(Pending), + } } } CaConnState::PeerReady => { diff --git a/netfetch/src/ca/findioc.rs b/netfetch/src/ca/findioc.rs index ee6b6bc..e0b8331 100644 --- a/netfetch/src/ca/findioc.rs +++ b/netfetch/src/ca/findioc.rs @@ -231,10 +231,10 @@ impl FindIocStream { error!("getsockname {ec}"); return Err("can not convert raw socket to tokio socket".into()); } else { - if false { + if true { let ipv4 = Ipv4Addr::from(addr.sin_addr.s_addr.to_ne_bytes()); let tcp_port = u16::from_be(addr.sin_port); - info!("bound local socket to {:?} port {}", ipv4, tcp_port); + debug!("bound local socket to {} port {}", ipv4, tcp_port); } } } @@ -366,7 +366,7 @@ impl FindIocStream { let mut good = true; if let CaMsgTy::VersionRes(v) = msgs[0].ty { if v != 13 { - warn!("bad version: {msgs:?}"); + warn!("bad version in search response: {v}"); good = false; } } else { @@ -375,8 +375,10 @@ impl FindIocStream { // trace2!("recv {:?} {:?}", src_addr, msgs); let mut res = Vec::new(); if good { - for msg in &msgs[1..] { + // because of bad java CA implementation, consider also the first message + for msg in &msgs[0..] { match &msg.ty { + CaMsgTy::VersionRes(_) => {} CaMsgTy::SearchRes(k) => { let addr = SocketAddrV4::new(src_addr, k.tcp_port); res.push((SearchId(k.id), addr)); diff --git a/stats/src/stats.rs b/stats/src/stats.rs index c6250e2..89f137e 100644 --- a/stats/src/stats.rs +++ b/stats/src/stats.rs @@ -210,6 +210,48 @@ impl IntervalEma { } } +pub struct CounterU64 { + sum: u64, +} + +impl CounterU64 { + pub fn new() -> Self { + Self { sum: 0 } + } + + pub fn inc(&mut self) { + self.sum += 1; + } + + pub fn add(&mut self, x: u64) { + self.sum += x; + } +} + +pub struct DoubleBuffer { + back: T, + front: T, +} + +impl DoubleBuffer { + pub fn new() -> Self + where + T: Default, + { + Self { + back: T::default(), + front: T::default(), + } + } + + pub fn switch(&mut self) + where + T: Default, + { + self.back = ::core::mem::replace(&mut self.front, T::default()); + } +} + pub struct XorShift32 { state: u32, }