WIP on ingest

This commit is contained in:
Dominik Werder
2022-05-05 15:43:00 +02:00
parent fb8184957c
commit c6b2756d4a
4 changed files with 455 additions and 296 deletions

View File

@@ -21,7 +21,7 @@ pub fn main() -> Result<(), Error> {
}
SubCmd::ChannelAccess(k) => match k {
ChannelAccess::CaChannel(_) => todo!(),
ChannelAccess::CaSearch(k) => netfetch::ca::ca_search(k.into()).await?,
ChannelAccess::CaSearch(k) => netfetch::ca::ca_search_2(k.into()).await?,
ChannelAccess::CaConfig(k) => netfetch::ca::ca_connect(k.into()).await?,
},
}

View File

@@ -2,16 +2,16 @@ pub mod conn;
pub mod proto;
pub mod store;
use conn::{CaConn, FindIoc};
use self::conn::FindIocStream;
use self::store::DataStore;
use crate::zmtp::ErrConv;
use conn::CaConn;
use err::Error;
use futures_util::stream::FuturesUnordered;
use futures_util::{StreamExt, TryFutureExt};
use futures_util::StreamExt;
use log::*;
use scylla::batch::Consistency;
use scylla::prepared_statement::PreparedStatement;
use scylla::Session as ScySession;
use serde::{Deserialize, Serialize};
use std::collections::{BTreeMap, VecDeque};
use std::collections::BTreeMap;
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
use std::path::PathBuf;
use std::sync::Arc;
@@ -19,10 +19,6 @@ use std::time::Duration;
use tokio::fs::OpenOptions;
use tokio::io::AsyncReadExt;
use tokio::net::TcpStream;
use tokio::task::JoinError;
use tokio::time::error::Elapsed;
use self::store::{ChannelRegistry, DataStore};
#[derive(Debug, Serialize, Deserialize)]
struct ChannelConfig {
@@ -85,41 +81,39 @@ pub struct CaConnectOpts {
pub abort_after_search: u32,
}
async fn unwrap_search_result(
item: Result<Result<Result<(String, SocketAddrV4, Option<SocketAddrV4>), Error>, Elapsed>, JoinError>,
scy: &ScySession,
qu: &PreparedStatement,
) -> Result<(String, SocketAddrV4, Option<SocketAddrV4>), Error> {
match item {
Ok(k) => match k {
Ok(k) => match k {
Ok(h) => match h.2 {
Some(k) => {
scy.execute(qu, (&h.0, format!("{:?}", h.1), format!("{:?}", k)))
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
Ok(h)
async fn resolve_address(addr_str: &str) -> Result<SocketAddrV4, Error> {
const PORT_DEFAULT: u16 = 5064;
let ac = match addr_str.parse::<SocketAddrV4>() {
Ok(k) => k,
Err(_) => match addr_str.parse::<Ipv4Addr>() {
Ok(k) => SocketAddrV4::new(k, PORT_DEFAULT),
Err(e) => match tokio::net::lookup_host(&addr_str).await {
Ok(k) => {
let vs: Vec<_> = k
.filter_map(|x| match x {
SocketAddr::V4(k) => Some(k),
SocketAddr::V6(_) => None,
})
.collect();
if let Some(k) = vs.first() {
*k
} else {
error!("Can not understand name for {:?} {:?}", addr_str, vs);
return Err(e.into());
}
None => Ok(h),
},
}
Err(e) => {
error!("bad search {e:?}");
Err(e)
error!("{e:?}");
return Err(e.into());
}
},
Err(e) => {
error!("Elapsed");
Err(Error::with_msg_no_trace(format!("{e:?}")))
}
},
Err(e) => {
error!("JoinError");
Err(Error::with_msg_no_trace(format!("{e:?}")))
}
}
};
Ok(ac)
}
pub async fn ca_search(opts: ListenFromFileOpts) -> Result<(), Error> {
pub async fn ca_search_2(opts: ListenFromFileOpts) -> Result<(), Error> {
let facility = "scylla";
let opts = parse_config(opts.config).await?;
let scy = scylla::SessionBuilder::new()
.known_node("sf-nube-11:19042")
@@ -129,122 +123,67 @@ pub async fn ca_search(opts: ListenFromFileOpts) -> Result<(), Error> {
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
let qu = scy
.prepare("insert into ioc_by_channel (channel, searchaddr, addr) values (?, ?, ?)")
.prepare("insert into ioc_by_channel (facility, channel, searchaddr, addr) values (?, ?, ?, ?)")
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
const PORT_DEFAULT: u16 = 5064;
info!("Look up {} channel hosts", opts.channels.len());
let mut fut_queue = FuturesUnordered::new();
let mut res2 = vec![];
let mut chns = VecDeque::new();
let mut addrs = vec![];
for s in &opts.search {
let x = resolve_address(s).await?;
addrs.push(x);
}
let mut finder = FindIocStream::new(addrs);
for ch in &opts.channels {
for ac in &opts.search {
chns.push_back((ch.clone(), ac.clone()));
}
finder.push(ch.into());
}
let max_simul = opts.max_simul;
let timeout = opts.timeout;
let mut ix1 = 0;
'lo2: loop {
while fut_queue.len() < max_simul && chns.len() > 0 {
let (ch, ac) = chns.pop_front().unwrap();
let ch2 = ch.clone();
let ac = match ac.parse::<SocketAddrV4>() {
Ok(k) => k,
Err(_) => match ac.parse::<Ipv4Addr>() {
Ok(k) => SocketAddrV4::new(k, PORT_DEFAULT),
Err(e) => match tokio::net::lookup_host(&ac).await {
Ok(k) => {
let vs: Vec<_> = k
.filter_map(|x| match x {
SocketAddr::V4(k) => Some(k),
SocketAddr::V6(_) => None,
})
.collect();
if let Some(k) = vs.first() {
*k
} else {
error!("Can not understand name for {:?} {:?}", ac, vs);
return Err(e.into());
}
}
Err(e) => {
error!("{e:?}");
return Err(e.into());
}
},
},
};
ix1 += 1;
if ix1 >= 500 {
info!("Start search for {} {}", ch, ac);
ix1 = 0;
let deadline = tokio::time::Instant::now()
.checked_add(Duration::from_millis(100000000))
.unwrap();
let mut i1 = 0;
loop {
let k = tokio::time::timeout_at(deadline, finder.next()).await;
let item = match k {
Ok(Some(k)) => k,
Ok(None) => {
info!("Search stream exhausted");
break;
}
let fut = FindIoc::new(ch.clone(), Ipv4Addr::UNSPECIFIED, ac.clone(), timeout)
.map_ok(move |x| (ch2, ac.clone(), x));
let fut = tokio::time::timeout(Duration::from_millis(timeout + 1000), fut);
let jh = tokio::spawn(fut);
fut_queue.push(jh);
if chns.is_empty() {
break 'lo2;
}
}
while fut_queue.len() >= max_simul {
match fut_queue.next().await {
Some(item) => {
let item = unwrap_search_result(item, &scy, &qu).await;
res2.push(item);
}
None => break,
}
}
}
while fut_queue.len() > 0 {
match fut_queue.next().await {
Some(item) => {
let item = unwrap_search_result(item, &scy, &qu).await;
res2.push(item);
}
None => break,
}
}
info!("Collected {} results", res2.len());
let mut channels_set = BTreeMap::new();
let mut channels_by_host = BTreeMap::new();
for item in res2 {
// TODO should we continue even if some channel gives an error or can not be located?
match item {
Ok((ch, ac, Some(addr))) => {
info!("Found address {} {:?} {:?}", ch, ac, addr);
channels_set.insert(ch.clone(), true);
let key = addr;
if !channels_by_host.contains_key(&key) {
channels_by_host.insert(key, vec![ch]);
} else {
channels_by_host.get_mut(&key).unwrap().push(ch);
}
}
Ok((_, _, None)) => {}
Err(e) => {
error!("Error in res2 list: {e:?}");
Err(_) => {
warn!("timed out");
break;
}
};
}
for (host, channels) in &channels_by_host {
info!("Have: {:?} {:?}", host, channels.len());
}
let nil = None::<i8>;
for ch in &opts.channels {
if !channels_set.contains_key(ch) {
scy.execute(&qu, (ch, "", nil))
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
let item = match item {
Ok(k) => k,
Err(e) => {
error!("ca_search_2 {e:?}");
continue;
}
};
for item in item {
scy.execute(
&qu,
(
facility,
&item.channel,
item.src.to_string(),
item.addr.map(|x| x.to_string()),
),
)
.await
.err_conv()?;
}
tokio::time::sleep(Duration::from_millis(1)).await;
i1 += 1;
if i1 > 500 {
i1 = 0;
info!("{}", finder.quick_state());
}
}
Ok(())
}
pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> {
let facility = "scylla";
let opts = parse_config(opts.config).await?;
let scy = scylla::SessionBuilder::new()
.known_node("sf-nube-11:19042")
@@ -254,14 +193,15 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> {
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
let scy = Arc::new(scy);
info!("FIND IOCS");
let qu_find_addr = scy
.prepare("select addr from ioc_by_channel where channel = ?")
.prepare("select addr from ioc_by_channel where facility = ? and channel = ?")
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
let mut channels_by_host = BTreeMap::new();
for (ix, ch) in opts.channels.iter().enumerate() {
let res = scy
.execute(&qu_find_addr, (ch,))
.execute(&qu_find_addr, (facility, ch))
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
if res.rows_num().unwrap() == 0 {
@@ -282,6 +222,7 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> {
if opts.abort_after_search == 1 {
return Ok(());
}
info!("CONNECT TO HOSTS");
let data_store = Arc::new(DataStore::new(scy.clone()).await?);
let mut conn_jhs = vec![];
for (host, channels) in channels_by_host {

View File

@@ -10,7 +10,7 @@ use libc::c_int;
use log::*;
use netpod::timeunits::SEC;
use netpod::{ScalarType, Shape};
use std::collections::BTreeMap;
use std::collections::{BTreeMap, VecDeque};
use std::net::{Ipv4Addr, SocketAddrV4};
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
@@ -486,12 +486,6 @@ impl Stream for CaConn {
}
}
enum FindIocState {
Init,
WaitWritable,
WaitReadable,
}
struct SockBox(c_int);
impl Drop for SockBox {
@@ -505,43 +499,103 @@ impl Drop for SockBox {
}
}
const SEARCH_ID: AtomicUsize = AtomicUsize::new(0);
static BATCH_ID: AtomicUsize = AtomicUsize::new(0);
static SEARCH_ID2: AtomicUsize = AtomicUsize::new(0);
pub struct FindIoc {
state: FindIocState,
channel: String,
search_id: u32,
sock: SockBox,
afd: Option<AsyncFd<i32>>,
addr: libc::sockaddr_in,
addr_len: usize,
deadline: Pin<Box<tokio::time::Sleep>>,
result: Option<SocketAddrV4>,
addr_bind: Ipv4Addr,
addr_conn: SocketAddrV4,
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug)]
struct BatchId(u32);
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug)]
struct SearchId(u32);
struct SearchBatch {
id: BatchId,
ts_beg: Instant,
tgts: VecDeque<usize>,
channels: Vec<String>,
sids: Vec<SearchId>,
done: Vec<SearchId>,
}
// Do low-level approach first to make sure it works as specified.
impl FindIoc {
pub fn new(channel: String, addr_bind: Ipv4Addr, addr_conn: SocketAddrV4, timeout: u64) -> Self {
let addr = unsafe { std::mem::transmute_copy(&[0u8; std::mem::size_of::<libc::sockaddr_in>()]) };
let search_id = SEARCH_ID.fetch_add(1, Ordering::AcqRel) as u32;
#[derive(Debug)]
pub struct FindIocRes {
pub src: SocketAddrV4,
pub channel: String,
pub addr: Option<SocketAddrV4>,
}
pub struct FindIocStream {
tgts: Vec<SocketAddrV4>,
channels_input: VecDeque<String>,
in_flight: BTreeMap<BatchId, SearchBatch>,
in_flight_max: usize,
bid_by_sid: BTreeMap<SearchId, BatchId>,
batch_send_queue: VecDeque<BatchId>,
sock: SockBox,
afd: AsyncFd<i32>,
buf1: Vec<u8>,
send_addr: SocketAddrV4,
out_queue: VecDeque<FindIocRes>,
ping: Pin<Box<tokio::time::Sleep>>,
channels_per_batch: usize,
batch_run_max: Duration,
bids_all_done: BTreeMap<BatchId, ()>,
bids_timed_out: BTreeMap<BatchId, ()>,
sids_done: BTreeMap<SearchId, ()>,
result_for_done_sid_count: u64,
}
impl FindIocStream {
pub fn new(tgts: Vec<SocketAddrV4>) -> Self {
info!("FindIocStream tgts {tgts:?}");
let sock = unsafe { Self::create_socket() }.unwrap();
let afd = AsyncFd::new(sock.0).unwrap();
Self {
state: FindIocState::Init,
channel,
search_id,
sock: SockBox(-1),
afd: None,
addr: addr,
addr_len: 0,
deadline: Box::pin(tokio::time::sleep(Duration::from_millis(timeout))),
result: None,
addr_bind,
addr_conn,
tgts,
channels_input: VecDeque::new(),
in_flight: BTreeMap::new(),
bid_by_sid: BTreeMap::new(),
batch_send_queue: VecDeque::new(),
sock,
afd,
buf1: vec![0; 1024],
send_addr: SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 5064),
out_queue: VecDeque::new(),
ping: Box::pin(tokio::time::sleep(Duration::from_millis(200))),
bids_all_done: BTreeMap::new(),
bids_timed_out: BTreeMap::new(),
sids_done: BTreeMap::new(),
result_for_done_sid_count: 0,
in_flight_max: 10,
channels_per_batch: 10,
batch_run_max: Duration::from_millis(1500),
}
}
unsafe fn create_socket(&mut self) -> Result<(), Error> {
pub fn quick_state(&self) -> String {
format!(
"channels_input {} in_flight {} bid_by_sid {} out_queue {} result_for_done_sid_count {} bids_timed_out {}",
self.channels_input.len(),
self.in_flight.len(),
self.bid_by_sid.len(),
self.out_queue.len(),
self.result_for_done_sid_count,
self.bids_timed_out.len()
)
}
pub fn push(&mut self, x: String) {
self.channels_input.push_back(x);
}
fn buf_and_batch(&mut self, bid: &BatchId) -> Option<(&mut Vec<u8>, &mut SearchBatch)> {
match self.in_flight.get_mut(bid) {
Some(batch) => Some((&mut self.buf1, batch)),
None => None,
}
}
unsafe fn create_socket() -> Result<SockBox, Error> {
let ec = libc::socket(libc::AF_INET, libc::SOCK_DGRAM, 0);
if ec == -1 {
return Err("can not create socket".into());
@@ -566,7 +620,7 @@ impl FindIoc {
return Err("can not set nonblock".into());
}
}
let ip: [u8; 4] = self.addr_bind.octets();
let ip: [u8; 4] = [0, 0, 0, 0];
let addr = libc::sockaddr_in {
sin_family: libc::AF_INET as u16,
sin_port: 0,
@@ -576,8 +630,6 @@ impl FindIoc {
sin_zero: [0; 8],
};
let addr_len = std::mem::size_of::<libc::sockaddr_in>();
self.addr = addr.clone();
self.addr_len = addr_len;
let ec = libc::bind(sock.0, &addr as *const _ as _, addr_len as _);
if ec == -1 {
return Err("can not bind socket".into());
@@ -602,40 +654,22 @@ impl FindIoc {
}
}
}
self.sock = sock;
Ok(())
Ok(sock)
}
unsafe fn try_write(&mut self) -> Result<(), Error> {
let sock = self.sock.0;
let ip = self.addr_conn.ip().octets();
unsafe fn try_send(sock: i32, addr: &SocketAddrV4, buf: &[u8]) -> Poll<Result<(), Error>> {
let ip = addr.ip().octets();
let port = addr.port();
let addr = libc::sockaddr_in {
sin_family: libc::AF_INET as u16,
sin_port: (self.addr_conn.port() as u16).to_be(),
sin_port: port.to_be(),
sin_addr: libc::in_addr {
s_addr: u32::from_ne_bytes(ip),
},
sin_zero: [0; 8],
};
let addr_len = std::mem::size_of::<libc::sockaddr_in>();
let chb = self.channel.as_bytes();
let npadded = (chb.len() + 1 + 7) / 8 * 8;
let npad = npadded - self.channel.len();
let mut buf = vec![
//
0u8, 0, 0, 0, //
0, 0, 0, 13, //
0, 0, 0, 0, //
0, 0, 0, 0, //
];
buf.extend_from_slice(&[0, 6]);
buf.extend_from_slice(&(npadded as u16).to_be_bytes());
buf.extend_from_slice(&[0, 0, 0, 13]);
buf.extend_from_slice(&[0, 0, 0, 0]);
buf.extend_from_slice(&self.search_id.to_be_bytes());
buf.extend_from_slice(chb);
buf.extend_from_slice(&vec![0u8; npad]);
//info!("sendto {ip:?} n {}", buf.len());
//info!("sendto {ip:?} {} n {}", port, buf.len());
let ec = libc::sendto(
sock,
&buf[0] as *const _ as _,
@@ -647,17 +681,15 @@ impl FindIoc {
if ec == -1 {
let errno = *libc::__errno_location();
if errno == libc::EAGAIN {
error!("NOT YET READY FOR SENDING...");
return Err("socket not ready for write".into());
return Poll::Pending;
} else {
return Err("can not send".into());
return Poll::Ready(Err("FindIocStream can not send".into()));
}
}
Ok(())
Poll::Ready(Ok(()))
}
unsafe fn try_read(&mut self) -> Result<(), Error> {
let sock = self.sock.0;
unsafe fn try_read(sock: i32) -> Poll<Result<(SocketAddrV4, Vec<(SearchId, SocketAddrV4)>), Error>> {
let mut saddr_mem = [0u8; std::mem::size_of::<libc::sockaddr>()];
let mut saddr_len: libc::socklen_t = saddr_mem.len() as _;
let mut buf = vec![0u8; 1024];
@@ -672,15 +704,15 @@ impl FindIoc {
if ec == -1 {
let errno = *libc::__errno_location();
if errno == libc::EAGAIN {
error!("try_read BUT NOT YET READY FOR READING...");
return Err("socket not ready for read".into());
return Poll::Pending;
} else {
return Err("can not read".into());
return Poll::Ready(Err("FindIocStream can not read".into()));
}
} else if ec < 0 {
error!("unexpected received {ec}");
Poll::Ready(Err(Error::with_msg_no_trace(format!("try_read ec {ec}"))))
} else if ec == 0 {
error!("received zero bytes");
Poll::Ready(Err(Error::with_msg_no_trace(format!("try_read ec {ec}"))))
} else {
let saddr2: libc::sockaddr_in = std::mem::transmute_copy(&saddr_mem);
let src_addr = Ipv4Addr::from(saddr2.sin_addr.s_addr.to_ne_bytes());
@@ -725,101 +757,284 @@ impl FindIoc {
msgs.push(msg);
}
//info!("received {} msgs {:?}", msgs.len(), msgs);
for (msg_ix, msg) in msgs.iter().enumerate() {
let mut res = vec![];
for msg in msgs.iter() {
match &msg.ty {
CaMsgTy::SearchRes(k) => {
if k.id != self.search_id {
warn!("id mismatch {} vs {}", k.id, self.search_id);
}
if false {
let addr = Ipv4Addr::from(k.addr.to_be_bytes());
info!("Converted address: {addr:?}");
}
info!(
"SearchRes: {}/{} {:?} {:?} {}",
msg_ix,
msgs.len(),
self.channel,
src_addr,
k.tcp_port
);
if self.result.is_none() {
self.result = Some(SocketAddrV4::new(src_addr, k.tcp_port));
} else {
warn!("Result already populated for {}", self.channel);
}
let addr = SocketAddrV4::new(src_addr, k.tcp_port);
res.push((SearchId(k.id), addr));
}
_ => {}
}
}
Poll::Ready(Ok((SocketAddrV4::new(src_addr, src_port), res)))
}
}
fn serialize_batch(buf: &mut Vec<u8>, batch: &SearchBatch) {
buf.extend_from_slice(&[0, 0, 0, 0]);
buf.extend_from_slice(&[0, 0, 0, 13]);
buf.extend_from_slice(&[0, 0, 0, 0]);
buf.extend_from_slice(&[0, 0, 0, 0]);
for (sid, ch) in batch.sids.iter().zip(batch.channels.iter()) {
let chb = ch.as_bytes();
let npadded = (chb.len() + 1 + 7) / 8 * 8;
let npad = npadded - chb.len();
buf.extend_from_slice(&[0, 6]);
buf.extend_from_slice(&(npadded as u16).to_be_bytes());
buf.extend_from_slice(&[0, 0, 0, 13]);
buf.extend_from_slice(&[0, 0, 0, 0]);
buf.extend_from_slice(&sid.0.to_be_bytes());
buf.extend_from_slice(chb);
buf.extend_from_slice(&vec![0u8; npad]);
}
}
fn create_in_flight(&mut self) {
let bid = BATCH_ID.fetch_add(1, Ordering::AcqRel);
let bid = BatchId(bid as u32);
//info!("create_in_flight {bid:?}");
let mut sids = vec![];
let mut chs = vec![];
while chs.len() < self.channels_per_batch && self.channels_input.len() > 0 {
let sid = SEARCH_ID2.fetch_add(1, Ordering::AcqRel);
let sid = SearchId(sid as u32);
self.bid_by_sid.insert(sid.clone(), bid.clone());
sids.push(sid);
chs.push(self.channels_input.pop_front().unwrap());
}
let batch = SearchBatch {
id: bid.clone(),
ts_beg: Instant::now(),
channels: chs,
tgts: self.tgts.iter().enumerate().map(|x| x.0).collect(),
sids,
done: vec![],
};
self.in_flight.insert(bid.clone(), batch);
self.batch_send_queue.push_back(bid);
}
fn handle_result(&mut self, src: SocketAddrV4, res: Vec<(SearchId, SocketAddrV4)>) {
let mut sids_remove = vec![];
for (sid, addr) in res {
self.sids_done.insert(sid.clone(), ());
match self.bid_by_sid.get(&sid) {
Some(bid) => {
sids_remove.push(sid.clone());
match self.in_flight.get_mut(bid) {
Some(batch) => {
for (i2, s2) in batch.sids.iter().enumerate() {
if s2 == &sid {
match batch.channels.get(i2) {
Some(ch) => {
let res = FindIocRes {
channel: ch.into(),
addr: Some(addr),
src: src.clone(),
};
self.out_queue.push_back(res);
}
None => {
error!("no matching channel for {sid:?}");
}
}
}
}
// Book keeping:
batch.done.push(sid);
let mut all_done = true;
if batch.done.len() >= batch.sids.len() {
for s1 in &batch.sids {
if !batch.done.contains(s1) {
all_done = false;
break;
}
}
} else {
all_done = false;
}
if all_done {
//info!("all searches done for {bid:?}");
self.bids_all_done.insert(bid.clone(), ());
self.in_flight.remove(bid);
}
}
None => {
// TODO analyze reasons
error!("no batch for {bid:?}");
}
}
}
None => {
// TODO analyze reasons
if self.sids_done.contains_key(&sid) {
self.result_for_done_sid_count += 1;
} else {
error!("no bid for {sid:?}");
}
}
}
}
for sid in sids_remove {
self.bid_by_sid.remove(&sid);
}
}
fn clear_timed_out(&mut self) {
let now = Instant::now();
let mut bids = vec![];
let mut sids = vec![];
let mut chns = vec![];
for (bid, batch) in &mut self.in_flight {
if now.duration_since(batch.ts_beg) > self.batch_run_max {
for (i2, sid) in batch.sids.iter().enumerate() {
if batch.done.contains(sid) == false {
warn!("Timeout: {bid:?} {}", batch.channels[i2]);
}
sids.push(sid.clone());
chns.push(batch.channels[i2].clone());
}
bids.push(bid.clone());
}
}
for (sid, ch) in sids.into_iter().zip(chns) {
let res = FindIocRes {
src: SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 0),
channel: ch,
addr: None,
};
self.out_queue.push_back(res);
self.bid_by_sid.remove(&sid);
}
for bid in bids {
self.in_flight.remove(&bid);
}
Ok(())
}
}
impl Future for FindIoc {
// TODO use a dedicated type to indicate timeout.
type Output = Result<Option<SocketAddrV4>, Error>;
impl Stream for FindIocStream {
type Item = Result<VecDeque<FindIocRes>, Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
loop {
match self.deadline.poll_unpin(cx) {
Ready(()) => {
break Ready(Ok(self.result.clone()));
}
Pending => {}
match self.ping.poll_unpin(cx) {
Ready(_) => {
self.ping = Box::pin(tokio::time::sleep(Duration::from_millis(200)));
cx.waker().wake_by_ref();
}
break match &mut self.state {
FindIocState::Init => match unsafe { Self::create_socket(&mut self) } {
Ok(()) => {
let afd = tokio::io::unix::AsyncFd::new(self.sock.0).expect("can not create AsyncFd");
self.afd = Some(afd);
self.state = FindIocState::WaitWritable;
Pending => {}
}
self.clear_timed_out();
loop {
let mut loop_again = false;
if self.out_queue.is_empty() == false {
let ret = std::mem::replace(&mut self.out_queue, VecDeque::new());
break Ready(Some(Ok(ret)));
}
if !self.buf1.is_empty() {
match self.afd.poll_write_ready(cx) {
Ready(Ok(mut g)) => match unsafe { Self::try_send(self.sock.0, &self.send_addr, &self.buf1) } {
Ready(Ok(())) => {
self.buf1.clear();
loop_again = true;
}
Ready(Err(e)) => {
error!("{e:?}");
}
Pending => {
g.clear_ready();
warn!("socket seemed ready for write, but is not");
loop_again = true;
}
},
Ready(Err(e)) => {
let e = Error::with_msg_no_trace(format!("{e:?}"));
error!("poll_write_ready {e:?}");
}
Pending => {}
}
}
while self.buf1.is_empty() {
match self.batch_send_queue.pop_front() {
Some(bid) => {
match self.buf_and_batch(&bid) {
Some((buf1, batch)) => {
match batch.tgts.pop_front() {
Some(tgtix) => {
Self::serialize_batch(buf1, batch);
match self.tgts.get(tgtix) {
Some(tgt) => {
let tgt = tgt.clone();
//info!("Serialize and queue {bid:?}");
self.send_addr = tgt.clone();
self.batch_send_queue.push_back(bid);
loop_again = true;
}
None => {
self.buf1.clear();
self.batch_send_queue.push_back(bid);
loop_again = true;
error!("tgtix does not exist");
}
}
}
None => {
//info!("Batch exhausted");
loop_again = true;
}
}
}
None => {
if self.bids_all_done.contains_key(&bid) {
// TODO count events
} else {
info!("Batch {bid:?} seems already done");
}
loop_again = true;
}
}
}
None => break,
}
}
while !self.channels_input.is_empty() && self.in_flight.len() < self.in_flight_max {
self.create_in_flight();
loop_again = true;
}
break match self.afd.poll_read_ready(cx) {
Ready(Ok(mut g)) => match unsafe { Self::try_read(self.sock.0) } {
Ready(Ok((src, res))) => {
self.handle_result(src, res);
continue;
}
Err(e) => {
error!("can not create socket {e:?}");
Ready(Err(e))
}
},
FindIocState::WaitWritable => match self.afd.as_mut().unwrap().poll_write_ready(cx) {
Ready(Ok(ref mut g)) => {
g.clear_ready();
match unsafe { Self::try_write(&mut self) } {
Ok(()) => {
self.state = FindIocState::WaitReadable;
continue;
}
Err(e) => Ready(Err(e)),
}
}
Ready(Err(e)) => Ready(Err(e.into())),
Pending => Pending,
},
FindIocState::WaitReadable => match self.afd.as_mut().unwrap().poll_read_ready(cx) {
Ready(Ok(ref mut g)) => {
g.clear_ready();
match unsafe { Self::try_read(&mut self) } {
Ok(()) => {
continue;
}
Err(e) => Ready(Err(e)),
}
}
Ready(Err(e)) => {
error!("WaitReadable Err");
Ready(Err(e.into()))
error!("Error from try_read {e:?}");
Ready(Some(Err(e)))
}
Pending => {
g.clear_ready();
//warn!("socket seemed ready for read, but is not");
continue;
}
Pending => Pending,
},
Ready(Err(e)) => {
let e = Error::with_msg_no_trace(format!("{e:?}"));
error!("poll_read_ready {e:?}");
Ready(Some(Err(e)))
}
Pending => {
if loop_again {
continue;
} else {
if self.channels_input.is_empty() && self.in_flight.is_empty() && self.out_queue.is_empty() {
Ready(None)
} else {
Pending
}
}
}
};
}
}
}
impl std::fmt::Debug for FindIoc {
fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
fmt.debug_struct("FindIoc").finish()
}
}

View File

@@ -53,10 +53,12 @@ pub async fn get_series_id(scy: &ScySession, cd: &ChannelDescDecoded) -> Result<
let mut h = md5::Md5::new();
h.update(facility.as_bytes());
h.update(channel_name.as_bytes());
h.update(format!("{:?}", scalar_type).as_bytes());
h.update(format!("{:?}", shape).as_bytes());
h.update(format!("{:?} {:?}", scalar_type, shape).as_bytes());
let f = h.finalize();
let mut series = u64::from_le_bytes(f.as_slice()[0..8].try_into().unwrap());
// TODO technically we could/should assert that we run on 2-complement machine.
const SMASK: u64 = 0x7fffffffffffffff;
series = series & SMASK;
for _ in 0..2000 {
let res = scy
.query(
@@ -77,6 +79,7 @@ pub async fn get_series_id(scy: &ScySession, cd: &ChannelDescDecoded) -> Result<
}
tokio::time::sleep(Duration::from_millis(20)).await;
series += 1;
series = series & SMASK;
}
Err(Error::with_msg_no_trace(format!("can not create and insert series id")))
} else {