diff --git a/daqingest/src/bin/daqingest.rs b/daqingest/src/bin/daqingest.rs index 6056d90..9b746aa 100644 --- a/daqingest/src/bin/daqingest.rs +++ b/daqingest/src/bin/daqingest.rs @@ -21,7 +21,7 @@ pub fn main() -> Result<(), Error> { } SubCmd::ChannelAccess(k) => match k { ChannelAccess::CaChannel(_) => todo!(), - ChannelAccess::CaSearch(k) => netfetch::ca::ca_search(k.into()).await?, + ChannelAccess::CaSearch(k) => netfetch::ca::ca_search_2(k.into()).await?, ChannelAccess::CaConfig(k) => netfetch::ca::ca_connect(k.into()).await?, }, } diff --git a/netfetch/src/ca.rs b/netfetch/src/ca.rs index 4ed0cc3..88aba6a 100644 --- a/netfetch/src/ca.rs +++ b/netfetch/src/ca.rs @@ -2,16 +2,16 @@ pub mod conn; pub mod proto; pub mod store; -use conn::{CaConn, FindIoc}; +use self::conn::FindIocStream; +use self::store::DataStore; +use crate::zmtp::ErrConv; +use conn::CaConn; use err::Error; -use futures_util::stream::FuturesUnordered; -use futures_util::{StreamExt, TryFutureExt}; +use futures_util::StreamExt; use log::*; use scylla::batch::Consistency; -use scylla::prepared_statement::PreparedStatement; -use scylla::Session as ScySession; use serde::{Deserialize, Serialize}; -use std::collections::{BTreeMap, VecDeque}; +use std::collections::BTreeMap; use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; use std::path::PathBuf; use std::sync::Arc; @@ -19,10 +19,6 @@ use std::time::Duration; use tokio::fs::OpenOptions; use tokio::io::AsyncReadExt; use tokio::net::TcpStream; -use tokio::task::JoinError; -use tokio::time::error::Elapsed; - -use self::store::{ChannelRegistry, DataStore}; #[derive(Debug, Serialize, Deserialize)] struct ChannelConfig { @@ -85,41 +81,39 @@ pub struct CaConnectOpts { pub abort_after_search: u32, } -async fn unwrap_search_result( - item: Result), Error>, Elapsed>, JoinError>, - scy: &ScySession, - qu: &PreparedStatement, -) -> Result<(String, SocketAddrV4, Option), Error> { - match item { - Ok(k) => match k { - Ok(k) => match k { - Ok(h) => match h.2 { - Some(k) => { - scy.execute(qu, (&h.0, format!("{:?}", h.1), format!("{:?}", k))) - .await - .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; - Ok(h) +async fn resolve_address(addr_str: &str) -> Result { + const PORT_DEFAULT: u16 = 5064; + let ac = match addr_str.parse::() { + Ok(k) => k, + Err(_) => match addr_str.parse::() { + Ok(k) => SocketAddrV4::new(k, PORT_DEFAULT), + Err(e) => match tokio::net::lookup_host(&addr_str).await { + Ok(k) => { + let vs: Vec<_> = k + .filter_map(|x| match x { + SocketAddr::V4(k) => Some(k), + SocketAddr::V6(_) => None, + }) + .collect(); + if let Some(k) = vs.first() { + *k + } else { + error!("Can not understand name for {:?} {:?}", addr_str, vs); + return Err(e.into()); } - None => Ok(h), - }, + } Err(e) => { - error!("bad search {e:?}"); - Err(e) + error!("{e:?}"); + return Err(e.into()); } }, - Err(e) => { - error!("Elapsed"); - Err(Error::with_msg_no_trace(format!("{e:?}"))) - } }, - Err(e) => { - error!("JoinError"); - Err(Error::with_msg_no_trace(format!("{e:?}"))) - } - } + }; + Ok(ac) } -pub async fn ca_search(opts: ListenFromFileOpts) -> Result<(), Error> { +pub async fn ca_search_2(opts: ListenFromFileOpts) -> Result<(), Error> { + let facility = "scylla"; let opts = parse_config(opts.config).await?; let scy = scylla::SessionBuilder::new() .known_node("sf-nube-11:19042") @@ -129,122 +123,67 @@ pub async fn ca_search(opts: ListenFromFileOpts) -> Result<(), Error> { .await .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; let qu = scy - .prepare("insert into ioc_by_channel (channel, searchaddr, addr) values (?, ?, ?)") + .prepare("insert into ioc_by_channel (facility, channel, searchaddr, addr) values (?, ?, ?, ?)") .await .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; - const PORT_DEFAULT: u16 = 5064; - info!("Look up {} channel hosts", opts.channels.len()); - let mut fut_queue = FuturesUnordered::new(); - let mut res2 = vec![]; - let mut chns = VecDeque::new(); + let mut addrs = vec![]; + for s in &opts.search { + let x = resolve_address(s).await?; + addrs.push(x); + } + let mut finder = FindIocStream::new(addrs); for ch in &opts.channels { - for ac in &opts.search { - chns.push_back((ch.clone(), ac.clone())); - } + finder.push(ch.into()); } - let max_simul = opts.max_simul; - let timeout = opts.timeout; - let mut ix1 = 0; - 'lo2: loop { - while fut_queue.len() < max_simul && chns.len() > 0 { - let (ch, ac) = chns.pop_front().unwrap(); - let ch2 = ch.clone(); - let ac = match ac.parse::() { - Ok(k) => k, - Err(_) => match ac.parse::() { - Ok(k) => SocketAddrV4::new(k, PORT_DEFAULT), - Err(e) => match tokio::net::lookup_host(&ac).await { - Ok(k) => { - let vs: Vec<_> = k - .filter_map(|x| match x { - SocketAddr::V4(k) => Some(k), - SocketAddr::V6(_) => None, - }) - .collect(); - if let Some(k) = vs.first() { - *k - } else { - error!("Can not understand name for {:?} {:?}", ac, vs); - return Err(e.into()); - } - } - Err(e) => { - error!("{e:?}"); - return Err(e.into()); - } - }, - }, - }; - ix1 += 1; - if ix1 >= 500 { - info!("Start search for {} {}", ch, ac); - ix1 = 0; + let deadline = tokio::time::Instant::now() + .checked_add(Duration::from_millis(100000000)) + .unwrap(); + let mut i1 = 0; + loop { + let k = tokio::time::timeout_at(deadline, finder.next()).await; + let item = match k { + Ok(Some(k)) => k, + Ok(None) => { + info!("Search stream exhausted"); + break; } - let fut = FindIoc::new(ch.clone(), Ipv4Addr::UNSPECIFIED, ac.clone(), timeout) - .map_ok(move |x| (ch2, ac.clone(), x)); - let fut = tokio::time::timeout(Duration::from_millis(timeout + 1000), fut); - let jh = tokio::spawn(fut); - fut_queue.push(jh); - if chns.is_empty() { - break 'lo2; - } - } - while fut_queue.len() >= max_simul { - match fut_queue.next().await { - Some(item) => { - let item = unwrap_search_result(item, &scy, &qu).await; - res2.push(item); - } - None => break, - } - } - } - while fut_queue.len() > 0 { - match fut_queue.next().await { - Some(item) => { - let item = unwrap_search_result(item, &scy, &qu).await; - res2.push(item); - } - None => break, - } - } - info!("Collected {} results", res2.len()); - let mut channels_set = BTreeMap::new(); - let mut channels_by_host = BTreeMap::new(); - for item in res2 { - // TODO should we continue even if some channel gives an error or can not be located? - match item { - Ok((ch, ac, Some(addr))) => { - info!("Found address {} {:?} {:?}", ch, ac, addr); - channels_set.insert(ch.clone(), true); - let key = addr; - if !channels_by_host.contains_key(&key) { - channels_by_host.insert(key, vec![ch]); - } else { - channels_by_host.get_mut(&key).unwrap().push(ch); - } - } - Ok((_, _, None)) => {} - Err(e) => { - error!("Error in res2 list: {e:?}"); + Err(_) => { + warn!("timed out"); + break; } }; - } - for (host, channels) in &channels_by_host { - info!("Have: {:?} {:?}", host, channels.len()); - } - let nil = None::; - for ch in &opts.channels { - if !channels_set.contains_key(ch) { - scy.execute(&qu, (ch, "", nil)) - .await - .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; + let item = match item { + Ok(k) => k, + Err(e) => { + error!("ca_search_2 {e:?}"); + continue; + } + }; + for item in item { + scy.execute( + &qu, + ( + facility, + &item.channel, + item.src.to_string(), + item.addr.map(|x| x.to_string()), + ), + ) + .await + .err_conv()?; + } + tokio::time::sleep(Duration::from_millis(1)).await; + i1 += 1; + if i1 > 500 { + i1 = 0; + info!("{}", finder.quick_state()); } } Ok(()) } pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> { + let facility = "scylla"; let opts = parse_config(opts.config).await?; let scy = scylla::SessionBuilder::new() .known_node("sf-nube-11:19042") @@ -254,14 +193,15 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> { .await .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; let scy = Arc::new(scy); + info!("FIND IOCS"); let qu_find_addr = scy - .prepare("select addr from ioc_by_channel where channel = ?") + .prepare("select addr from ioc_by_channel where facility = ? and channel = ?") .await .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; let mut channels_by_host = BTreeMap::new(); for (ix, ch) in opts.channels.iter().enumerate() { let res = scy - .execute(&qu_find_addr, (ch,)) + .execute(&qu_find_addr, (facility, ch)) .await .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; if res.rows_num().unwrap() == 0 { @@ -282,6 +222,7 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> { if opts.abort_after_search == 1 { return Ok(()); } + info!("CONNECT TO HOSTS"); let data_store = Arc::new(DataStore::new(scy.clone()).await?); let mut conn_jhs = vec![]; for (host, channels) in channels_by_host { diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index 708384a..0524c2e 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -10,7 +10,7 @@ use libc::c_int; use log::*; use netpod::timeunits::SEC; use netpod::{ScalarType, Shape}; -use std::collections::BTreeMap; +use std::collections::{BTreeMap, VecDeque}; use std::net::{Ipv4Addr, SocketAddrV4}; use std::pin::Pin; use std::sync::atomic::{AtomicUsize, Ordering}; @@ -486,12 +486,6 @@ impl Stream for CaConn { } } -enum FindIocState { - Init, - WaitWritable, - WaitReadable, -} - struct SockBox(c_int); impl Drop for SockBox { @@ -505,43 +499,103 @@ impl Drop for SockBox { } } -const SEARCH_ID: AtomicUsize = AtomicUsize::new(0); +static BATCH_ID: AtomicUsize = AtomicUsize::new(0); +static SEARCH_ID2: AtomicUsize = AtomicUsize::new(0); -pub struct FindIoc { - state: FindIocState, - channel: String, - search_id: u32, - sock: SockBox, - afd: Option>, - addr: libc::sockaddr_in, - addr_len: usize, - deadline: Pin>, - result: Option, - addr_bind: Ipv4Addr, - addr_conn: SocketAddrV4, +#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug)] +struct BatchId(u32); + +#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug)] +struct SearchId(u32); + +struct SearchBatch { + id: BatchId, + ts_beg: Instant, + tgts: VecDeque, + channels: Vec, + sids: Vec, + done: Vec, } -// Do low-level approach first to make sure it works as specified. -impl FindIoc { - pub fn new(channel: String, addr_bind: Ipv4Addr, addr_conn: SocketAddrV4, timeout: u64) -> Self { - let addr = unsafe { std::mem::transmute_copy(&[0u8; std::mem::size_of::()]) }; - let search_id = SEARCH_ID.fetch_add(1, Ordering::AcqRel) as u32; +#[derive(Debug)] +pub struct FindIocRes { + pub src: SocketAddrV4, + pub channel: String, + pub addr: Option, +} + +pub struct FindIocStream { + tgts: Vec, + channels_input: VecDeque, + in_flight: BTreeMap, + in_flight_max: usize, + bid_by_sid: BTreeMap, + batch_send_queue: VecDeque, + sock: SockBox, + afd: AsyncFd, + buf1: Vec, + send_addr: SocketAddrV4, + out_queue: VecDeque, + ping: Pin>, + channels_per_batch: usize, + batch_run_max: Duration, + bids_all_done: BTreeMap, + bids_timed_out: BTreeMap, + sids_done: BTreeMap, + result_for_done_sid_count: u64, +} + +impl FindIocStream { + pub fn new(tgts: Vec) -> Self { + info!("FindIocStream tgts {tgts:?}"); + let sock = unsafe { Self::create_socket() }.unwrap(); + let afd = AsyncFd::new(sock.0).unwrap(); Self { - state: FindIocState::Init, - channel, - search_id, - sock: SockBox(-1), - afd: None, - addr: addr, - addr_len: 0, - deadline: Box::pin(tokio::time::sleep(Duration::from_millis(timeout))), - result: None, - addr_bind, - addr_conn, + tgts, + channels_input: VecDeque::new(), + in_flight: BTreeMap::new(), + bid_by_sid: BTreeMap::new(), + batch_send_queue: VecDeque::new(), + sock, + afd, + buf1: vec![0; 1024], + send_addr: SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 5064), + out_queue: VecDeque::new(), + ping: Box::pin(tokio::time::sleep(Duration::from_millis(200))), + bids_all_done: BTreeMap::new(), + bids_timed_out: BTreeMap::new(), + sids_done: BTreeMap::new(), + result_for_done_sid_count: 0, + in_flight_max: 10, + channels_per_batch: 10, + batch_run_max: Duration::from_millis(1500), } } - unsafe fn create_socket(&mut self) -> Result<(), Error> { + pub fn quick_state(&self) -> String { + format!( + "channels_input {} in_flight {} bid_by_sid {} out_queue {} result_for_done_sid_count {} bids_timed_out {}", + self.channels_input.len(), + self.in_flight.len(), + self.bid_by_sid.len(), + self.out_queue.len(), + self.result_for_done_sid_count, + self.bids_timed_out.len() + ) + } + + pub fn push(&mut self, x: String) { + self.channels_input.push_back(x); + } + + fn buf_and_batch(&mut self, bid: &BatchId) -> Option<(&mut Vec, &mut SearchBatch)> { + match self.in_flight.get_mut(bid) { + Some(batch) => Some((&mut self.buf1, batch)), + None => None, + } + } + + unsafe fn create_socket() -> Result { let ec = libc::socket(libc::AF_INET, libc::SOCK_DGRAM, 0); if ec == -1 { return Err("can not create socket".into()); @@ -566,7 +620,7 @@ impl FindIoc { return Err("can not set nonblock".into()); } } - let ip: [u8; 4] = self.addr_bind.octets(); + let ip: [u8; 4] = [0, 0, 0, 0]; let addr = libc::sockaddr_in { sin_family: libc::AF_INET as u16, sin_port: 0, @@ -576,8 +630,6 @@ impl FindIoc { sin_zero: [0; 8], }; let addr_len = std::mem::size_of::(); - self.addr = addr.clone(); - self.addr_len = addr_len; let ec = libc::bind(sock.0, &addr as *const _ as _, addr_len as _); if ec == -1 { return Err("can not bind socket".into()); @@ -602,40 +654,22 @@ impl FindIoc { } } } - self.sock = sock; - Ok(()) + Ok(sock) } - unsafe fn try_write(&mut self) -> Result<(), Error> { - let sock = self.sock.0; - let ip = self.addr_conn.ip().octets(); + unsafe fn try_send(sock: i32, addr: &SocketAddrV4, buf: &[u8]) -> Poll> { + let ip = addr.ip().octets(); + let port = addr.port(); let addr = libc::sockaddr_in { sin_family: libc::AF_INET as u16, - sin_port: (self.addr_conn.port() as u16).to_be(), + sin_port: port.to_be(), sin_addr: libc::in_addr { s_addr: u32::from_ne_bytes(ip), }, sin_zero: [0; 8], }; let addr_len = std::mem::size_of::(); - let chb = self.channel.as_bytes(); - let npadded = (chb.len() + 1 + 7) / 8 * 8; - let npad = npadded - self.channel.len(); - let mut buf = vec![ - // - 0u8, 0, 0, 0, // - 0, 0, 0, 13, // - 0, 0, 0, 0, // - 0, 0, 0, 0, // - ]; - buf.extend_from_slice(&[0, 6]); - buf.extend_from_slice(&(npadded as u16).to_be_bytes()); - buf.extend_from_slice(&[0, 0, 0, 13]); - buf.extend_from_slice(&[0, 0, 0, 0]); - buf.extend_from_slice(&self.search_id.to_be_bytes()); - buf.extend_from_slice(chb); - buf.extend_from_slice(&vec![0u8; npad]); - //info!("sendto {ip:?} n {}", buf.len()); + //info!("sendto {ip:?} {} n {}", port, buf.len()); let ec = libc::sendto( sock, &buf[0] as *const _ as _, @@ -647,17 +681,15 @@ impl FindIoc { if ec == -1 { let errno = *libc::__errno_location(); if errno == libc::EAGAIN { - error!("NOT YET READY FOR SENDING..."); - return Err("socket not ready for write".into()); + return Poll::Pending; } else { - return Err("can not send".into()); + return Poll::Ready(Err("FindIocStream can not send".into())); } } - Ok(()) + Poll::Ready(Ok(())) } - unsafe fn try_read(&mut self) -> Result<(), Error> { - let sock = self.sock.0; + unsafe fn try_read(sock: i32) -> Poll), Error>> { let mut saddr_mem = [0u8; std::mem::size_of::()]; let mut saddr_len: libc::socklen_t = saddr_mem.len() as _; let mut buf = vec![0u8; 1024]; @@ -672,15 +704,15 @@ impl FindIoc { if ec == -1 { let errno = *libc::__errno_location(); if errno == libc::EAGAIN { - error!("try_read BUT NOT YET READY FOR READING..."); - return Err("socket not ready for read".into()); + return Poll::Pending; } else { - return Err("can not read".into()); + return Poll::Ready(Err("FindIocStream can not read".into())); } } else if ec < 0 { error!("unexpected received {ec}"); + Poll::Ready(Err(Error::with_msg_no_trace(format!("try_read ec {ec}")))) } else if ec == 0 { - error!("received zero bytes"); + Poll::Ready(Err(Error::with_msg_no_trace(format!("try_read ec {ec}")))) } else { let saddr2: libc::sockaddr_in = std::mem::transmute_copy(&saddr_mem); let src_addr = Ipv4Addr::from(saddr2.sin_addr.s_addr.to_ne_bytes()); @@ -725,101 +757,284 @@ impl FindIoc { msgs.push(msg); } //info!("received {} msgs {:?}", msgs.len(), msgs); - for (msg_ix, msg) in msgs.iter().enumerate() { + let mut res = vec![]; + for msg in msgs.iter() { match &msg.ty { CaMsgTy::SearchRes(k) => { - if k.id != self.search_id { - warn!("id mismatch {} vs {}", k.id, self.search_id); - } - if false { - let addr = Ipv4Addr::from(k.addr.to_be_bytes()); - info!("Converted address: {addr:?}"); - } - info!( - "SearchRes: {}/{} {:?} {:?} {}", - msg_ix, - msgs.len(), - self.channel, - src_addr, - k.tcp_port - ); - if self.result.is_none() { - self.result = Some(SocketAddrV4::new(src_addr, k.tcp_port)); - } else { - warn!("Result already populated for {}", self.channel); - } + let addr = SocketAddrV4::new(src_addr, k.tcp_port); + res.push((SearchId(k.id), addr)); } _ => {} } } + Poll::Ready(Ok((SocketAddrV4::new(src_addr, src_port), res))) + } + } + + fn serialize_batch(buf: &mut Vec, batch: &SearchBatch) { + buf.extend_from_slice(&[0, 0, 0, 0]); + buf.extend_from_slice(&[0, 0, 0, 13]); + buf.extend_from_slice(&[0, 0, 0, 0]); + buf.extend_from_slice(&[0, 0, 0, 0]); + for (sid, ch) in batch.sids.iter().zip(batch.channels.iter()) { + let chb = ch.as_bytes(); + let npadded = (chb.len() + 1 + 7) / 8 * 8; + let npad = npadded - chb.len(); + buf.extend_from_slice(&[0, 6]); + buf.extend_from_slice(&(npadded as u16).to_be_bytes()); + buf.extend_from_slice(&[0, 0, 0, 13]); + buf.extend_from_slice(&[0, 0, 0, 0]); + buf.extend_from_slice(&sid.0.to_be_bytes()); + buf.extend_from_slice(chb); + buf.extend_from_slice(&vec![0u8; npad]); + } + } + + fn create_in_flight(&mut self) { + let bid = BATCH_ID.fetch_add(1, Ordering::AcqRel); + let bid = BatchId(bid as u32); + //info!("create_in_flight {bid:?}"); + let mut sids = vec![]; + let mut chs = vec![]; + while chs.len() < self.channels_per_batch && self.channels_input.len() > 0 { + let sid = SEARCH_ID2.fetch_add(1, Ordering::AcqRel); + let sid = SearchId(sid as u32); + self.bid_by_sid.insert(sid.clone(), bid.clone()); + sids.push(sid); + chs.push(self.channels_input.pop_front().unwrap()); + } + let batch = SearchBatch { + id: bid.clone(), + ts_beg: Instant::now(), + channels: chs, + tgts: self.tgts.iter().enumerate().map(|x| x.0).collect(), + sids, + done: vec![], + }; + self.in_flight.insert(bid.clone(), batch); + self.batch_send_queue.push_back(bid); + } + + fn handle_result(&mut self, src: SocketAddrV4, res: Vec<(SearchId, SocketAddrV4)>) { + let mut sids_remove = vec![]; + for (sid, addr) in res { + self.sids_done.insert(sid.clone(), ()); + match self.bid_by_sid.get(&sid) { + Some(bid) => { + sids_remove.push(sid.clone()); + match self.in_flight.get_mut(bid) { + Some(batch) => { + for (i2, s2) in batch.sids.iter().enumerate() { + if s2 == &sid { + match batch.channels.get(i2) { + Some(ch) => { + let res = FindIocRes { + channel: ch.into(), + addr: Some(addr), + src: src.clone(), + }; + self.out_queue.push_back(res); + } + None => { + error!("no matching channel for {sid:?}"); + } + } + } + } + // Book keeping: + batch.done.push(sid); + let mut all_done = true; + if batch.done.len() >= batch.sids.len() { + for s1 in &batch.sids { + if !batch.done.contains(s1) { + all_done = false; + break; + } + } + } else { + all_done = false; + } + if all_done { + //info!("all searches done for {bid:?}"); + self.bids_all_done.insert(bid.clone(), ()); + self.in_flight.remove(bid); + } + } + None => { + // TODO analyze reasons + error!("no batch for {bid:?}"); + } + } + } + None => { + // TODO analyze reasons + if self.sids_done.contains_key(&sid) { + self.result_for_done_sid_count += 1; + } else { + error!("no bid for {sid:?}"); + } + } + } + } + for sid in sids_remove { + self.bid_by_sid.remove(&sid); + } + } + + fn clear_timed_out(&mut self) { + let now = Instant::now(); + let mut bids = vec![]; + let mut sids = vec![]; + let mut chns = vec![]; + for (bid, batch) in &mut self.in_flight { + if now.duration_since(batch.ts_beg) > self.batch_run_max { + for (i2, sid) in batch.sids.iter().enumerate() { + if batch.done.contains(sid) == false { + warn!("Timeout: {bid:?} {}", batch.channels[i2]); + } + sids.push(sid.clone()); + chns.push(batch.channels[i2].clone()); + } + bids.push(bid.clone()); + } + } + for (sid, ch) in sids.into_iter().zip(chns) { + let res = FindIocRes { + src: SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 0), + channel: ch, + addr: None, + }; + self.out_queue.push_back(res); + self.bid_by_sid.remove(&sid); + } + for bid in bids { + self.in_flight.remove(&bid); } - Ok(()) } } -impl Future for FindIoc { - // TODO use a dedicated type to indicate timeout. - type Output = Result, Error>; +impl Stream for FindIocStream { + type Item = Result, Error>; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; - loop { - match self.deadline.poll_unpin(cx) { - Ready(()) => { - break Ready(Ok(self.result.clone())); - } - Pending => {} + match self.ping.poll_unpin(cx) { + Ready(_) => { + self.ping = Box::pin(tokio::time::sleep(Duration::from_millis(200))); + cx.waker().wake_by_ref(); } - break match &mut self.state { - FindIocState::Init => match unsafe { Self::create_socket(&mut self) } { - Ok(()) => { - let afd = tokio::io::unix::AsyncFd::new(self.sock.0).expect("can not create AsyncFd"); - self.afd = Some(afd); - self.state = FindIocState::WaitWritable; + Pending => {} + } + self.clear_timed_out(); + loop { + let mut loop_again = false; + if self.out_queue.is_empty() == false { + let ret = std::mem::replace(&mut self.out_queue, VecDeque::new()); + break Ready(Some(Ok(ret))); + } + if !self.buf1.is_empty() { + match self.afd.poll_write_ready(cx) { + Ready(Ok(mut g)) => match unsafe { Self::try_send(self.sock.0, &self.send_addr, &self.buf1) } { + Ready(Ok(())) => { + self.buf1.clear(); + loop_again = true; + } + Ready(Err(e)) => { + error!("{e:?}"); + } + Pending => { + g.clear_ready(); + warn!("socket seemed ready for write, but is not"); + loop_again = true; + } + }, + Ready(Err(e)) => { + let e = Error::with_msg_no_trace(format!("{e:?}")); + error!("poll_write_ready {e:?}"); + } + Pending => {} + } + } + while self.buf1.is_empty() { + match self.batch_send_queue.pop_front() { + Some(bid) => { + match self.buf_and_batch(&bid) { + Some((buf1, batch)) => { + match batch.tgts.pop_front() { + Some(tgtix) => { + Self::serialize_batch(buf1, batch); + match self.tgts.get(tgtix) { + Some(tgt) => { + let tgt = tgt.clone(); + //info!("Serialize and queue {bid:?}"); + self.send_addr = tgt.clone(); + self.batch_send_queue.push_back(bid); + loop_again = true; + } + None => { + self.buf1.clear(); + self.batch_send_queue.push_back(bid); + loop_again = true; + error!("tgtix does not exist"); + } + } + } + None => { + //info!("Batch exhausted"); + loop_again = true; + } + } + } + None => { + if self.bids_all_done.contains_key(&bid) { + // TODO count events + } else { + info!("Batch {bid:?} seems already done"); + } + loop_again = true; + } + } + } + None => break, + } + } + while !self.channels_input.is_empty() && self.in_flight.len() < self.in_flight_max { + self.create_in_flight(); + loop_again = true; + } + break match self.afd.poll_read_ready(cx) { + Ready(Ok(mut g)) => match unsafe { Self::try_read(self.sock.0) } { + Ready(Ok((src, res))) => { + self.handle_result(src, res); continue; } - Err(e) => { - error!("can not create socket {e:?}"); - Ready(Err(e)) - } - }, - FindIocState::WaitWritable => match self.afd.as_mut().unwrap().poll_write_ready(cx) { - Ready(Ok(ref mut g)) => { - g.clear_ready(); - match unsafe { Self::try_write(&mut self) } { - Ok(()) => { - self.state = FindIocState::WaitReadable; - continue; - } - Err(e) => Ready(Err(e)), - } - } - Ready(Err(e)) => Ready(Err(e.into())), - Pending => Pending, - }, - FindIocState::WaitReadable => match self.afd.as_mut().unwrap().poll_read_ready(cx) { - Ready(Ok(ref mut g)) => { - g.clear_ready(); - match unsafe { Self::try_read(&mut self) } { - Ok(()) => { - continue; - } - Err(e) => Ready(Err(e)), - } - } Ready(Err(e)) => { - error!("WaitReadable Err"); - Ready(Err(e.into())) + error!("Error from try_read {e:?}"); + Ready(Some(Err(e))) + } + Pending => { + g.clear_ready(); + //warn!("socket seemed ready for read, but is not"); + continue; } - Pending => Pending, }, + Ready(Err(e)) => { + let e = Error::with_msg_no_trace(format!("{e:?}")); + error!("poll_read_ready {e:?}"); + Ready(Some(Err(e))) + } + Pending => { + if loop_again { + continue; + } else { + if self.channels_input.is_empty() && self.in_flight.is_empty() && self.out_queue.is_empty() { + Ready(None) + } else { + Pending + } + } + } }; } } } - -impl std::fmt::Debug for FindIoc { - fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result { - fmt.debug_struct("FindIoc").finish() - } -} diff --git a/netfetch/src/series.rs b/netfetch/src/series.rs index 0e8463a..43f0d88 100644 --- a/netfetch/src/series.rs +++ b/netfetch/src/series.rs @@ -53,10 +53,12 @@ pub async fn get_series_id(scy: &ScySession, cd: &ChannelDescDecoded) -> Result< let mut h = md5::Md5::new(); h.update(facility.as_bytes()); h.update(channel_name.as_bytes()); - h.update(format!("{:?}", scalar_type).as_bytes()); - h.update(format!("{:?}", shape).as_bytes()); + h.update(format!("{:?} {:?}", scalar_type, shape).as_bytes()); let f = h.finalize(); let mut series = u64::from_le_bytes(f.as_slice()[0..8].try_into().unwrap()); + // TODO technically we could/should assert that we run on 2-complement machine. + const SMASK: u64 = 0x7fffffffffffffff; + series = series & SMASK; for _ in 0..2000 { let res = scy .query( @@ -77,6 +79,7 @@ pub async fn get_series_id(scy: &ScySession, cd: &ChannelDescDecoded) -> Result< } tokio::time::sleep(Duration::from_millis(20)).await; series += 1; + series = series & SMASK; } Err(Error::with_msg_no_trace(format!("can not create and insert series id"))) } else {