Improve search and fetch of ioc addr

This commit is contained in:
Dominik Werder
2022-08-29 18:11:27 +02:00
parent e2b337f848
commit a5ce717702
6 changed files with 221 additions and 159 deletions

View File

@@ -852,12 +852,12 @@ impl CaConn {
},
Err(e) => {
error!("got error item from CaProto {e:?}");
Ready(Some(Ok(())))
Ready(Some(Err(e)))
}
},
Ready(None) => {
warn!("CaProto is done {:?}", self.remote_addr_dbg);
self.state = CaConnState::Wait(wait_fut(10000));
warn!("handle_conn_listen CaProto is done {:?}", self.remote_addr_dbg);
self.state = CaConnState::Wait(wait_fut(self.backoff_next()));
self.proto = None;
Ready(None)
}
@@ -1014,8 +1014,8 @@ impl CaConn {
Ready(Some(Err(e)))
}
Ready(None) => {
warn!("CaProto is done");
self.state = CaConnState::Wait(wait_fut(10000));
warn!("handle_peer_ready CaProto is done {:?}", self.remote_addr_dbg);
self.state = CaConnState::Wait(wait_fut(self.backoff_next()));
self.proto = None;
Ready(None)
}
@@ -1028,6 +1028,8 @@ impl CaConn {
}
res
}
//fn loop_inner(&mut self, cx: &mut Context)
}
impl Stream for CaConn {
@@ -1160,6 +1162,7 @@ impl Stream for CaConn {
},
CaConnState::PeerReady => {
{
// TODO can I move this block somewhere else?
let _ = self.handle_get_series_futs(cx)?;
let ts2 = Instant::now();
self.stats
@@ -1180,10 +1183,7 @@ impl Stream for CaConn {
}
}
Ready(Some(Err(e))) => Ready(Some(Err(e))),
Ready(None) => {
// TODO even though protocol is done, we might still have e.g. insert items to flush!
Ready(None)
}
Ready(None) => continue 'outer,
Pending => Pending,
}
}

View File

@@ -44,8 +44,9 @@ struct SearchBatch {
#[derive(Debug)]
pub struct FindIocRes {
pub src: SocketAddrV4,
pub channel: String,
pub query_addr: Option<SocketAddrV4>,
pub response_addr: Option<SocketAddrV4>,
pub addr: Option<SocketAddrV4>,
}
@@ -90,7 +91,7 @@ impl FindIocStream {
bids_timed_out: BTreeMap::new(),
sids_done: BTreeMap::new(),
result_for_done_sid_count: 0,
in_flight_max: 40,
in_flight_max: 20,
channels_per_batch: 10,
batch_run_max: Duration::from_millis(2500),
}
@@ -240,12 +241,6 @@ impl FindIocStream {
let saddr2: libc::sockaddr_in = std::mem::transmute_copy(&saddr_mem);
let src_addr = Ipv4Addr::from(saddr2.sin_addr.s_addr.to_ne_bytes());
let src_port = u16::from_be(saddr2.sin_port);
trace!(
"received from src_addr {:?} src_port {} ec {}",
src_addr,
src_port,
ec
);
if false {
let mut s1 = String::new();
for i in 0..(ec as usize) {
@@ -257,10 +252,15 @@ impl FindIocStream {
String::from_utf8_lossy(buf[..ec as usize].into())
);
}
// TODO handle if we get a too large answer.
if ec > 2048 {
// TODO handle if we get a too large answer.
error!("received packet too large");
panic!();
}
let mut nb = crate::netbuf::NetBuf::new(2048);
nb.put_slice(&buf[..ec as usize])?;
let mut msgs = vec![];
let mut accounted = 0;
loop {
let n = nb.data().len();
if n == 0 {
@@ -271,6 +271,11 @@ impl FindIocStream {
break;
}
let hi = HeadInfo::from_netbuf(&mut nb)?;
if hi.cmdid() == 0 && hi.payload() == 0 {
} else if hi.cmdid() == 6 && hi.payload() == 8 {
} else {
info!("cmdid {} payload {}", hi.cmdid(), hi.payload());
}
if nb.data().len() < hi.payload() {
error!("incomplete message, missing payload");
break;
@@ -278,6 +283,13 @@ impl FindIocStream {
let msg = CaMsg::from_proto_infos(&hi, nb.data(), 32)?;
nb.adv(hi.payload())?;
msgs.push(msg);
accounted += 16 + hi.payload();
}
if accounted != ec as usize {
info!("unaccounted data ec {} accounted {}", ec, accounted);
}
if msgs.len() != 2 {
info!("expect always 2 commands in the response, instead got {}", msgs.len());
}
let mut res = vec![];
for msg in msgs.iter() {
@@ -286,7 +298,10 @@ impl FindIocStream {
let addr = SocketAddrV4::new(src_addr, k.tcp_port);
res.push((SearchId(k.id), addr));
}
_ => {}
CaMsgTy::VersionRes(13) => {}
_ => {
warn!("try_read: unknown message received {:?}", msg.ty);
}
}
}
Poll::Ready(Ok((SocketAddrV4::new(src_addr, src_port), res)))
@@ -344,14 +359,17 @@ impl FindIocStream {
sids_remove.push(sid.clone());
match self.in_flight.get_mut(bid) {
Some(batch) => {
// TGT
for (i2, s2) in batch.sids.iter().enumerate() {
if s2 == &sid {
match batch.channels.get(i2) {
Some(ch) => {
let res = FindIocRes {
channel: ch.into(),
// TODO associate a batch with a specific query address.
query_addr: None,
response_addr: Some(src.clone()),
addr: Some(addr),
src: src.clone(),
};
self.out_queue.push_back(res);
}
@@ -420,7 +438,8 @@ impl FindIocStream {
}
for (sid, ch) in sids.into_iter().zip(chns) {
let res = FindIocRes {
src: SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 0),
query_addr: None,
response_addr: None,
channel: ch,
addr: None,
};

View File

@@ -723,6 +723,10 @@ impl HeadInfo {
Ok(hi)
}
pub fn cmdid(&self) -> u16 {
self.cmdid
}
pub fn payload(&self) -> usize {
self.payload_size as _
}

View File

@@ -74,24 +74,16 @@ pub async fn ca_search(opts: ListenFromFileOpts) -> Result<(), Error> {
// TODO join pg_conn in the end:
tokio::spawn(pg_conn);
let pg_client = Arc::new(pg_client);
let qu_select = pg_client
.prepare("select addr from ioc_by_channel where facility = $1 and channel = $2 and searchaddr = $3")
.await
.unwrap();
let qu_insert = {
const TEXT: tokio_postgres::types::Type = tokio_postgres::types::Type::TEXT;
pg_client
.prepare_typed(
"insert into ioc_by_channel (facility, channel, searchaddr, addr) values ($1, $2, $3, $4) on conflict do nothing",
&[TEXT, TEXT, TEXT, TEXT],
"insert into ioc_by_channel_log (facility, channel, queryaddr, responseaddr, addr) values ($1, $2, $3, $4, $5)",
&[TEXT, TEXT, TEXT, TEXT, TEXT],
)
.await
.unwrap()
};
let qu_update = pg_client
.prepare("update ioc_by_channel set addr = $4, tsmod = now(), modcount = modcount + 1 where facility = $1 and channel = $2 and searchaddr = $3")
.await
.unwrap();
let mut addrs = vec![];
for s in &opts.search {
match resolve_address(s).await {
@@ -105,19 +97,8 @@ pub async fn ca_search(opts: ListenFromFileOpts) -> Result<(), Error> {
}
}
let gw_addrs = {
// Try to blacklist..
// TODO if it helps, add a config option for it.
let gateways = [
"sf-cagw",
"saresa-cagw",
"saresb-cagw",
"saresc-cagw",
"satesd-cagw",
"satese-cagw",
"satesf-cagw",
];
let mut gw_addrs = vec![];
for s in gateways {
for s in &opts.search_blacklist {
match resolve_address(s).await {
Ok(addr) => {
info!("resolved {s} as {addr}");
@@ -163,9 +144,11 @@ pub async fn ca_search(opts: ListenFromFileOpts) -> Result<(), Error> {
for item in item {
let mut do_block = false;
for a2 in &gw_addrs {
if &item.src == a2 {
do_block = true;
warn!("gateways responded to search");
if let Some(response_addr) = &item.response_addr {
if response_addr == a2 {
do_block = true;
warn!("gateways responded to search");
}
}
}
if let Some(a1) = item.addr.as_ref() {
@@ -177,31 +160,41 @@ pub async fn ca_search(opts: ListenFromFileOpts) -> Result<(), Error> {
}
}
if do_block {
info!("blocking {item:?}");
info!("blacklisting {item:?}");
} else {
info!("using {item:?}");
/*
let srcaddr = item.src.to_string();
let addr = item.addr.map(|x| x.to_string()).unwrap_or(String::new());
let rows = pg_client
.query(&qu_select, &[&facility, &item.channel, &srcaddr])
.await
.unwrap();
if rows.is_empty() {
info!("insert {item:?}");
if true || rows.is_empty() {
//info!("insert {item:?}");
pg_client
.execute(&qu_insert, &[&facility, &item.channel, &srcaddr, &addr])
.await
.unwrap();
} else {
info!("update {item:?}");
//info!("update {item:?}");
let addr2: &str = rows[0].get(0);
if addr2 != addr {
pg_client
.execute(&qu_update, &[&facility, &item.channel, &srcaddr, &addr])
.await
.unwrap();
}
if addr2 != addr {}
pg_client
.execute(&qu_update, &[&facility, &item.channel, &srcaddr, &addr])
.await
.unwrap();
}
*/
let queryaddr = item.query_addr.map(|x| x.to_string());
let responseaddr = item.response_addr.map(|x| x.to_string());
let addr = item.addr.map(|x| x.to_string());
pg_client
.execute(
&qu_insert,
&[&facility, &item.channel, &queryaddr, &responseaddr, &addr],
)
.await
.unwrap();
}
}
}