Investigate channel lookup, seemed firewall

This commit is contained in:
Dominik Werder
2025-01-13 16:20:16 +01:00
parent 2fda157689
commit bd7abaeb16
7 changed files with 52 additions and 28 deletions

View File

@@ -171,6 +171,7 @@ pub struct FindOlder {
#[derive(Debug, clap::Parser)]
pub struct Ca {
#[arg(long)]
pub broadcast: Option<String>,
#[command(subcommand)]
pub subcmds: CaSubcmds,

View File

@@ -11,16 +11,21 @@ use std::time::Duration;
pub enum Error {}
pub async fn find(cmd: CaFind, broadcast: String) -> Result<(), Error> {
eprintln!("{:?}", broadcast);
let brd = broadcast.split(",");
let tgts = brd
.inspect(|x| eprintln!("try to parse: [{:?}]", x))
.map(|x| x.parse().unwrap())
.collect();
eprintln!("{:?}", tgts);
let (channels_input_tx, channels_input_rx) = async_channel::bounded(10);
let tgts = brd.map(|x| x.parse().unwrap()).collect();
let blacklist = Vec::new();
let batch_run_max = Duration::from_millis(1200);
let in_flight_max = 1;
let batch_size = 1;
let stats = Arc::new(IocFinderStats::new());
channels_input_tx.send(cmd.channel).await.unwrap();
let mut stream = netfetch::ca::findioc::FindIocStream::new(
let stream = netfetch::ca::findioc::FindIocStream::new(
channels_input_rx,
tgts,
blacklist,
@@ -29,6 +34,8 @@ pub async fn find(cmd: CaFind, broadcast: String) -> Result<(), Error> {
batch_size,
stats,
);
let deadline = taskrun::tokio::time::sleep(Duration::from_millis(2000));
let mut stream = Box::pin(stream.take_until(deadline));
while let Some(e) = stream.next().await {
eprintln!("{e:?}");
}

View File

@@ -37,6 +37,7 @@ dashmap = "6.0.1"
hashbrown = "0.15.2"
smallvec = "1.13.2"
thiserror = "=0.0.1"
autoerr = "0.0.3"
log = { path = "../log" }
series = { path = "../../daqbuf-series", package = "daqbuf-series" }
serieswriter = { path = "../serieswriter" }

View File

@@ -27,14 +27,15 @@ macro_rules! debug_batch { ($($arg:tt)*) => ( if false { debug!($($arg)*); } ) }
macro_rules! trace_batch { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) }
#[derive(Debug, thiserror::Error)]
#[cstm(name = "Finder")]
pub enum Error {
Join(#[from] tokio::task::JoinError),
DbPg(#[from] dbpg::err::Error),
Postgres(#[from] dbpg::postgres::Error),
IocSearch(#[from] crate::ca::search::Error),
}
autoerr::create_error_v1!(
name(Error, "Finder"),
enum variants {
Join(#[from] tokio::task::JoinError),
DbPg(#[from] dbpg::err::Error),
Postgres(#[from] dbpg::postgres::Error),
IocSearch(#[from] crate::ca::search::Error),
},
);
fn transform_pgres(rows: Vec<PgRow>) -> VecDeque<FindIocRes> {
let mut ret = VecDeque::new();
@@ -147,7 +148,7 @@ async fn finder_worker_single(
let sql = concat!(
"with q1 as (select * from unnest($2::text[]) as unn (ch))",
" select distinct on (tt.facility, tt.channel) tt.channel, tt.addr",
" from ioc_by_channel_log tt join q1 on tt.channel = q1.ch and tt.facility = $1 and tt.addr is not null",
" from ioc_by_channel_log tt join q1 on tt.channel = q1.ch and tt.facility = $1 and tt.archived = 0 and tt.addr is not null",
" order by tt.facility, tt.channel, tsmod desc",
);
let qu_select_multi = pg.prepare(sql).await?;
@@ -155,6 +156,9 @@ async fn finder_worker_single(
loop {
match inp.recv().await {
Ok(batch) => {
if batch.iter().filter(|x| crate::dbg_chn(x.name())).next().is_some() {
info!("SEARCHING FOR DBG");
};
stats.dbsearcher_batch_recv().inc();
stats.dbsearcher_item_recv().add(batch.len() as _);
let ts1 = Instant::now();
@@ -203,6 +207,11 @@ async fn finder_worker_single(
for e in &items {
trace!("found in database: {e:?}");
}
for e in items.iter() {
if crate::dbg_chn(&e.channel) {
info!("FOUND {e:?}");
}
}
let items_len = items.len();
if items_len != nbatch {
stats.dbsearcher_select_error_len_mismatch().inc();

View File

@@ -25,21 +25,22 @@ use std::time::Instant;
use taskrun::tokio;
use tokio::io::unix::AsyncFd;
#[derive(Debug, thiserror::Error)]
#[cstm(name = "FindIoc")]
pub enum Error {
SocketCreate,
SocketConvertTokio,
BroadcastEnable,
NonblockEnable,
SocketBind,
SendFailure,
ReadFailure,
ReadEmpty,
Proto(#[from] proto::Error),
Slidebuf(#[from] slidebuf::Error),
IO(#[from] std::io::Error),
}
autoerr::create_error_v1!(
name(Error, "FindIoc"),
enum variants {
SocketCreate,
SocketConvertTokio,
BroadcastEnable,
NonblockEnable,
SocketBind,
SendFailure,
ReadFailure,
ReadEmpty,
Proto(#[from] proto::Error),
Slidebuf(#[from] slidebuf::Error),
IO(#[from] std::io::Error),
},
);
struct SockBox(c_int);

View File

@@ -108,8 +108,8 @@ pub struct MaybeWrongAddressState {
impl MaybeWrongAddressState {
pub fn new(since: SystemTime, backoff_cnt: u32) -> Self {
let f = 1. + 10. * (backoff_cnt as f32 / 4.).tanh();
let dtms = 4e3_f32 * f;
let f = 2. + 60. * (backoff_cnt as f32 / 5.).tanh();
let dtms = 1e-3 * f;
Self {
since,
backoff_dt: Duration::from_millis(dtms as u64),

View File

@@ -21,3 +21,8 @@ pub fn log_test() {
debug!("log-test");
trace!("log-test");
}
pub fn dbg_chn(chn: &str) -> bool {
let chns = ["SINEG01:QE-B1-OP"];
chns.contains(&chn)
}