Tolerate bug in java ca implementation

This commit is contained in:
Dominik Werder
2024-09-26 22:04:03 +02:00
parent 188660ae23
commit a2906590be
7 changed files with 146 additions and 16 deletions

View File

@@ -135,6 +135,17 @@ async fn main_run_inner(opts: DaqIngestOpts) -> Result<(), Error> {
series::log_test();
})
}
SubCmd::Ca(subcmd) => {
use daqingest::opts::CaSubcmds::*;
match subcmd.subcmds {
Find(cmd) => {
daqingest::tools::catools::find(cmd, subcmd.broadcast.unwrap_or(String::new()))
.await
.map_err(|e| Error::from_string(e))?;
}
Get(cmd) => todo!(),
}
}
}
Ok(())
}

View File

@@ -33,6 +33,7 @@ pub enum SubCmd {
BsreadDump(BsreadDump),
Version,
LogTest,
Ca(Ca),
}
#[derive(Debug, clap::Parser)]
@@ -167,3 +168,24 @@ pub struct FindOlder {
#[arg(long)]
pub slices: u32,
}
#[derive(Debug, clap::Parser)]
pub struct Ca {
pub broadcast: Option<String>,
#[command(subcommand)]
pub subcmds: CaSubcmds,
}
#[derive(Debug, clap::Parser)]
pub enum CaSubcmds {
Find(CaFind),
Get(CaGet),
}
#[derive(Debug, clap::Parser)]
pub struct CaFind {
pub channel: String,
}
#[derive(Debug, clap::Parser)]
pub struct CaGet {}

View File

@@ -1,3 +1,5 @@
pub mod catools;
use crate::opts::FindOlder;
use crate::opts::RemoveOlder;
use crate::opts::RemoveOlderAll;

View File

@@ -0,0 +1,37 @@
use crate::opts::CaFind;
use err::thiserror;
use err::ThisError;
use futures_util::StreamExt;
use stats::IocFinderStats;
use std::sync::Arc;
use std::time::Duration;
#[derive(Debug, ThisError)]
#[cstm(name = "CaTools")]
pub enum Error {}
pub async fn find(cmd: CaFind, broadcast: String) -> Result<(), Error> {
let brd = broadcast.split(",");
let (channels_input_tx, channels_input_rx) = async_channel::bounded(10);
let tgts = brd.map(|x| x.parse().unwrap()).collect();
let blacklist = Vec::new();
let batch_run_max = Duration::from_millis(1200);
let in_flight_max = 1;
let batch_size = 1;
let stats = Arc::new(IocFinderStats::new());
channels_input_tx.send(cmd.channel).await.unwrap();
let mut stream = netfetch::ca::findioc::FindIocStream::new(
channels_input_rx,
tgts,
blacklist,
batch_run_max,
in_flight_max,
batch_size,
stats,
);
while let Some(e) = stream.next().await {
eprintln!("{e:?}");
}
eprintln!("done");
Ok(())
}

View File

@@ -2202,18 +2202,26 @@ impl CaConn {
CaMsgTy::VersionRes(n) => {
// debug!("see incoming {:?} {:?}", self.remote_addr_dbg, msg);
if n < 12 || n > 13 {
error!("See some unexpected version {n} channel search may not work.");
error!("see some unexpected version {n} channel search may not work.");
Ready(Some(Ok(())))
} else {
if n != 13 {
warn!("Received peer version {n}");
warn!("received peer version {n}");
}
self.state = CaConnState::PeerReady;
Ready(Some(Ok(())))
}
}
CaMsgTy::CreateChanRes(k) => {
warn!("got unexpected {k:?}",);
Ready(Some(Ok(())))
}
CaMsgTy::AccessRightsRes(k) => {
warn!("got unexpected {k:?}",);
Ready(Some(Ok(())))
}
k => {
warn!("Got some other unhandled message: {k:?}");
warn!("got some other unhandled message: {k:?}");
Ready(Some(Ok(())))
}
},
@@ -2585,7 +2593,7 @@ impl CaConn {
debug!("VersionRes({x})");
self.weird_count += 1;
if self.weird_count > 200 {
std::process::exit(13);
// std::process::exit(13);
}
}
CaMsgTy::ChannelCloseRes(x) => {
@@ -2824,14 +2832,20 @@ impl CaConn {
Ok(Ready(Some(())))
}
CaConnState::Handshake => {
match {
let res = self.handle_handshake(cx);
res
} {
Ready(Some(Ok(()))) => Ok(Ready(Some(()))),
Ready(Some(Err(e))) => Err(e),
Ready(None) => Ok(Ready(Some(()))),
Pending => Ok(Pending),
if true {
// because of bad java clients which do not send a version, skip the handshake.
self.state = CaConnState::PeerReady;
self.handle_conn_state(tsnow, cx)
} else {
match {
let res = self.handle_handshake(cx);
res
} {
Ready(Some(Ok(()))) => Ok(Ready(Some(()))),
Ready(Some(Err(e))) => Err(e),
Ready(None) => Ok(Ready(Some(()))),
Pending => Ok(Pending),
}
}
}
CaConnState::PeerReady => {

View File

@@ -231,10 +231,10 @@ impl FindIocStream {
error!("getsockname {ec}");
return Err("can not convert raw socket to tokio socket".into());
} else {
if false {
if true {
let ipv4 = Ipv4Addr::from(addr.sin_addr.s_addr.to_ne_bytes());
let tcp_port = u16::from_be(addr.sin_port);
info!("bound local socket to {:?} port {}", ipv4, tcp_port);
debug!("bound local socket to {} port {}", ipv4, tcp_port);
}
}
}
@@ -366,7 +366,7 @@ impl FindIocStream {
let mut good = true;
if let CaMsgTy::VersionRes(v) = msgs[0].ty {
if v != 13 {
warn!("bad version: {msgs:?}");
warn!("bad version in search response: {v}");
good = false;
}
} else {
@@ -375,8 +375,10 @@ impl FindIocStream {
// trace2!("recv {:?} {:?}", src_addr, msgs);
let mut res = Vec::new();
if good {
for msg in &msgs[1..] {
// because of bad java CA implementation, consider also the first message
for msg in &msgs[0..] {
match &msg.ty {
CaMsgTy::VersionRes(_) => {}
CaMsgTy::SearchRes(k) => {
let addr = SocketAddrV4::new(src_addr, k.tcp_port);
res.push((SearchId(k.id), addr));

View File

@@ -210,6 +210,48 @@ impl IntervalEma {
}
}
pub struct CounterU64 {
sum: u64,
}
impl CounterU64 {
pub fn new() -> Self {
Self { sum: 0 }
}
pub fn inc(&mut self) {
self.sum += 1;
}
pub fn add(&mut self, x: u64) {
self.sum += x;
}
}
pub struct DoubleBuffer<T> {
back: T,
front: T,
}
impl<T> DoubleBuffer<T> {
pub fn new() -> Self
where
T: Default,
{
Self {
back: T::default(),
front: T::default(),
}
}
pub fn switch(&mut self)
where
T: Default,
{
self.back = ::core::mem::replace(&mut self.front, T::default());
}
}
pub struct XorShift32 {
state: u32,
}