Block gateways in search
This commit is contained in:
@@ -1,4 +1,5 @@
|
||||
pub mod conn;
|
||||
pub mod findioc;
|
||||
pub mod proto;
|
||||
pub mod search;
|
||||
pub mod store;
|
||||
@@ -155,6 +156,27 @@ async fn spawn_scylla_insert_workers(
|
||||
let mut i1 = 0;
|
||||
while let Ok(item) = recv.recv().await {
|
||||
match item {
|
||||
QueryItem::ConnectionStatus(item) => {
|
||||
match crate::store::insert_connection_status(item, &data_store, &stats).await {
|
||||
Ok(_) => {
|
||||
stats.store_worker_item_insert_inc();
|
||||
}
|
||||
Err(e) => {
|
||||
stats.store_worker_item_error_inc();
|
||||
// TODO introduce more structured error variants.
|
||||
if e.msg().contains("WriteTimeout") {
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
} else {
|
||||
// TODO back off but continue.
|
||||
error!("insert worker sees error: {e:?}");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
QueryItem::ChannelStatus(_item) => {
|
||||
// TODO
|
||||
}
|
||||
QueryItem::Insert(item) => {
|
||||
stats.store_worker_item_recv_inc();
|
||||
let insert_frac = insert_frac.load(Ordering::Acquire);
|
||||
@@ -424,7 +446,6 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> {
|
||||
opts.api_bind.clone(),
|
||||
insert_frac.clone(),
|
||||
insert_ivl_min.clone(),
|
||||
command_queue_set.clone(),
|
||||
ingest_commons.clone(),
|
||||
));
|
||||
|
||||
|
||||
@@ -1,15 +1,16 @@
|
||||
use super::proto::{self, CaItem, CaMsg, CaMsgTy, CaProto};
|
||||
use super::store::DataStore;
|
||||
use crate::bsread::ChannelDescDecoded;
|
||||
use crate::ca::proto::{CreateChan, EventAdd, HeadInfo};
|
||||
use crate::ca::proto::{CreateChan, EventAdd};
|
||||
use crate::ca::store::ChannelRegistry;
|
||||
use crate::series::{Existence, SeriesId};
|
||||
use crate::store::{CommonInsertItemQueueSender, InsertItem, IvlItem, MuteItem, QueryItem};
|
||||
use crate::store::{
|
||||
CommonInsertItemQueueSender, ConnectionStatus, ConnectionStatusItem, InsertItem, IvlItem, MuteItem, QueryItem,
|
||||
};
|
||||
use async_channel::Sender;
|
||||
use err::Error;
|
||||
use futures_util::stream::FuturesOrdered;
|
||||
use futures_util::{Future, FutureExt, Stream, StreamExt, TryFutureExt};
|
||||
use libc::c_int;
|
||||
use log::*;
|
||||
use netpod::timeunits::*;
|
||||
use netpod::{ScalarType, Shape};
|
||||
@@ -18,16 +19,25 @@ use stats::{CaConnStats, IntervalEma};
|
||||
use std::collections::{BTreeMap, VecDeque};
|
||||
use std::net::{Ipv4Addr, SocketAddrV4};
|
||||
use std::pin::Pin;
|
||||
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
use std::sync::Arc;
|
||||
use std::task::{Context, Poll};
|
||||
use std::time::{Duration, Instant, SystemTime};
|
||||
use tokio::io::unix::AsyncFd;
|
||||
use tokio::net::TcpStream;
|
||||
|
||||
#[derive(Clone, Debug, Serialize)]
|
||||
pub enum ChannelConnectedInfo {
|
||||
Disconnected,
|
||||
Connecting,
|
||||
Connected,
|
||||
Error,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Serialize)]
|
||||
pub struct ChannelStateInfo {
|
||||
pub name: String,
|
||||
pub addr: SocketAddrV4,
|
||||
pub channel_connected_info: ChannelConnectedInfo,
|
||||
pub scalar_type: Option<ScalarType>,
|
||||
pub shape: Option<Shape>,
|
||||
// NOTE: this solution can yield to the same Instant serialize to different string representations.
|
||||
@@ -123,7 +133,13 @@ enum ChannelState {
|
||||
}
|
||||
|
||||
impl ChannelState {
|
||||
fn to_info(&self, name: String) -> ChannelStateInfo {
|
||||
fn to_info(&self, name: String, addr: SocketAddrV4) -> ChannelStateInfo {
|
||||
let channel_connected_info = match self {
|
||||
ChannelState::Init => ChannelConnectedInfo::Disconnected,
|
||||
ChannelState::Creating { .. } => ChannelConnectedInfo::Connecting,
|
||||
ChannelState::Created(_) => ChannelConnectedInfo::Connected,
|
||||
ChannelState::Error(_) => ChannelConnectedInfo::Error,
|
||||
};
|
||||
let scalar_type = match self {
|
||||
ChannelState::Created(s) => Some(s.scalar_type.clone()),
|
||||
_ => None,
|
||||
@@ -157,6 +173,8 @@ impl ChannelState {
|
||||
let interest_score = 1. / item_recv_ivl_ema.unwrap_or(1e10).max(1e-6).min(1e10);
|
||||
ChannelStateInfo {
|
||||
name,
|
||||
addr,
|
||||
channel_connected_info,
|
||||
scalar_type,
|
||||
shape,
|
||||
ts_created,
|
||||
@@ -169,7 +187,10 @@ impl ChannelState {
|
||||
|
||||
enum CaConnState {
|
||||
Unconnected,
|
||||
Connecting(Pin<Box<dyn Future<Output = Result<TcpStream, Error>> + Send>>),
|
||||
Connecting(
|
||||
SocketAddrV4,
|
||||
Pin<Box<dyn Future<Output = Result<Result<TcpStream, std::io::Error>, tokio::time::error::Elapsed>> + Send>>,
|
||||
),
|
||||
Init,
|
||||
Listen,
|
||||
PeerReady,
|
||||
@@ -289,6 +310,8 @@ pub struct CaConn {
|
||||
insert_ivl_min: Arc<AtomicU64>,
|
||||
conn_command_tx: async_channel::Sender<ConnCommand>,
|
||||
conn_command_rx: async_channel::Receiver<ConnCommand>,
|
||||
conn_backoff: f32,
|
||||
conn_backoff_beg: f32,
|
||||
}
|
||||
|
||||
impl CaConn {
|
||||
@@ -327,6 +350,8 @@ impl CaConn {
|
||||
insert_ivl_min,
|
||||
conn_command_tx: cq_tx,
|
||||
conn_command_rx: cq_rx,
|
||||
conn_backoff: 0.02,
|
||||
conn_backoff_beg: 0.02,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -360,7 +385,7 @@ impl CaConn {
|
||||
//info!("State for {name:?}");
|
||||
let res = match self.cid_by_name.get(&name) {
|
||||
Some(cid) => match self.channels.get(cid) {
|
||||
Some(state) => Some(state.to_info(name)),
|
||||
Some(state) => Some(state.to_info(name, self.remote_addr_dbg.clone())),
|
||||
None => None,
|
||||
},
|
||||
None => None,
|
||||
@@ -385,7 +410,7 @@ impl CaConn {
|
||||
.name_by_cid
|
||||
.get(cid)
|
||||
.map_or("--unknown--".into(), |x| x.to_string());
|
||||
state.to_info(name)
|
||||
state.to_info(name, self.remote_addr_dbg.clone())
|
||||
})
|
||||
.collect();
|
||||
let msg = (self.remote_addr_dbg.clone(), res);
|
||||
@@ -462,6 +487,16 @@ impl CaConn {
|
||||
self.name_by_cid.get(&cid).map(|x| x.as_str())
|
||||
}
|
||||
|
||||
fn backoff_next(&mut self) -> u64 {
|
||||
let dt = (self.conn_backoff * 300. * 1e3) as u64;
|
||||
self.conn_backoff = (self.conn_backoff * 2.).tanh();
|
||||
dt
|
||||
}
|
||||
|
||||
fn backoff_reset(&mut self) {
|
||||
self.conn_backoff = self.conn_backoff_beg;
|
||||
}
|
||||
|
||||
fn handle_insert_futs(&mut self, cx: &mut Context) -> Poll<Result<(), Error>> {
|
||||
use Poll::*;
|
||||
loop {
|
||||
@@ -1025,41 +1060,70 @@ impl Stream for CaConn {
|
||||
break match &mut self.state {
|
||||
CaConnState::Unconnected => {
|
||||
let addr = self.remote_addr_dbg.clone();
|
||||
trace!("create tcp connection to {:?}", (addr.ip(), addr.port()));
|
||||
let fut = async move {
|
||||
trace!("create tcp connection to {:?}", (addr.ip(), addr.port()));
|
||||
match tokio::time::timeout(Duration::from_millis(500), TcpStream::connect(addr)).await {
|
||||
Ok(Ok(k)) => Ok(k),
|
||||
Ok(Err(e)) => {
|
||||
// TODO keep this in channel status field, or log when we have exponential backoff
|
||||
trace!("Can not connect to {addr:?} {e:?}");
|
||||
Err(e.into())
|
||||
}
|
||||
Err(_) => {
|
||||
// TODO keep this in channel status field, or log when we have exponential backoff
|
||||
trace!("Can not connect to {addr:?} timeout");
|
||||
Err(Error::with_msg_no_trace(format!("timeout")))
|
||||
}
|
||||
}
|
||||
tokio::time::timeout(Duration::from_millis(500), TcpStream::connect(addr)).await
|
||||
};
|
||||
self.state = CaConnState::Connecting(Box::pin(fut));
|
||||
self.state = CaConnState::Connecting(addr, Box::pin(fut));
|
||||
continue 'outer;
|
||||
}
|
||||
CaConnState::Connecting(ref mut fut) => {
|
||||
CaConnState::Connecting(ref addr, ref mut fut) => {
|
||||
match fut.poll_unpin(cx) {
|
||||
Ready(Ok(tcp)) => {
|
||||
let proto = CaProto::new(tcp, self.remote_addr_dbg.clone(), self.array_truncate);
|
||||
self.state = CaConnState::Init;
|
||||
self.proto = Some(proto);
|
||||
continue 'outer;
|
||||
}
|
||||
Ready(Err(e)) => {
|
||||
// TODO keep this in channel status field, or log when we have exponential backoff
|
||||
trace!("Connection error: {e:?}");
|
||||
// We can not connect to the remote.
|
||||
// TODO do exponential backoff.
|
||||
self.state = CaConnState::Wait(wait_fut(10000));
|
||||
self.proto = None;
|
||||
continue 'outer;
|
||||
Ready(connect_result) => {
|
||||
match connect_result {
|
||||
Ok(Ok(tcp)) => {
|
||||
let addr = addr.clone();
|
||||
self.insert_item_queue.push_back(QueryItem::ConnectionStatus(
|
||||
ConnectionStatusItem {
|
||||
ts: SystemTime::now(),
|
||||
addr,
|
||||
status: ConnectionStatus::Established,
|
||||
},
|
||||
));
|
||||
self.backoff_reset();
|
||||
let proto =
|
||||
CaProto::new(tcp, self.remote_addr_dbg.clone(), self.array_truncate);
|
||||
self.state = CaConnState::Init;
|
||||
self.proto = Some(proto);
|
||||
continue 'outer;
|
||||
}
|
||||
Ok(Err(e)) => {
|
||||
// TODO log with exponential backoff
|
||||
// 172.26.24.118:2072
|
||||
const ADDR2: Ipv4Addr = Ipv4Addr::new(172, 26, 24, 118);
|
||||
if addr.ip() == &ADDR2 && addr.port() == 2072 {
|
||||
warn!("error during connect to {addr:?} {e:?}");
|
||||
}
|
||||
let addr = addr.clone();
|
||||
self.insert_item_queue.push_back(QueryItem::ConnectionStatus(
|
||||
ConnectionStatusItem {
|
||||
ts: SystemTime::now(),
|
||||
addr,
|
||||
status: ConnectionStatus::ConnectError,
|
||||
},
|
||||
));
|
||||
let dt = self.backoff_next();
|
||||
self.state = CaConnState::Wait(wait_fut(dt));
|
||||
self.proto = None;
|
||||
continue 'outer;
|
||||
}
|
||||
Err(e) => {
|
||||
// TODO log with exponential backoff
|
||||
trace!("timeout during connect to {addr:?} {e:?}");
|
||||
let addr = addr.clone();
|
||||
self.insert_item_queue.push_back(QueryItem::ConnectionStatus(
|
||||
ConnectionStatusItem {
|
||||
ts: SystemTime::now(),
|
||||
addr,
|
||||
status: ConnectionStatus::ConnectTimeout,
|
||||
},
|
||||
));
|
||||
let dt = self.backoff_next();
|
||||
self.state = CaConnState::Wait(wait_fut(dt));
|
||||
self.proto = None;
|
||||
continue 'outer;
|
||||
}
|
||||
}
|
||||
}
|
||||
Pending => Pending,
|
||||
}
|
||||
@@ -1144,550 +1208,3 @@ impl Stream for CaConn {
|
||||
ret
|
||||
}
|
||||
}
|
||||
|
||||
struct SockBox(c_int);
|
||||
|
||||
impl Drop for SockBox {
|
||||
fn drop(self: &mut Self) {
|
||||
if self.0 != -1 {
|
||||
unsafe {
|
||||
libc::close(self.0);
|
||||
self.0 = -1;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TODO should be able to get away with non-atomic counters.
|
||||
static BATCH_ID: AtomicUsize = AtomicUsize::new(0);
|
||||
static SEARCH_ID2: AtomicUsize = AtomicUsize::new(0);
|
||||
|
||||
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug)]
|
||||
struct BatchId(u32);
|
||||
|
||||
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug)]
|
||||
struct SearchId(u32);
|
||||
|
||||
struct SearchBatch {
|
||||
ts_beg: Instant,
|
||||
tgts: VecDeque<usize>,
|
||||
channels: Vec<String>,
|
||||
sids: Vec<SearchId>,
|
||||
done: Vec<SearchId>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct FindIocRes {
|
||||
pub src: SocketAddrV4,
|
||||
pub channel: String,
|
||||
pub addr: Option<SocketAddrV4>,
|
||||
}
|
||||
|
||||
pub struct FindIocStream {
|
||||
tgts: Vec<SocketAddrV4>,
|
||||
channels_input: VecDeque<String>,
|
||||
in_flight: BTreeMap<BatchId, SearchBatch>,
|
||||
in_flight_max: usize,
|
||||
bid_by_sid: BTreeMap<SearchId, BatchId>,
|
||||
batch_send_queue: VecDeque<BatchId>,
|
||||
sock: SockBox,
|
||||
afd: AsyncFd<i32>,
|
||||
buf1: Vec<u8>,
|
||||
send_addr: SocketAddrV4,
|
||||
out_queue: VecDeque<FindIocRes>,
|
||||
ping: Pin<Box<tokio::time::Sleep>>,
|
||||
channels_per_batch: usize,
|
||||
batch_run_max: Duration,
|
||||
bids_all_done: BTreeMap<BatchId, ()>,
|
||||
bids_timed_out: BTreeMap<BatchId, ()>,
|
||||
sids_done: BTreeMap<SearchId, ()>,
|
||||
result_for_done_sid_count: u64,
|
||||
}
|
||||
|
||||
impl FindIocStream {
|
||||
pub fn new(tgts: Vec<SocketAddrV4>) -> Self {
|
||||
let sock = unsafe { Self::create_socket() }.unwrap();
|
||||
let afd = AsyncFd::new(sock.0).unwrap();
|
||||
Self {
|
||||
tgts,
|
||||
channels_input: VecDeque::new(),
|
||||
in_flight: BTreeMap::new(),
|
||||
bid_by_sid: BTreeMap::new(),
|
||||
batch_send_queue: VecDeque::new(),
|
||||
sock,
|
||||
afd,
|
||||
buf1: vec![0; 1024],
|
||||
send_addr: SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 5064),
|
||||
out_queue: VecDeque::new(),
|
||||
ping: Box::pin(tokio::time::sleep(Duration::from_millis(200))),
|
||||
bids_all_done: BTreeMap::new(),
|
||||
bids_timed_out: BTreeMap::new(),
|
||||
sids_done: BTreeMap::new(),
|
||||
result_for_done_sid_count: 0,
|
||||
in_flight_max: 10,
|
||||
channels_per_batch: 10,
|
||||
batch_run_max: Duration::from_millis(1500),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn quick_state(&self) -> String {
|
||||
format!(
|
||||
"channels_input {} in_flight {} bid_by_sid {} out_queue {} result_for_done_sid_count {} bids_timed_out {}",
|
||||
self.channels_input.len(),
|
||||
self.in_flight.len(),
|
||||
self.bid_by_sid.len(),
|
||||
self.out_queue.len(),
|
||||
self.result_for_done_sid_count,
|
||||
self.bids_timed_out.len()
|
||||
)
|
||||
}
|
||||
|
||||
pub fn push(&mut self, x: String) {
|
||||
self.channels_input.push_back(x);
|
||||
}
|
||||
|
||||
fn buf_and_batch(&mut self, bid: &BatchId) -> Option<(&mut Vec<u8>, &mut SearchBatch)> {
|
||||
match self.in_flight.get_mut(bid) {
|
||||
Some(batch) => Some((&mut self.buf1, batch)),
|
||||
None => None,
|
||||
}
|
||||
}
|
||||
|
||||
unsafe fn create_socket() -> Result<SockBox, Error> {
|
||||
let ec = libc::socket(libc::AF_INET, libc::SOCK_DGRAM, 0);
|
||||
if ec == -1 {
|
||||
return Err("can not create socket".into());
|
||||
}
|
||||
let sock = SockBox(ec);
|
||||
{
|
||||
let opt: libc::c_int = 1;
|
||||
let ec = libc::setsockopt(
|
||||
sock.0,
|
||||
libc::SOL_SOCKET,
|
||||
libc::SO_BROADCAST,
|
||||
&opt as *const _ as _,
|
||||
std::mem::size_of::<libc::c_int>() as _,
|
||||
);
|
||||
if ec == -1 {
|
||||
return Err("can not enable broadcast".into());
|
||||
}
|
||||
}
|
||||
{
|
||||
let ec = libc::fcntl(sock.0, libc::F_SETFL, libc::O_NONBLOCK);
|
||||
if ec == -1 {
|
||||
return Err("can not set nonblock".into());
|
||||
}
|
||||
}
|
||||
let ip: [u8; 4] = [0, 0, 0, 0];
|
||||
let addr = libc::sockaddr_in {
|
||||
sin_family: libc::AF_INET as u16,
|
||||
sin_port: 0,
|
||||
sin_addr: libc::in_addr {
|
||||
s_addr: u32::from_ne_bytes(ip),
|
||||
},
|
||||
sin_zero: [0; 8],
|
||||
};
|
||||
let addr_len = std::mem::size_of::<libc::sockaddr_in>();
|
||||
let ec = libc::bind(sock.0, &addr as *const _ as _, addr_len as _);
|
||||
if ec == -1 {
|
||||
return Err("can not bind socket".into());
|
||||
}
|
||||
{
|
||||
let mut addr = libc::sockaddr_in {
|
||||
sin_family: libc::AF_INET as u16,
|
||||
sin_port: 0,
|
||||
sin_addr: libc::in_addr { s_addr: 0 },
|
||||
sin_zero: [0; 8],
|
||||
};
|
||||
let mut addr_len = std::mem::size_of::<libc::sockaddr_in>();
|
||||
let ec = libc::getsockname(sock.0, &mut addr as *mut _ as _, &mut addr_len as *mut _ as _);
|
||||
if ec == -1 {
|
||||
error!("getsockname {ec}");
|
||||
return Err("can not convert raw socket to tokio socket".into());
|
||||
} else {
|
||||
if false {
|
||||
let ipv4 = Ipv4Addr::from(addr.sin_addr.s_addr.to_ne_bytes());
|
||||
let tcp_port = u16::from_be(addr.sin_port);
|
||||
info!("bound local socket to {:?} port {}", ipv4, tcp_port);
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(sock)
|
||||
}
|
||||
|
||||
unsafe fn try_send(sock: i32, addr: &SocketAddrV4, buf: &[u8]) -> Poll<Result<(), Error>> {
|
||||
let ip = addr.ip().octets();
|
||||
let port = addr.port();
|
||||
let addr = libc::sockaddr_in {
|
||||
sin_family: libc::AF_INET as u16,
|
||||
sin_port: port.to_be(),
|
||||
sin_addr: libc::in_addr {
|
||||
s_addr: u32::from_ne_bytes(ip),
|
||||
},
|
||||
sin_zero: [0; 8],
|
||||
};
|
||||
let addr_len = std::mem::size_of::<libc::sockaddr_in>();
|
||||
let ec = libc::sendto(
|
||||
sock,
|
||||
&buf[0] as *const _ as _,
|
||||
buf.len() as _,
|
||||
0,
|
||||
&addr as *const _ as _,
|
||||
addr_len as _,
|
||||
);
|
||||
if ec == -1 {
|
||||
let errno = *libc::__errno_location();
|
||||
if errno == libc::EAGAIN {
|
||||
return Poll::Pending;
|
||||
} else {
|
||||
return Poll::Ready(Err("FindIocStream can not send".into()));
|
||||
}
|
||||
}
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
|
||||
unsafe fn try_read(sock: i32) -> Poll<Result<(SocketAddrV4, Vec<(SearchId, SocketAddrV4)>), Error>> {
|
||||
let mut saddr_mem = [0u8; std::mem::size_of::<libc::sockaddr>()];
|
||||
let mut saddr_len: libc::socklen_t = saddr_mem.len() as _;
|
||||
let mut buf = vec![0u8; 1024];
|
||||
let ec = libc::recvfrom(
|
||||
sock,
|
||||
buf.as_mut_ptr() as _,
|
||||
buf.len() as _,
|
||||
libc::O_NONBLOCK,
|
||||
&mut saddr_mem as *mut _ as _,
|
||||
&mut saddr_len as *mut _ as _,
|
||||
);
|
||||
if ec == -1 {
|
||||
let errno = *libc::__errno_location();
|
||||
if errno == libc::EAGAIN {
|
||||
return Poll::Pending;
|
||||
} else {
|
||||
return Poll::Ready(Err("FindIocStream can not read".into()));
|
||||
}
|
||||
} else if ec < 0 {
|
||||
error!("unexpected received {ec}");
|
||||
Poll::Ready(Err(Error::with_msg_no_trace(format!("try_read ec {ec}"))))
|
||||
} else if ec == 0 {
|
||||
Poll::Ready(Err(Error::with_msg_no_trace(format!("try_read ec {ec}"))))
|
||||
} else {
|
||||
let saddr2: libc::sockaddr_in = std::mem::transmute_copy(&saddr_mem);
|
||||
let src_addr = Ipv4Addr::from(saddr2.sin_addr.s_addr.to_ne_bytes());
|
||||
let src_port = u16::from_be(saddr2.sin_port);
|
||||
trace!(
|
||||
"received from src_addr {:?} src_port {} ec {}",
|
||||
src_addr,
|
||||
src_port,
|
||||
ec
|
||||
);
|
||||
if false {
|
||||
let mut s1 = String::new();
|
||||
for i in 0..(ec as usize) {
|
||||
s1.extend(format!(" {:02x}", buf[i]).chars());
|
||||
}
|
||||
debug!("received answer {s1}");
|
||||
debug!(
|
||||
"received answer string {}",
|
||||
String::from_utf8_lossy(buf[..ec as usize].into())
|
||||
);
|
||||
}
|
||||
// TODO handle if we get a too large answer.
|
||||
let mut nb = crate::netbuf::NetBuf::new(2048);
|
||||
nb.put_slice(&buf[..ec as usize])?;
|
||||
let mut msgs = vec![];
|
||||
loop {
|
||||
let n = nb.data().len();
|
||||
if n == 0 {
|
||||
break;
|
||||
}
|
||||
if n < 16 {
|
||||
error!("incomplete message, not enough for header");
|
||||
break;
|
||||
}
|
||||
let hi = HeadInfo::from_netbuf(&mut nb)?;
|
||||
if nb.data().len() < hi.payload() {
|
||||
error!("incomplete message, missing payload");
|
||||
break;
|
||||
}
|
||||
let msg = CaMsg::from_proto_infos(&hi, nb.data(), 32)?;
|
||||
nb.adv(hi.payload())?;
|
||||
msgs.push(msg);
|
||||
}
|
||||
let mut res = vec![];
|
||||
for msg in msgs.iter() {
|
||||
match &msg.ty {
|
||||
CaMsgTy::SearchRes(k) => {
|
||||
let addr = SocketAddrV4::new(src_addr, k.tcp_port);
|
||||
res.push((SearchId(k.id), addr));
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
Poll::Ready(Ok((SocketAddrV4::new(src_addr, src_port), res)))
|
||||
}
|
||||
}
|
||||
|
||||
fn serialize_batch(buf: &mut Vec<u8>, batch: &SearchBatch) {
|
||||
buf.extend_from_slice(&[0, 0, 0, 0]);
|
||||
buf.extend_from_slice(&[0, 0, 0, 13]);
|
||||
buf.extend_from_slice(&[0, 0, 0, 0]);
|
||||
buf.extend_from_slice(&[0, 0, 0, 0]);
|
||||
for (sid, ch) in batch.sids.iter().zip(batch.channels.iter()) {
|
||||
let chb = ch.as_bytes();
|
||||
let npadded = (chb.len() + 1 + 7) / 8 * 8;
|
||||
let npad = npadded - chb.len();
|
||||
buf.extend_from_slice(&[0, 6]);
|
||||
buf.extend_from_slice(&(npadded as u16).to_be_bytes());
|
||||
buf.extend_from_slice(&[0, 0, 0, 13]);
|
||||
buf.extend_from_slice(&[0, 0, 0, 0]);
|
||||
buf.extend_from_slice(&sid.0.to_be_bytes());
|
||||
buf.extend_from_slice(chb);
|
||||
buf.extend_from_slice(&vec![0u8; npad]);
|
||||
}
|
||||
}
|
||||
|
||||
fn create_in_flight(&mut self) {
|
||||
let bid = BATCH_ID.fetch_add(1, Ordering::AcqRel);
|
||||
let bid = BatchId(bid as u32);
|
||||
let mut sids = vec![];
|
||||
let mut chs = vec![];
|
||||
while chs.len() < self.channels_per_batch && self.channels_input.len() > 0 {
|
||||
let sid = SEARCH_ID2.fetch_add(1, Ordering::AcqRel);
|
||||
let sid = SearchId(sid as u32);
|
||||
self.bid_by_sid.insert(sid.clone(), bid.clone());
|
||||
sids.push(sid);
|
||||
chs.push(self.channels_input.pop_front().unwrap());
|
||||
}
|
||||
let batch = SearchBatch {
|
||||
ts_beg: Instant::now(),
|
||||
channels: chs,
|
||||
tgts: self.tgts.iter().enumerate().map(|x| x.0).collect(),
|
||||
sids,
|
||||
done: vec![],
|
||||
};
|
||||
self.in_flight.insert(bid.clone(), batch);
|
||||
self.batch_send_queue.push_back(bid);
|
||||
}
|
||||
|
||||
fn handle_result(&mut self, src: SocketAddrV4, res: Vec<(SearchId, SocketAddrV4)>) {
|
||||
let mut sids_remove = vec![];
|
||||
for (sid, addr) in res {
|
||||
self.sids_done.insert(sid.clone(), ());
|
||||
match self.bid_by_sid.get(&sid) {
|
||||
Some(bid) => {
|
||||
sids_remove.push(sid.clone());
|
||||
match self.in_flight.get_mut(bid) {
|
||||
Some(batch) => {
|
||||
for (i2, s2) in batch.sids.iter().enumerate() {
|
||||
if s2 == &sid {
|
||||
match batch.channels.get(i2) {
|
||||
Some(ch) => {
|
||||
let res = FindIocRes {
|
||||
channel: ch.into(),
|
||||
addr: Some(addr),
|
||||
src: src.clone(),
|
||||
};
|
||||
self.out_queue.push_back(res);
|
||||
}
|
||||
None => {
|
||||
error!("no matching channel for {sid:?}");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// Book keeping:
|
||||
batch.done.push(sid);
|
||||
let mut all_done = true;
|
||||
if batch.done.len() >= batch.sids.len() {
|
||||
for s1 in &batch.sids {
|
||||
if !batch.done.contains(s1) {
|
||||
all_done = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
all_done = false;
|
||||
}
|
||||
if all_done {
|
||||
self.bids_all_done.insert(bid.clone(), ());
|
||||
self.in_flight.remove(bid);
|
||||
}
|
||||
}
|
||||
None => {
|
||||
// TODO analyze reasons
|
||||
error!("no batch for {bid:?}");
|
||||
}
|
||||
}
|
||||
}
|
||||
None => {
|
||||
// TODO analyze reasons
|
||||
if self.sids_done.contains_key(&sid) {
|
||||
self.result_for_done_sid_count += 1;
|
||||
} else {
|
||||
error!("no bid for {sid:?}");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
for sid in sids_remove {
|
||||
self.bid_by_sid.remove(&sid);
|
||||
}
|
||||
}
|
||||
|
||||
fn clear_timed_out(&mut self) {
|
||||
let now = Instant::now();
|
||||
let mut bids = vec![];
|
||||
let mut sids = vec![];
|
||||
let mut chns = vec![];
|
||||
for (bid, batch) in &mut self.in_flight {
|
||||
if now.duration_since(batch.ts_beg) > self.batch_run_max {
|
||||
self.bids_timed_out.insert(bid.clone(), ());
|
||||
for (i2, sid) in batch.sids.iter().enumerate() {
|
||||
if batch.done.contains(sid) == false {
|
||||
debug!("Timeout: {bid:?} {}", batch.channels[i2]);
|
||||
}
|
||||
sids.push(sid.clone());
|
||||
chns.push(batch.channels[i2].clone());
|
||||
}
|
||||
bids.push(bid.clone());
|
||||
}
|
||||
}
|
||||
for (sid, ch) in sids.into_iter().zip(chns) {
|
||||
let res = FindIocRes {
|
||||
src: SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 0),
|
||||
channel: ch,
|
||||
addr: None,
|
||||
};
|
||||
self.out_queue.push_back(res);
|
||||
self.bid_by_sid.remove(&sid);
|
||||
}
|
||||
for bid in bids {
|
||||
self.in_flight.remove(&bid);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Stream for FindIocStream {
|
||||
type Item = Result<VecDeque<FindIocRes>, Error>;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||
use Poll::*;
|
||||
match self.ping.poll_unpin(cx) {
|
||||
Ready(_) => {
|
||||
self.ping = Box::pin(tokio::time::sleep(Duration::from_millis(200)));
|
||||
cx.waker().wake_by_ref();
|
||||
}
|
||||
Pending => {}
|
||||
}
|
||||
self.clear_timed_out();
|
||||
loop {
|
||||
let mut loop_again = false;
|
||||
if self.out_queue.is_empty() == false {
|
||||
let ret = std::mem::replace(&mut self.out_queue, VecDeque::new());
|
||||
break Ready(Some(Ok(ret)));
|
||||
}
|
||||
if !self.buf1.is_empty() {
|
||||
match self.afd.poll_write_ready(cx) {
|
||||
Ready(Ok(mut g)) => match unsafe { Self::try_send(self.sock.0, &self.send_addr, &self.buf1) } {
|
||||
Ready(Ok(())) => {
|
||||
self.buf1.clear();
|
||||
loop_again = true;
|
||||
}
|
||||
Ready(Err(e)) => {
|
||||
error!("{e:?}");
|
||||
}
|
||||
Pending => {
|
||||
g.clear_ready();
|
||||
warn!("socket seemed ready for write, but is not");
|
||||
loop_again = true;
|
||||
}
|
||||
},
|
||||
Ready(Err(e)) => {
|
||||
let e = Error::with_msg_no_trace(format!("{e:?}"));
|
||||
error!("poll_write_ready {e:?}");
|
||||
}
|
||||
Pending => {}
|
||||
}
|
||||
}
|
||||
while self.buf1.is_empty() {
|
||||
match self.batch_send_queue.pop_front() {
|
||||
Some(bid) => {
|
||||
match self.buf_and_batch(&bid) {
|
||||
Some((buf1, batch)) => {
|
||||
match batch.tgts.pop_front() {
|
||||
Some(tgtix) => {
|
||||
Self::serialize_batch(buf1, batch);
|
||||
match self.tgts.get(tgtix) {
|
||||
Some(tgt) => {
|
||||
let tgt = tgt.clone();
|
||||
//info!("Serialize and queue {bid:?}");
|
||||
self.send_addr = tgt.clone();
|
||||
self.batch_send_queue.push_back(bid);
|
||||
loop_again = true;
|
||||
}
|
||||
None => {
|
||||
self.buf1.clear();
|
||||
self.batch_send_queue.push_back(bid);
|
||||
loop_again = true;
|
||||
error!("tgtix does not exist");
|
||||
}
|
||||
}
|
||||
}
|
||||
None => {
|
||||
//info!("Batch exhausted");
|
||||
loop_again = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
None => {
|
||||
if self.bids_all_done.contains_key(&bid) {
|
||||
// TODO count events
|
||||
} else {
|
||||
info!("Batch {bid:?} seems already done");
|
||||
}
|
||||
loop_again = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
None => break,
|
||||
}
|
||||
}
|
||||
while !self.channels_input.is_empty() && self.in_flight.len() < self.in_flight_max {
|
||||
self.create_in_flight();
|
||||
loop_again = true;
|
||||
}
|
||||
break match self.afd.poll_read_ready(cx) {
|
||||
Ready(Ok(mut g)) => match unsafe { Self::try_read(self.sock.0) } {
|
||||
Ready(Ok((src, res))) => {
|
||||
self.handle_result(src, res);
|
||||
continue;
|
||||
}
|
||||
Ready(Err(e)) => {
|
||||
error!("Error from try_read {e:?}");
|
||||
Ready(Some(Err(e)))
|
||||
}
|
||||
Pending => {
|
||||
g.clear_ready();
|
||||
continue;
|
||||
}
|
||||
},
|
||||
Ready(Err(e)) => {
|
||||
let e = Error::with_msg_no_trace(format!("{e:?}"));
|
||||
error!("poll_read_ready {e:?}");
|
||||
Ready(Some(Err(e)))
|
||||
}
|
||||
Pending => {
|
||||
if loop_again {
|
||||
continue;
|
||||
} else {
|
||||
if self.channels_input.is_empty() && self.in_flight.is_empty() && self.out_queue.is_empty() {
|
||||
Ready(None)
|
||||
} else {
|
||||
Pending
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
559
netfetch/src/ca/findioc.rs
Normal file
559
netfetch/src/ca/findioc.rs
Normal file
@@ -0,0 +1,559 @@
|
||||
use crate::ca::proto::{CaMsg, CaMsgTy, HeadInfo};
|
||||
use err::Error;
|
||||
use futures_util::{FutureExt, Stream};
|
||||
use libc::c_int;
|
||||
use log::*;
|
||||
use std::collections::{BTreeMap, VecDeque};
|
||||
use std::net::{Ipv4Addr, SocketAddrV4};
|
||||
use std::pin::Pin;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use std::task::{Context, Poll};
|
||||
use std::time::{Duration, Instant};
|
||||
use tokio::io::unix::AsyncFd;
|
||||
|
||||
struct SockBox(c_int);
|
||||
|
||||
impl Drop for SockBox {
|
||||
fn drop(self: &mut Self) {
|
||||
if self.0 != -1 {
|
||||
unsafe {
|
||||
libc::close(self.0);
|
||||
self.0 = -1;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TODO should be able to get away with non-atomic counters.
|
||||
static BATCH_ID: AtomicUsize = AtomicUsize::new(0);
|
||||
static SEARCH_ID2: AtomicUsize = AtomicUsize::new(0);
|
||||
|
||||
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug)]
|
||||
struct BatchId(u32);
|
||||
|
||||
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug)]
|
||||
struct SearchId(u32);
|
||||
|
||||
struct SearchBatch {
|
||||
ts_beg: Instant,
|
||||
tgts: VecDeque<usize>,
|
||||
channels: Vec<String>,
|
||||
sids: Vec<SearchId>,
|
||||
done: Vec<SearchId>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct FindIocRes {
|
||||
pub src: SocketAddrV4,
|
||||
pub channel: String,
|
||||
pub addr: Option<SocketAddrV4>,
|
||||
}
|
||||
|
||||
pub struct FindIocStream {
|
||||
tgts: Vec<SocketAddrV4>,
|
||||
channels_input: VecDeque<String>,
|
||||
in_flight: BTreeMap<BatchId, SearchBatch>,
|
||||
in_flight_max: usize,
|
||||
bid_by_sid: BTreeMap<SearchId, BatchId>,
|
||||
batch_send_queue: VecDeque<BatchId>,
|
||||
sock: SockBox,
|
||||
afd: AsyncFd<i32>,
|
||||
buf1: Vec<u8>,
|
||||
send_addr: SocketAddrV4,
|
||||
out_queue: VecDeque<FindIocRes>,
|
||||
ping: Pin<Box<tokio::time::Sleep>>,
|
||||
channels_per_batch: usize,
|
||||
batch_run_max: Duration,
|
||||
bids_all_done: BTreeMap<BatchId, ()>,
|
||||
bids_timed_out: BTreeMap<BatchId, ()>,
|
||||
sids_done: BTreeMap<SearchId, ()>,
|
||||
result_for_done_sid_count: u64,
|
||||
}
|
||||
|
||||
impl FindIocStream {
|
||||
pub fn new(tgts: Vec<SocketAddrV4>) -> Self {
|
||||
let sock = unsafe { Self::create_socket() }.unwrap();
|
||||
let afd = AsyncFd::new(sock.0).unwrap();
|
||||
Self {
|
||||
tgts,
|
||||
channels_input: VecDeque::new(),
|
||||
in_flight: BTreeMap::new(),
|
||||
bid_by_sid: BTreeMap::new(),
|
||||
batch_send_queue: VecDeque::new(),
|
||||
sock,
|
||||
afd,
|
||||
buf1: vec![0; 1024],
|
||||
send_addr: SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 5064),
|
||||
out_queue: VecDeque::new(),
|
||||
ping: Box::pin(tokio::time::sleep(Duration::from_millis(200))),
|
||||
bids_all_done: BTreeMap::new(),
|
||||
bids_timed_out: BTreeMap::new(),
|
||||
sids_done: BTreeMap::new(),
|
||||
result_for_done_sid_count: 0,
|
||||
in_flight_max: 40,
|
||||
channels_per_batch: 10,
|
||||
batch_run_max: Duration::from_millis(2500),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn quick_state(&self) -> String {
|
||||
format!(
|
||||
"channels_input {} in_flight {} bid_by_sid {} out_queue {} result_for_done_sid_count {} bids_timed_out {}",
|
||||
self.channels_input.len(),
|
||||
self.in_flight.len(),
|
||||
self.bid_by_sid.len(),
|
||||
self.out_queue.len(),
|
||||
self.result_for_done_sid_count,
|
||||
self.bids_timed_out.len()
|
||||
)
|
||||
}
|
||||
|
||||
pub fn push(&mut self, x: String) {
|
||||
self.channels_input.push_back(x);
|
||||
}
|
||||
|
||||
fn buf_and_batch(&mut self, bid: &BatchId) -> Option<(&mut Vec<u8>, &mut SearchBatch)> {
|
||||
match self.in_flight.get_mut(bid) {
|
||||
Some(batch) => Some((&mut self.buf1, batch)),
|
||||
None => None,
|
||||
}
|
||||
}
|
||||
|
||||
unsafe fn create_socket() -> Result<SockBox, Error> {
|
||||
let ec = libc::socket(libc::AF_INET, libc::SOCK_DGRAM, 0);
|
||||
if ec == -1 {
|
||||
return Err("can not create socket".into());
|
||||
}
|
||||
let sock = SockBox(ec);
|
||||
{
|
||||
let opt: libc::c_int = 1;
|
||||
let ec = libc::setsockopt(
|
||||
sock.0,
|
||||
libc::SOL_SOCKET,
|
||||
libc::SO_BROADCAST,
|
||||
&opt as *const _ as _,
|
||||
std::mem::size_of::<libc::c_int>() as _,
|
||||
);
|
||||
if ec == -1 {
|
||||
return Err("can not enable broadcast".into());
|
||||
}
|
||||
}
|
||||
{
|
||||
let ec = libc::fcntl(sock.0, libc::F_SETFL, libc::O_NONBLOCK);
|
||||
if ec == -1 {
|
||||
return Err("can not set nonblock".into());
|
||||
}
|
||||
}
|
||||
let ip: [u8; 4] = [0, 0, 0, 0];
|
||||
let addr = libc::sockaddr_in {
|
||||
sin_family: libc::AF_INET as u16,
|
||||
sin_port: 0,
|
||||
sin_addr: libc::in_addr {
|
||||
s_addr: u32::from_ne_bytes(ip),
|
||||
},
|
||||
sin_zero: [0; 8],
|
||||
};
|
||||
let addr_len = std::mem::size_of::<libc::sockaddr_in>();
|
||||
let ec = libc::bind(sock.0, &addr as *const _ as _, addr_len as _);
|
||||
if ec == -1 {
|
||||
return Err("can not bind socket".into());
|
||||
}
|
||||
{
|
||||
let mut addr = libc::sockaddr_in {
|
||||
sin_family: libc::AF_INET as u16,
|
||||
sin_port: 0,
|
||||
sin_addr: libc::in_addr { s_addr: 0 },
|
||||
sin_zero: [0; 8],
|
||||
};
|
||||
let mut addr_len = std::mem::size_of::<libc::sockaddr_in>();
|
||||
let ec = libc::getsockname(sock.0, &mut addr as *mut _ as _, &mut addr_len as *mut _ as _);
|
||||
if ec == -1 {
|
||||
error!("getsockname {ec}");
|
||||
return Err("can not convert raw socket to tokio socket".into());
|
||||
} else {
|
||||
if false {
|
||||
let ipv4 = Ipv4Addr::from(addr.sin_addr.s_addr.to_ne_bytes());
|
||||
let tcp_port = u16::from_be(addr.sin_port);
|
||||
info!("bound local socket to {:?} port {}", ipv4, tcp_port);
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(sock)
|
||||
}
|
||||
|
||||
unsafe fn try_send(sock: i32, addr: &SocketAddrV4, buf: &[u8]) -> Poll<Result<(), Error>> {
|
||||
let ip = addr.ip().octets();
|
||||
let port = addr.port();
|
||||
let addr = libc::sockaddr_in {
|
||||
sin_family: libc::AF_INET as u16,
|
||||
sin_port: port.to_be(),
|
||||
sin_addr: libc::in_addr {
|
||||
s_addr: u32::from_ne_bytes(ip),
|
||||
},
|
||||
sin_zero: [0; 8],
|
||||
};
|
||||
let addr_len = std::mem::size_of::<libc::sockaddr_in>();
|
||||
let ec = libc::sendto(
|
||||
sock,
|
||||
&buf[0] as *const _ as _,
|
||||
buf.len() as _,
|
||||
0,
|
||||
&addr as *const _ as _,
|
||||
addr_len as _,
|
||||
);
|
||||
if ec == -1 {
|
||||
let errno = *libc::__errno_location();
|
||||
if errno == libc::EAGAIN {
|
||||
return Poll::Pending;
|
||||
} else {
|
||||
return Poll::Ready(Err("FindIocStream can not send".into()));
|
||||
}
|
||||
}
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
|
||||
unsafe fn try_read(sock: i32) -> Poll<Result<(SocketAddrV4, Vec<(SearchId, SocketAddrV4)>), Error>> {
|
||||
let mut saddr_mem = [0u8; std::mem::size_of::<libc::sockaddr>()];
|
||||
let mut saddr_len: libc::socklen_t = saddr_mem.len() as _;
|
||||
let mut buf = vec![0u8; 1024];
|
||||
let ec = libc::recvfrom(
|
||||
sock,
|
||||
buf.as_mut_ptr() as _,
|
||||
buf.len() as _,
|
||||
libc::O_NONBLOCK,
|
||||
&mut saddr_mem as *mut _ as _,
|
||||
&mut saddr_len as *mut _ as _,
|
||||
);
|
||||
if ec == -1 {
|
||||
let errno = *libc::__errno_location();
|
||||
if errno == libc::EAGAIN {
|
||||
return Poll::Pending;
|
||||
} else {
|
||||
return Poll::Ready(Err("FindIocStream can not read".into()));
|
||||
}
|
||||
} else if ec < 0 {
|
||||
error!("unexpected received {ec}");
|
||||
Poll::Ready(Err(Error::with_msg_no_trace(format!("try_read ec {ec}"))))
|
||||
} else if ec == 0 {
|
||||
Poll::Ready(Err(Error::with_msg_no_trace(format!("try_read ec {ec}"))))
|
||||
} else {
|
||||
let saddr2: libc::sockaddr_in = std::mem::transmute_copy(&saddr_mem);
|
||||
let src_addr = Ipv4Addr::from(saddr2.sin_addr.s_addr.to_ne_bytes());
|
||||
let src_port = u16::from_be(saddr2.sin_port);
|
||||
trace!(
|
||||
"received from src_addr {:?} src_port {} ec {}",
|
||||
src_addr,
|
||||
src_port,
|
||||
ec
|
||||
);
|
||||
if false {
|
||||
let mut s1 = String::new();
|
||||
for i in 0..(ec as usize) {
|
||||
s1.extend(format!(" {:02x}", buf[i]).chars());
|
||||
}
|
||||
debug!("received answer {s1}");
|
||||
debug!(
|
||||
"received answer string {}",
|
||||
String::from_utf8_lossy(buf[..ec as usize].into())
|
||||
);
|
||||
}
|
||||
// TODO handle if we get a too large answer.
|
||||
let mut nb = crate::netbuf::NetBuf::new(2048);
|
||||
nb.put_slice(&buf[..ec as usize])?;
|
||||
let mut msgs = vec![];
|
||||
loop {
|
||||
let n = nb.data().len();
|
||||
if n == 0 {
|
||||
break;
|
||||
}
|
||||
if n < 16 {
|
||||
error!("incomplete message, not enough for header");
|
||||
break;
|
||||
}
|
||||
let hi = HeadInfo::from_netbuf(&mut nb)?;
|
||||
if nb.data().len() < hi.payload() {
|
||||
error!("incomplete message, missing payload");
|
||||
break;
|
||||
}
|
||||
let msg = CaMsg::from_proto_infos(&hi, nb.data(), 32)?;
|
||||
nb.adv(hi.payload())?;
|
||||
msgs.push(msg);
|
||||
}
|
||||
let mut res = vec![];
|
||||
for msg in msgs.iter() {
|
||||
match &msg.ty {
|
||||
CaMsgTy::SearchRes(k) => {
|
||||
let addr = SocketAddrV4::new(src_addr, k.tcp_port);
|
||||
res.push((SearchId(k.id), addr));
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
Poll::Ready(Ok((SocketAddrV4::new(src_addr, src_port), res)))
|
||||
}
|
||||
}
|
||||
|
||||
fn serialize_batch(buf: &mut Vec<u8>, batch: &SearchBatch) {
|
||||
buf.extend_from_slice(&[0, 0, 0, 0]);
|
||||
buf.extend_from_slice(&[0, 0, 0, 13]);
|
||||
buf.extend_from_slice(&[0, 0, 0, 0]);
|
||||
buf.extend_from_slice(&[0, 0, 0, 0]);
|
||||
for (sid, ch) in batch.sids.iter().zip(batch.channels.iter()) {
|
||||
let chb = ch.as_bytes();
|
||||
let npadded = (chb.len() + 1 + 7) / 8 * 8;
|
||||
let npad = npadded - chb.len();
|
||||
buf.extend_from_slice(&[0, 6]);
|
||||
buf.extend_from_slice(&(npadded as u16).to_be_bytes());
|
||||
buf.extend_from_slice(&[0, 0, 0, 13]);
|
||||
buf.extend_from_slice(&[0, 0, 0, 0]);
|
||||
buf.extend_from_slice(&sid.0.to_be_bytes());
|
||||
buf.extend_from_slice(chb);
|
||||
buf.extend_from_slice(&vec![0u8; npad]);
|
||||
}
|
||||
}
|
||||
|
||||
fn create_in_flight(&mut self) {
|
||||
let bid = BATCH_ID.fetch_add(1, Ordering::AcqRel);
|
||||
let bid = BatchId(bid as u32);
|
||||
let mut sids = vec![];
|
||||
let mut chs = vec![];
|
||||
while chs.len() < self.channels_per_batch && self.channels_input.len() > 0 {
|
||||
let sid = SEARCH_ID2.fetch_add(1, Ordering::AcqRel);
|
||||
let sid = SearchId(sid as u32);
|
||||
self.bid_by_sid.insert(sid.clone(), bid.clone());
|
||||
sids.push(sid);
|
||||
chs.push(self.channels_input.pop_front().unwrap());
|
||||
}
|
||||
let batch = SearchBatch {
|
||||
ts_beg: Instant::now(),
|
||||
channels: chs,
|
||||
tgts: self.tgts.iter().enumerate().map(|x| x.0).collect(),
|
||||
sids,
|
||||
done: vec![],
|
||||
};
|
||||
self.in_flight.insert(bid.clone(), batch);
|
||||
self.batch_send_queue.push_back(bid);
|
||||
}
|
||||
|
||||
fn handle_result(&mut self, src: SocketAddrV4, res: Vec<(SearchId, SocketAddrV4)>) {
|
||||
let mut sids_remove = vec![];
|
||||
for (sid, addr) in res {
|
||||
self.sids_done.insert(sid.clone(), ());
|
||||
match self.bid_by_sid.get(&sid) {
|
||||
Some(bid) => {
|
||||
sids_remove.push(sid.clone());
|
||||
match self.in_flight.get_mut(bid) {
|
||||
Some(batch) => {
|
||||
for (i2, s2) in batch.sids.iter().enumerate() {
|
||||
if s2 == &sid {
|
||||
match batch.channels.get(i2) {
|
||||
Some(ch) => {
|
||||
let res = FindIocRes {
|
||||
channel: ch.into(),
|
||||
addr: Some(addr),
|
||||
src: src.clone(),
|
||||
};
|
||||
self.out_queue.push_back(res);
|
||||
}
|
||||
None => {
|
||||
error!("no matching channel for {sid:?}");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// Book keeping:
|
||||
batch.done.push(sid);
|
||||
let mut all_done = true;
|
||||
if batch.done.len() >= batch.sids.len() {
|
||||
for s1 in &batch.sids {
|
||||
if !batch.done.contains(s1) {
|
||||
all_done = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
all_done = false;
|
||||
}
|
||||
if all_done {
|
||||
self.bids_all_done.insert(bid.clone(), ());
|
||||
self.in_flight.remove(bid);
|
||||
}
|
||||
}
|
||||
None => {
|
||||
// TODO analyze reasons
|
||||
error!("no batch for {bid:?}");
|
||||
}
|
||||
}
|
||||
}
|
||||
None => {
|
||||
// TODO analyze reasons
|
||||
if self.sids_done.contains_key(&sid) {
|
||||
self.result_for_done_sid_count += 1;
|
||||
} else {
|
||||
error!("no bid for {sid:?}");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
for sid in sids_remove {
|
||||
self.bid_by_sid.remove(&sid);
|
||||
}
|
||||
}
|
||||
|
||||
fn clear_timed_out(&mut self) {
|
||||
let now = Instant::now();
|
||||
let mut bids = vec![];
|
||||
let mut sids = vec![];
|
||||
let mut chns = vec![];
|
||||
for (bid, batch) in &mut self.in_flight {
|
||||
if now.duration_since(batch.ts_beg) > self.batch_run_max {
|
||||
self.bids_timed_out.insert(bid.clone(), ());
|
||||
for (i2, sid) in batch.sids.iter().enumerate() {
|
||||
if batch.done.contains(sid) == false {
|
||||
debug!("Timeout: {bid:?} {}", batch.channels[i2]);
|
||||
}
|
||||
sids.push(sid.clone());
|
||||
chns.push(batch.channels[i2].clone());
|
||||
}
|
||||
bids.push(bid.clone());
|
||||
}
|
||||
}
|
||||
for (sid, ch) in sids.into_iter().zip(chns) {
|
||||
let res = FindIocRes {
|
||||
src: SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 0),
|
||||
channel: ch,
|
||||
addr: None,
|
||||
};
|
||||
self.out_queue.push_back(res);
|
||||
self.bid_by_sid.remove(&sid);
|
||||
}
|
||||
for bid in bids {
|
||||
self.in_flight.remove(&bid);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Stream for FindIocStream {
|
||||
type Item = Result<VecDeque<FindIocRes>, Error>;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||
use Poll::*;
|
||||
match self.ping.poll_unpin(cx) {
|
||||
Ready(_) => {
|
||||
self.ping = Box::pin(tokio::time::sleep(Duration::from_millis(200)));
|
||||
cx.waker().wake_by_ref();
|
||||
}
|
||||
Pending => {}
|
||||
}
|
||||
self.clear_timed_out();
|
||||
loop {
|
||||
let mut loop_again = false;
|
||||
if self.out_queue.is_empty() == false {
|
||||
let ret = std::mem::replace(&mut self.out_queue, VecDeque::new());
|
||||
break Ready(Some(Ok(ret)));
|
||||
}
|
||||
if !self.buf1.is_empty() {
|
||||
match self.afd.poll_write_ready(cx) {
|
||||
Ready(Ok(mut g)) => match unsafe { Self::try_send(self.sock.0, &self.send_addr, &self.buf1) } {
|
||||
Ready(Ok(())) => {
|
||||
self.buf1.clear();
|
||||
loop_again = true;
|
||||
}
|
||||
Ready(Err(e)) => {
|
||||
error!("{e:?}");
|
||||
}
|
||||
Pending => {
|
||||
g.clear_ready();
|
||||
warn!("socket seemed ready for write, but is not");
|
||||
loop_again = true;
|
||||
}
|
||||
},
|
||||
Ready(Err(e)) => {
|
||||
let e = Error::with_msg_no_trace(format!("{e:?}"));
|
||||
error!("poll_write_ready {e:?}");
|
||||
}
|
||||
Pending => {}
|
||||
}
|
||||
}
|
||||
while self.buf1.is_empty() {
|
||||
match self.batch_send_queue.pop_front() {
|
||||
Some(bid) => {
|
||||
match self.buf_and_batch(&bid) {
|
||||
Some((buf1, batch)) => {
|
||||
match batch.tgts.pop_front() {
|
||||
Some(tgtix) => {
|
||||
Self::serialize_batch(buf1, batch);
|
||||
match self.tgts.get(tgtix) {
|
||||
Some(tgt) => {
|
||||
let tgt = tgt.clone();
|
||||
//info!("Serialize and queue {bid:?}");
|
||||
self.send_addr = tgt.clone();
|
||||
self.batch_send_queue.push_back(bid);
|
||||
loop_again = true;
|
||||
}
|
||||
None => {
|
||||
self.buf1.clear();
|
||||
self.batch_send_queue.push_back(bid);
|
||||
loop_again = true;
|
||||
error!("tgtix does not exist");
|
||||
}
|
||||
}
|
||||
}
|
||||
None => {
|
||||
//info!("Batch exhausted");
|
||||
loop_again = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
None => {
|
||||
if self.bids_all_done.contains_key(&bid) {
|
||||
// TODO count events
|
||||
} else {
|
||||
info!("Batch {bid:?} seems already done");
|
||||
}
|
||||
loop_again = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
None => break,
|
||||
}
|
||||
}
|
||||
while !self.channels_input.is_empty() && self.in_flight.len() < self.in_flight_max {
|
||||
self.create_in_flight();
|
||||
loop_again = true;
|
||||
}
|
||||
break match self.afd.poll_read_ready(cx) {
|
||||
Ready(Ok(mut g)) => match unsafe { Self::try_read(self.sock.0) } {
|
||||
Ready(Ok((src, res))) => {
|
||||
self.handle_result(src, res);
|
||||
continue;
|
||||
}
|
||||
Ready(Err(e)) => {
|
||||
error!("Error from try_read {e:?}");
|
||||
Ready(Some(Err(e)))
|
||||
}
|
||||
Pending => {
|
||||
g.clear_ready();
|
||||
continue;
|
||||
}
|
||||
},
|
||||
Ready(Err(e)) => {
|
||||
let e = Error::with_msg_no_trace(format!("{e:?}"));
|
||||
error!("poll_read_ready {e:?}");
|
||||
Ready(Some(Err(e)))
|
||||
}
|
||||
Pending => {
|
||||
if loop_again {
|
||||
continue;
|
||||
} else {
|
||||
if self.channels_input.is_empty() && self.in_flight.is_empty() && self.out_queue.is_empty() {
|
||||
Ready(None)
|
||||
} else {
|
||||
Pending
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,3 +1,4 @@
|
||||
use crate::ca::findioc::FindIocStream;
|
||||
use crate::ca::{parse_config, ListenFromFileOpts};
|
||||
use err::Error;
|
||||
use futures_util::StreamExt;
|
||||
@@ -7,35 +8,49 @@ use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use super::conn::FindIocStream;
|
||||
|
||||
async fn resolve_address(addr_str: &str) -> Result<SocketAddrV4, Error> {
|
||||
const PORT_DEFAULT: u16 = 5064;
|
||||
let ac = match addr_str.parse::<SocketAddrV4>() {
|
||||
Ok(k) => k,
|
||||
Err(_) => match addr_str.parse::<Ipv4Addr>() {
|
||||
Ok(k) => SocketAddrV4::new(k, PORT_DEFAULT),
|
||||
Err(e) => match tokio::net::lookup_host(&addr_str).await {
|
||||
Ok(k) => {
|
||||
let vs: Vec<_> = k
|
||||
.filter_map(|x| match x {
|
||||
SocketAddr::V4(k) => Some(k),
|
||||
SocketAddr::V6(_) => None,
|
||||
})
|
||||
.collect();
|
||||
if let Some(k) = vs.first() {
|
||||
*k
|
||||
Err(_) => {
|
||||
trace!("can not parse {addr_str} as SocketAddrV4");
|
||||
match addr_str.parse::<Ipv4Addr>() {
|
||||
Ok(k) => SocketAddrV4::new(k, PORT_DEFAULT),
|
||||
Err(e) => {
|
||||
trace!("can not parse {addr_str} as Ipv4Addr");
|
||||
let (hostname, port) = if addr_str.contains(":") {
|
||||
let mut it = addr_str.split(":");
|
||||
(
|
||||
it.next().unwrap().to_string(),
|
||||
it.next().unwrap().parse::<u16>().unwrap(),
|
||||
)
|
||||
} else {
|
||||
error!("Can not understand name for {:?} {:?}", addr_str, vs);
|
||||
return Err(e.into());
|
||||
(addr_str.to_string(), PORT_DEFAULT)
|
||||
};
|
||||
match tokio::net::lookup_host(format!("{}:33", hostname.clone())).await {
|
||||
Ok(k) => {
|
||||
let vs: Vec<_> = k
|
||||
.filter_map(|x| match x {
|
||||
SocketAddr::V4(k) => Some(k),
|
||||
SocketAddr::V6(_) => {
|
||||
error!("TODO ipv6 support");
|
||||
None
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
if let Some(k) = vs.first() {
|
||||
let mut k = *k;
|
||||
k.set_port(port);
|
||||
k
|
||||
} else {
|
||||
return Err(e.into());
|
||||
}
|
||||
}
|
||||
Err(e) => return Err(e.into()),
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
error!("{e:?}");
|
||||
return Err(e.into());
|
||||
}
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
};
|
||||
Ok(ac)
|
||||
}
|
||||
@@ -79,9 +94,43 @@ pub async fn ca_search(opts: ListenFromFileOpts) -> Result<(), Error> {
|
||||
.unwrap();
|
||||
let mut addrs = vec![];
|
||||
for s in &opts.search {
|
||||
let x = resolve_address(s).await?;
|
||||
addrs.push(x);
|
||||
match resolve_address(s).await {
|
||||
Ok(addr) => {
|
||||
info!("resolved {s} as {addr}");
|
||||
addrs.push(addr);
|
||||
}
|
||||
Err(e) => {
|
||||
error!("can not resolve {s} {e}");
|
||||
}
|
||||
}
|
||||
}
|
||||
let gw_addrs = {
|
||||
// Try to blacklist..
|
||||
// TODO if it helps, add a config option for it.
|
||||
let gateways = [
|
||||
"sf-cagw",
|
||||
"saresa-cagw",
|
||||
"saresb-cagw",
|
||||
"saresc-cagw",
|
||||
"satesd-cagw",
|
||||
"satese-cagw",
|
||||
"satesf-cagw",
|
||||
];
|
||||
let mut gw_addrs = vec![];
|
||||
for s in gateways {
|
||||
match resolve_address(s).await {
|
||||
Ok(addr) => {
|
||||
info!("resolved {s} as {addr}");
|
||||
gw_addrs.push(addr);
|
||||
}
|
||||
Err(e) => {
|
||||
error!("can not resolve {s} {e}");
|
||||
}
|
||||
}
|
||||
}
|
||||
gw_addrs
|
||||
};
|
||||
info!("Blacklisting {} gateways", gw_addrs.len());
|
||||
let mut finder = FindIocStream::new(addrs);
|
||||
for ch in &opts.channels {
|
||||
finder.push(ch.into());
|
||||
@@ -93,7 +142,7 @@ pub async fn ca_search(opts: ListenFromFileOpts) -> Result<(), Error> {
|
||||
ts_last = ts_now;
|
||||
info!("{}", finder.quick_state());
|
||||
}
|
||||
let k = tokio::time::timeout(Duration::from_millis(200), finder.next()).await;
|
||||
let k = tokio::time::timeout(Duration::from_millis(1500), finder.next()).await;
|
||||
let item = match k {
|
||||
Ok(Some(k)) => k,
|
||||
Ok(None) => {
|
||||
@@ -112,24 +161,46 @@ pub async fn ca_search(opts: ListenFromFileOpts) -> Result<(), Error> {
|
||||
}
|
||||
};
|
||||
for item in item {
|
||||
let searchaddr = item.src.to_string();
|
||||
let addr = item.addr.map(|x| x.to_string()).unwrap_or(String::new());
|
||||
let rows = pg_client
|
||||
.query(&qu_select, &[&facility, &item.channel, &searchaddr])
|
||||
.await
|
||||
.unwrap();
|
||||
if rows.is_empty() {
|
||||
pg_client
|
||||
.execute(&qu_insert, &[&facility, &item.channel, &searchaddr, &addr])
|
||||
let mut do_block = false;
|
||||
for a2 in &gw_addrs {
|
||||
if &item.src == a2 {
|
||||
do_block = true;
|
||||
warn!("gateways responded to search");
|
||||
}
|
||||
}
|
||||
if let Some(a1) = item.addr.as_ref() {
|
||||
for a2 in &gw_addrs {
|
||||
if a1 == a2 {
|
||||
do_block = true;
|
||||
warn!("do not use gateways as ioc address");
|
||||
}
|
||||
}
|
||||
}
|
||||
if do_block {
|
||||
info!("blocking {item:?}");
|
||||
} else {
|
||||
info!("using {item:?}");
|
||||
let srcaddr = item.src.to_string();
|
||||
let addr = item.addr.map(|x| x.to_string()).unwrap_or(String::new());
|
||||
let rows = pg_client
|
||||
.query(&qu_select, &[&facility, &item.channel, &srcaddr])
|
||||
.await
|
||||
.unwrap();
|
||||
} else {
|
||||
let addr2: &str = rows[0].get(0);
|
||||
if addr2 != addr {
|
||||
if rows.is_empty() {
|
||||
info!("insert {item:?}");
|
||||
pg_client
|
||||
.execute(&qu_update, &[&facility, &item.channel, &searchaddr, &addr])
|
||||
.execute(&qu_insert, &[&facility, &item.channel, &srcaddr, &addr])
|
||||
.await
|
||||
.unwrap();
|
||||
} else {
|
||||
info!("update {item:?}");
|
||||
let addr2: &str = rows[0].get(0);
|
||||
if addr2 != addr {
|
||||
pg_client
|
||||
.execute(&qu_update, &[&facility, &item.channel, &srcaddr, &addr])
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
use crate::ca::conn::ConnCommand;
|
||||
use crate::ca::{CommandQueueSet, IngestCommons};
|
||||
use crate::ca::IngestCommons;
|
||||
use axum::extract::Query;
|
||||
use http::request::Parts;
|
||||
use log::*;
|
||||
@@ -9,7 +9,46 @@ use std::sync::atomic::{AtomicU64, Ordering};
|
||||
use std::sync::Arc;
|
||||
|
||||
async fn get_empty() -> String {
|
||||
format!("")
|
||||
String::new()
|
||||
}
|
||||
|
||||
async fn send_command<'a, IT, F, R>(it: &mut IT, cmdgen: F) -> Vec<async_channel::Receiver<R>>
|
||||
where
|
||||
IT: Iterator<Item = (&'a SocketAddrV4, &'a async_channel::Sender<ConnCommand>)>,
|
||||
F: Fn() -> (ConnCommand, async_channel::Receiver<R>),
|
||||
{
|
||||
let mut rxs = Vec::new();
|
||||
for (_, tx) in it {
|
||||
let (cmd, rx) = cmdgen();
|
||||
match tx.send(cmd).await {
|
||||
Ok(()) => {
|
||||
rxs.push(rx);
|
||||
}
|
||||
Err(e) => {
|
||||
error!("can not send command {e:?}");
|
||||
}
|
||||
}
|
||||
}
|
||||
rxs
|
||||
}
|
||||
|
||||
async fn find_channel(
|
||||
params: HashMap<String, String>,
|
||||
ingest_commons: Arc<IngestCommons>,
|
||||
) -> axum::Json<Vec<(String, Vec<String>)>> {
|
||||
let pattern = params.get("pattern").map_or(String::new(), |x| x.clone()).to_string();
|
||||
let g = ingest_commons.command_queue_set.queues().lock().await;
|
||||
let mut it = g.iter();
|
||||
let rxs = send_command(&mut it, || ConnCommand::find_channel(pattern.clone())).await;
|
||||
let mut res = Vec::new();
|
||||
for rx in rxs {
|
||||
let item = rx.recv().await.unwrap();
|
||||
if item.1.len() > 0 {
|
||||
let item = (item.0.to_string(), item.1);
|
||||
res.push(item);
|
||||
}
|
||||
}
|
||||
axum::Json(res)
|
||||
}
|
||||
|
||||
async fn channel_add(params: HashMap<String, String>, ingest_commons: Arc<IngestCommons>) -> String {
|
||||
@@ -121,11 +160,65 @@ async fn channel_remove(
|
||||
}
|
||||
}
|
||||
|
||||
async fn channel_state(params: HashMap<String, String>, ingest_commons: Arc<IngestCommons>) -> String {
|
||||
let name = params.get("name").map_or(String::new(), |x| x.clone()).to_string();
|
||||
let g = ingest_commons.command_queue_set.queues().lock().await;
|
||||
let mut rxs = Vec::new();
|
||||
for (_, tx) in g.iter() {
|
||||
let (cmd, rx) = ConnCommand::channel_state(name.clone());
|
||||
match tx.send(cmd).await {
|
||||
Ok(()) => {
|
||||
rxs.push(rx);
|
||||
}
|
||||
Err(e) => {
|
||||
error!("can not send command {e:?}");
|
||||
}
|
||||
}
|
||||
}
|
||||
let mut res = Vec::new();
|
||||
for rx in rxs {
|
||||
let item = rx.recv().await.unwrap();
|
||||
if let Some(st) = item.1 {
|
||||
let item = (item.0.to_string(), st);
|
||||
res.push(item);
|
||||
}
|
||||
}
|
||||
serde_json::to_string(&res).unwrap()
|
||||
}
|
||||
|
||||
async fn channel_states(
|
||||
_params: HashMap<String, String>,
|
||||
ingest_commons: Arc<IngestCommons>,
|
||||
) -> axum::Json<Vec<crate::ca::conn::ChannelStateInfo>> {
|
||||
let g = ingest_commons.command_queue_set.queues().lock().await;
|
||||
let mut rxs = Vec::new();
|
||||
for (_, tx) in g.iter() {
|
||||
let (cmd, rx) = ConnCommand::channel_states_all();
|
||||
match tx.send(cmd).await {
|
||||
Ok(()) => {
|
||||
rxs.push(rx);
|
||||
}
|
||||
Err(e) => {
|
||||
error!("can not send command {e:?}");
|
||||
}
|
||||
}
|
||||
}
|
||||
let mut res = Vec::new();
|
||||
for rx in rxs {
|
||||
let item = rx.recv().await.unwrap();
|
||||
for h in item.1 {
|
||||
res.push(h);
|
||||
}
|
||||
}
|
||||
res.sort_unstable_by_key(|v| u32::MAX - v.interest_score as u32);
|
||||
//let res: Vec<_> = res.into_iter().rev().take(10).collect();
|
||||
axum::Json(res)
|
||||
}
|
||||
|
||||
pub async fn start_metrics_service(
|
||||
bind_to: String,
|
||||
insert_frac: Arc<AtomicU64>,
|
||||
insert_ivl_min: Arc<AtomicU64>,
|
||||
command_queue_set: Arc<CommandQueueSet>,
|
||||
ingest_commons: Arc<IngestCommons>,
|
||||
) {
|
||||
use axum::routing::{get, put};
|
||||
@@ -150,96 +243,36 @@ pub async fn start_metrics_service(
|
||||
.route(
|
||||
"/daqingest/find/channel",
|
||||
get({
|
||||
let command_queue_set = command_queue_set.clone();
|
||||
|Query(params): Query<HashMap<String, String>>| async move {
|
||||
let pattern = params.get("pattern").map_or(String::new(), |x| x.clone()).to_string();
|
||||
let g = command_queue_set.queues().lock().await;
|
||||
let mut rxs = Vec::new();
|
||||
for (_, tx) in g.iter() {
|
||||
let (cmd, rx) = ConnCommand::find_channel(pattern.clone());
|
||||
rxs.push(rx);
|
||||
if let Err(_) = tx.send(cmd).await {
|
||||
error!("can not send command");
|
||||
}
|
||||
}
|
||||
let mut res = Vec::new();
|
||||
for rx in rxs {
|
||||
let item = rx.recv().await.unwrap();
|
||||
if item.1.len() > 0 {
|
||||
let item = (item.0.to_string(), item.1);
|
||||
res.push(item);
|
||||
}
|
||||
}
|
||||
serde_json::to_string(&res).unwrap()
|
||||
}
|
||||
let ingest_commons = ingest_commons.clone();
|
||||
|Query(params): Query<HashMap<String, String>>| find_channel(params, ingest_commons)
|
||||
}),
|
||||
)
|
||||
.route(
|
||||
"/daqingest/channel/state",
|
||||
get({
|
||||
let command_queue_set = command_queue_set.clone();
|
||||
|Query(params): Query<HashMap<String, String>>| async move {
|
||||
let name = params.get("name").map_or(String::new(), |x| x.clone()).to_string();
|
||||
let g = command_queue_set.queues().lock().await;
|
||||
let mut rxs = Vec::new();
|
||||
for (_, tx) in g.iter() {
|
||||
let (cmd, rx) = ConnCommand::channel_state(name.clone());
|
||||
rxs.push(rx);
|
||||
if let Err(_) = tx.send(cmd).await {
|
||||
error!("can not send command");
|
||||
}
|
||||
}
|
||||
let mut res = Vec::new();
|
||||
for rx in rxs {
|
||||
let item = rx.recv().await.unwrap();
|
||||
if let Some(st) = item.1 {
|
||||
let item = (item.0.to_string(), st);
|
||||
res.push(item);
|
||||
}
|
||||
}
|
||||
serde_json::to_string(&res).unwrap()
|
||||
}
|
||||
let ingest_commons = ingest_commons.clone();
|
||||
|Query(params): Query<HashMap<String, String>>| channel_state(params, ingest_commons)
|
||||
}),
|
||||
)
|
||||
.route(
|
||||
"/daqingest/channel/states",
|
||||
get({
|
||||
let command_queue_set = command_queue_set.clone();
|
||||
|Query(_params): Query<HashMap<String, String>>| async move {
|
||||
let g = command_queue_set.queues().lock().await;
|
||||
let mut rxs = Vec::new();
|
||||
for (_, tx) in g.iter() {
|
||||
let (cmd, rx) = ConnCommand::channel_states_all();
|
||||
rxs.push(rx);
|
||||
if let Err(_) = tx.send(cmd).await {
|
||||
error!("can not send command");
|
||||
}
|
||||
}
|
||||
let mut res = Vec::new();
|
||||
for rx in rxs {
|
||||
let item = rx.recv().await.unwrap();
|
||||
for h in item.1 {
|
||||
res.push((item.0.clone(), h));
|
||||
}
|
||||
}
|
||||
res.sort_unstable_by_key(|(_, v)| v.interest_score as u32);
|
||||
let res: Vec<_> = res.into_iter().rev().take(10).collect();
|
||||
serde_json::to_string(&res).unwrap()
|
||||
}
|
||||
let ingest_commons = ingest_commons.clone();
|
||||
|Query(params): Query<HashMap<String, String>>| channel_states(params, ingest_commons)
|
||||
}),
|
||||
)
|
||||
.route(
|
||||
"/daqingest/channel/add",
|
||||
get({
|
||||
let ingest_commons = ingest_commons.clone();
|
||||
|Query(params): Query<HashMap<String, String>>| async move { channel_add(params, ingest_commons).await }
|
||||
|Query(params): Query<HashMap<String, String>>| channel_add(params, ingest_commons)
|
||||
}),
|
||||
)
|
||||
.route(
|
||||
"/daqingest/channel/remove",
|
||||
get({
|
||||
let ingest_commons = ingest_commons.clone();
|
||||
|Query(params): Query<HashMap<String, String>>| async move { channel_remove(params, ingest_commons).await }
|
||||
|Query(params): Query<HashMap<String, String>>| channel_remove(params, ingest_commons)
|
||||
}),
|
||||
)
|
||||
.route(
|
||||
|
||||
@@ -10,10 +10,11 @@ use scylla::prepared_statement::PreparedStatement;
|
||||
use scylla::transport::errors::QueryError;
|
||||
use scylla::{QueryResult, Session as ScySession};
|
||||
use stats::CaConnStats;
|
||||
use std::net::SocketAddrV4;
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
use std::task::{Context, Poll};
|
||||
use std::time::Instant;
|
||||
use std::time::{Instant, SystemTime};
|
||||
|
||||
pub struct ScyInsertFut {
|
||||
#[allow(unused)]
|
||||
@@ -83,6 +84,36 @@ impl Future for ScyInsertFut {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum ConnectionStatus {
|
||||
ConnectError = 1,
|
||||
ConnectTimeout = 2,
|
||||
Established = 3,
|
||||
Closing = 4,
|
||||
ClosedUnexpected = 5,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct ConnectionStatusItem {
|
||||
pub ts: SystemTime,
|
||||
pub addr: SocketAddrV4,
|
||||
pub status: ConnectionStatus,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum ChannelStatus {
|
||||
Opened = 1,
|
||||
Closed = 2,
|
||||
ClosedUnexpected = 3,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct ChannelStatusItem {
|
||||
pub ts: SystemTime,
|
||||
pub series: u64,
|
||||
pub status: ChannelStatus,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct InsertItem {
|
||||
pub series: u64,
|
||||
@@ -114,6 +145,8 @@ pub struct IvlItem {
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum QueryItem {
|
||||
ConnectionStatus(ConnectionStatusItem),
|
||||
ChannelStatus(ChannelStatusItem),
|
||||
Insert(InsertItem),
|
||||
Mute(MuteItem),
|
||||
Ivl(IvlItem),
|
||||
@@ -266,3 +299,29 @@ pub async fn insert_item(item: InsertItem, data_store: &DataStore, stats: &CaCon
|
||||
stats.inserts_val_inc();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn insert_connection_status(
|
||||
item: ConnectionStatusItem,
|
||||
data_store: &DataStore,
|
||||
_stats: &CaConnStats,
|
||||
) -> Result<(), Error> {
|
||||
let tsunix = item.ts.duration_since(std::time::UNIX_EPOCH).unwrap();
|
||||
let secs = tsunix.as_secs() * netpod::timeunits::SEC;
|
||||
let nanos = tsunix.subsec_nanos() as u64;
|
||||
let ts = secs + nanos;
|
||||
let div = netpod::timeunits::SEC * 600;
|
||||
let ts_msp = ts / div * div;
|
||||
let ts_lsp = ts - ts_msp;
|
||||
let kind = item.status as u32;
|
||||
let addr = format!("{}", item.addr);
|
||||
let params = (ts_msp as i64, ts_lsp as i64, kind as i32, addr);
|
||||
data_store
|
||||
.scy
|
||||
.query(
|
||||
"insert into connection_status (ts_msp, ts_lsp, kind, addr) values (?, ?, ?, ?)",
|
||||
params,
|
||||
)
|
||||
.await
|
||||
.err_conv()?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
14
readme.md
14
readme.md
@@ -26,7 +26,7 @@ api_bind: "0.0.0.0:3011"
|
||||
local_epics_hostname: sf-daqsync-02.psi.ch
|
||||
# The backend name to use for the channels handled by this daqingest instance:
|
||||
backend: scylla
|
||||
# Hosts to use for channel access search:
|
||||
# Addresses to use for channel access search:
|
||||
search:
|
||||
- "172.26.0.255"
|
||||
- "172.26.2.255"
|
||||
@@ -49,3 +49,15 @@ channels:
|
||||
- "SOME-CHANNEL:1"
|
||||
- "OTHER-CHANNEL:2"
|
||||
```
|
||||
|
||||
|
||||
## Access status and configuration of daqingest at runtime
|
||||
|
||||
Status and configuration can be accessed at runtime via http at the address
|
||||
as configured by the `api_bind` parameter.
|
||||
|
||||
### Check the state of a channel
|
||||
|
||||
```txt
|
||||
http://<api_bind>/daqingest/channel/state?name=[...]
|
||||
```
|
||||
|
||||
Reference in New Issue
Block a user