WIP split channel

This commit is contained in:
Dominik Werder
2023-09-11 17:25:44 +02:00
parent 9d329c26ad
commit 6407af9574
10 changed files with 439 additions and 236 deletions

View File

@@ -4,6 +4,7 @@ pub mod inserthook;
use async_channel::Receiver;
use async_channel::Sender;
use async_channel::WeakReceiver;
use async_channel::WeakSender;
use err::Error;
use log::*;
use netfetch::ca::connset::CaConnSet;
@@ -45,6 +46,12 @@ const CHANNEL_CHECK_INTERVAL: Duration = Duration::from_millis(5000);
const PRINT_ACTIVE_INTERVAL: Duration = Duration::from_millis(60000);
const PRINT_STATUS_INTERVAL: Duration = Duration::from_millis(20000);
/// State of the periodic CaConnSet health check driven by `check_caconn_chans`.
#[derive(Debug)]
enum CheckPeriodic {
    /// Idle between checks; holds the time the previous cycle completed
    /// (or the daemon start time).
    Waiting(Instant),
    /// A health-check request was issued at the given time and the report
    /// has not yet come back.
    Ongoing(Instant),
}
pub struct DaemonOpts {
backend: String,
local_epics_hostname: String,
@@ -78,30 +85,28 @@ pub struct Daemon {
count_assigned: usize,
last_status_print: SystemTime,
insert_workers_jh: Vec<JoinHandle<Result<(), Error>>>,
caconn_last_channel_check: Instant,
stats: Arc<DaemonStats>,
shutting_down: bool,
insert_rx_weak: WeakReceiver<QueryItem>,
connset_ctrl: CaConnSetCtrl,
connset_status_last: Instant,
connset_status_last: CheckPeriodic,
// TODO should be a stats object?
insert_workers_running: AtomicU64,
query_item_tx_weak: WeakSender<QueryItem>,
}
impl Daemon {
pub async fn new(opts: DaemonOpts) -> Result<Self, Error> {
let datastore = DataStore::new(&opts.scyconf)
.await
.map_err(|e| Error::with_msg_no_trace(e.to_string()))?;
let datastore = Arc::new(datastore);
let (daemon_ev_tx, daemon_ev_rx) = async_channel::bounded(32);
// TODO keep join handles and await later
let (channel_info_query_tx, ..) = dbpg::seriesbychannel::start_lookup_workers(4, &opts.pgconf)
let (channel_info_query_tx, jhs, jh) = dbpg::seriesbychannel::start_lookup_workers(4, &opts.pgconf)
.await
.map_err(|e| Error::with_msg_no_trace(e.to_string()))?;
let (query_item_tx, query_item_rx) = async_channel::bounded(opts.insert_item_queue_cap);
let query_item_tx_weak = query_item_tx.downgrade();
let insert_queue_counter = Arc::new(AtomicUsize::new(0));
// Insert queue hook
@@ -207,13 +212,13 @@ impl Daemon {
count_assigned: 0,
last_status_print: SystemTime::now(),
insert_workers_jh,
caconn_last_channel_check: Instant::now(),
stats: Arc::new(DaemonStats::new()),
shutting_down: false,
insert_rx_weak: query_item_rx.downgrade(),
connset_ctrl: conn_set_ctrl,
connset_status_last: Instant::now(),
connset_status_last: CheckPeriodic::Waiting(Instant::now()),
insert_workers_running: AtomicU64::new(0),
query_item_tx_weak,
};
Ok(ret)
}
@@ -222,10 +227,24 @@ impl Daemon {
&self.stats
}
async fn check_caconn_chans(&mut self) -> Result<(), Error> {
if self.caconn_last_channel_check.elapsed() > CHANNEL_CHECK_INTERVAL {
self.connset_ctrl.check_health().await?;
self.caconn_last_channel_check = Instant::now();
/// Drives the periodic health check of the `CaConnSet`.
///
/// In `Waiting` state: once 2 s have passed since the last completed check,
/// issues a health-check request and transitions to `Ongoing`.
/// In `Ongoing` state: logs an error if no health report has arrived within
/// 4 s of issuing the request. The report itself arrives as a
/// `CaConnSetItem::Healthy` event, whose handler transitions back to `Waiting`.
async fn check_caconn_chans(&mut self, ts1: Instant) -> Result<(), Error> {
    match &self.connset_status_last {
        CheckPeriodic::Waiting(since) => {
            if *since + Duration::from_millis(2000) < ts1 {
                debug!("======================================== issue health check CaConn");
                self.connset_ctrl.check_health().await?;
                self.connset_status_last = CheckPeriodic::Ongoing(ts1);
                // Opportunistically report the insert-queue backlog while here.
                if let Some(tx) = self.query_item_tx_weak.upgrade() {
                    info!("query_item_tx len {}", tx.len());
                }
            }
        }
        CheckPeriodic::Ongoing(since) => {
            let dt = ts1.saturating_duration_since(*since);
            if dt > Duration::from_millis(4000) {
                error!("======================================== CaConnSet has not reported health status since {:.0}", dt.as_secs_f32() * 1e3);
            }
        }
    }
    Ok(())
}
@@ -244,7 +263,6 @@ impl Daemon {
}
}
self.stats.handle_timer_tick_count.inc();
let ts1 = Instant::now();
let tsnow = SystemTime::now();
if SIGINT.load(atomic::Ordering::Acquire) == 1 {
warn!("Received SIGINT");
@@ -254,18 +272,8 @@ impl Daemon {
warn!("Received SIGTERM");
SIGTERM.store(2, atomic::Ordering::Release);
}
if self.connset_status_last + Duration::from_millis(2000) < ts1 {
self.connset_ctrl.check_health().await?;
}
if self.connset_status_last + Duration::from_millis(10000) < ts1 {
error!("CaConnSet has not reported health status");
}
let dt = ts1.elapsed();
if dt > Duration::from_millis(500) {
info!("slow check_chans {}ms", dt.as_secs_f32() * 1e3);
}
let ts1 = Instant::now();
self.check_caconn_chans().await?;
self.check_caconn_chans(ts1).await?;
let dt = ts1.elapsed();
if dt > Duration::from_millis(500) {
info!("slow check_chans {}ms", dt.as_secs_f32() * 1e3);
@@ -287,6 +295,7 @@ impl Daemon {
}
async fn handle_channel_add(&mut self, ch: Channel) -> Result<(), Error> {
debug!("handle_channel_add {ch:?}");
self.connset_ctrl
.add_channel(
self.opts.backend.clone(),
@@ -356,8 +365,20 @@ impl Daemon {
async fn handle_ca_conn_set_item(&mut self, item: CaConnSetItem) -> Result<(), Error> {
use CaConnSetItem::*;
match item {
Healthy => {
self.connset_status_last = Instant::now();
Healthy(ts1, ts2) => {
let ts3 = Instant::now();
let dt1 = ts2.duration_since(ts1).as_secs_f32() * 1e3;
let dt2 = ts3.duration_since(ts2).as_secs_f32() * 1e3;
match &self.connset_status_last {
CheckPeriodic::Waiting(_since) => {
error!("======================================== received CaConnSet health report without having asked {dt1:.0} ms {dt2:.0} ms");
}
CheckPeriodic::Ongoing(since) => {
let dtsince = ts3.duration_since(*since).as_secs_f32() * 1e3;
debug!("======================================== received CaConnSet healthy dtsince {dtsince:.0} ms {dt1:.0} ms {dt2:.0} ms");
self.connset_status_last = CheckPeriodic::Waiting(ts3);
}
}
}
}
Ok(())
@@ -404,7 +425,7 @@ impl Daemon {
let ts1 = Instant::now();
let ret = self.handle_timer_tick().await;
match tx.send(i.wrapping_add(1)).await {
Ok(_) => {}
Ok(()) => {}
Err(_) => {
self.stats.ticker_token_release_error.inc();
error!("can not send ticker token");
@@ -575,9 +596,16 @@ pub async fn run(opts: CaIngestOpts, channels: Vec<String>) -> Result<(), Error>
};
let daemon_jh = taskrun::spawn(daemon.daemon());
debug!("will configure {} channels", channels.len());
let mut i = 0;
for s in &channels {
let ch = Channel::new(s.into());
tx.send(DaemonEvent::ChannelAdd(ch)).await?;
i += 1;
if i % 1000 == 0 {
debug!("sent {} ChannelAdd", i);
}
}
debug!("{} configured channels applied", channels.len());
daemon_jh.await.map_err(|e| Error::with_msg_no_trace(e.to_string()))??;

View File

@@ -1,6 +1,6 @@
use async_channel::Receiver;
use async_channel::SendError;
use async_channel::Sender;
use core::fmt;
use err::thiserror;
use err::ThisError;
use futures_util::Future;
@@ -21,7 +21,7 @@ use tokio_postgres::Statement as PgStatement;
#[allow(unused)]
macro_rules! trace2 {
($($arg:tt)*) => {
if true {
if false {
trace!($($arg)*);
}
};
@@ -30,7 +30,7 @@ macro_rules! trace2 {
#[allow(unused)]
macro_rules! trace3 {
($($arg:tt)*) => {
if true {
if false {
trace!($($arg)*);
}
};
@@ -67,6 +67,17 @@ pub struct ChannelInfoQuery {
pub tx: Pin<Box<dyn CanSendChannelInfoResult + Send>>,
}
/// Manual `Debug` impl: the `tx` field is a boxed trait object
/// (`Pin<Box<dyn CanSendChannelInfoResult + Send>>`) and is therefore
/// omitted from the debug output.
impl fmt::Debug for ChannelInfoQuery {
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        fmt.debug_struct("ChannelInfoQuery")
            .field("backend", &self.backend)
            .field("channel", &self.channel)
            .field("scalar_type", &self.scalar_type)
            .field("shape_dims", &self.shape_dims)
            .finish()
    }
}
struct ChannelInfoResult2 {
backend: String,
channel: String,
@@ -160,24 +171,30 @@ impl Worker {
let mut it1 = rows.into_iter();
let mut e1 = it1.next();
for (qrid, tx) in tx {
if let Some(row) = &e1 {
let i = qrid as usize;
let found = if let Some(row) = &e1 {
let rid: i32 = row.get(1);
let channel: String = row.get(2);
if rid as u32 == qrid {
let series: i64 = row.get(0);
let ch2: String = row.get(2);
let series = SeriesId::new(series as _);
let res = ChannelInfoResult2 {
// TODO take from database query. Needs test.
backend: backend[0].clone(),
channel,
channel: ch2,
series: Existence::Existing(series),
tx,
};
result.push(res);
e1 = it1.next();
None
} else {
Some(tx)
}
e1 = it1.next();
} else {
let i = qrid as usize;
Some(tx)
};
if let Some(tx) = found {
let k = ChannelInfoQuery {
backend: backend[i].clone(),
channel: channel[i].clone(),
@@ -265,7 +282,7 @@ impl Worker {
async fn work(&mut self) -> Result<(), Error> {
while let Some(batch) = self.batch_rx.next().await {
trace2!("worker recv batch len {}", batch.len());
trace!("worker recv batch len {}", batch.len());
for x in &batch {
trace3!(
"search for {} {} {:?} {:?}",
@@ -277,19 +294,26 @@ impl Worker {
}
let (res1, missing) = self.select(batch).await?;
let res3 = if missing.len() > 0 {
trace2!("missing {}", missing.len());
for x in &missing {
trace2!("insert missing {x:?}");
}
let missing_count = missing.len();
self.insert_missing(&missing).await?;
let (res2, missing2) = self.select(missing).await?;
if missing2.len() > 0 {
warn!("series ids still missing after insert");
for x in &missing2 {
warn!("series ids still missing after insert {}", x.channel);
}
Err(Error::SeriesMissing)
} else {
Ok(res2)
trace2!("select missing after insert {} of {}", missing_count, res2.len());
Ok((res1, res2))
}
} else {
Ok(res1)
};
let res4 = res3?;
for r in res4 {
Ok((res1, Vec::new()))
}?;
for r in res3.0.into_iter().chain(res3.1.into_iter()) {
let item = ChannelInfoResult {
backend: r.backend,
channel: r.channel,
@@ -306,7 +330,7 @@ impl Worker {
}
}
}
info!("Worker done");
debug!("Worker done");
Ok(())
}
}

View File

@@ -1,5 +1,6 @@
pub mod conn;
pub mod connset;
pub mod connset_input_merge;
pub mod finder;
pub mod findioc;
pub mod proto;

View File

@@ -1,11 +1,5 @@
use super::proto;
use super::proto::CaItem;
use super::proto::CaMsg;
use super::proto::CaMsgTy;
use super::proto::CaProto;
use super::ExtraInsertsConf;
use crate::ca::proto::CreateChan;
use crate::ca::proto::EventAdd;
use crate::senderpolling::SenderPolling;
use crate::timebin::ConnTimeBin;
use async_channel::Sender;
@@ -23,6 +17,12 @@ use netpod::ScalarType;
use netpod::Shape;
use netpod::TS_MSP_GRID_SPACING;
use netpod::TS_MSP_GRID_UNIT;
use proto::CaItem;
use proto::CaMsg;
use proto::CaMsgTy;
use proto::CaProto;
use proto::CreateChan;
use proto::EventAdd;
use scywr::iteminsertqueue as scywriiq;
use scywriiq::ChannelInfoItem;
use scywriiq::ChannelStatus;
@@ -46,9 +46,7 @@ use std::ops::ControlFlow;
use std::pin::Pin;
use std::sync::atomic;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::sync::Mutex as StdMutex;
use std::task::Context;
use std::task::Poll;
use std::time::Duration;
@@ -57,10 +55,19 @@ use std::time::SystemTime;
use taskrun::tokio;
use tokio::net::TcpStream;
#[allow(unused)]
macro_rules! trace2 {
($($arg:tt)*) => {
if false {
trace!($($arg)*);
}
};
}
#[allow(unused)]
macro_rules! trace3 {
($($arg:tt)*) => {
if true {
if false {
trace!($($arg)*);
}
};
@@ -69,7 +76,7 @@ macro_rules! trace3 {
#[allow(unused)]
macro_rules! trace4 {
($($arg:tt)*) => {
if true {
if false {
trace!($($arg)*);
}
};
@@ -394,12 +401,6 @@ pub struct CaConnEvent {
pub value: CaConnEventValue,
}
#[derive(Debug)]
enum ChannelSetOp {
Add(ChannelStatusSeriesId),
Remove,
}
struct SendSeriesLookup {
tx: Sender<ConnCommand>,
}
@@ -626,6 +627,7 @@ impl CaConn {
&mut self,
res: Result<ChannelInfoResult, dbpg::seriesbychannel::Error>,
) -> Result<(), Error> {
trace2!("handle_series_lookup_result {res:?}");
match res {
Ok(res) => {
let series = res.series.into_inner();
@@ -643,7 +645,7 @@ impl CaConn {
let data_type = st2.data_type;
let data_count = st2.data_count;
match self.channel_to_evented(cid, sid, data_type, data_count, series) {
Ok(_) => {}
Ok(()) => {}
Err(e) => {
error!("handle_series_lookup_result {e}");
}
@@ -668,38 +670,40 @@ impl CaConn {
fn handle_conn_command(&mut self, cx: &mut Context) -> Poll<Option<Result<(), Error>>> {
// TODO if this loops for too long time, yield and make sure we get wake up again.
use Poll::*;
self.stats.caconn_loop3_count.inc();
match self.conn_command_rx.poll_next_unpin(cx) {
Ready(Some(a)) => {
trace!("handle_conn_command received a command {}", self.remote_addr_dbg);
match a.kind {
ConnCommandKind::ChannelAdd(name, cssid) => {
self.cmd_channel_add(name, cssid);
Ready(Some(Ok(())))
loop {
self.stats.caconn_loop3_count.inc();
break match self.conn_command_rx.poll_next_unpin(cx) {
Ready(Some(a)) => {
trace3!("handle_conn_command received a command {}", self.remote_addr_dbg);
match a.kind {
ConnCommandKind::ChannelAdd(name, cssid) => {
self.cmd_channel_add(name, cssid);
Ready(Some(Ok(())))
}
ConnCommandKind::ChannelRemove(name) => {
self.cmd_channel_remove(name);
Ready(Some(Ok(())))
}
ConnCommandKind::CheckHealth => {
self.cmd_check_health();
Ready(Some(Ok(())))
}
ConnCommandKind::Shutdown => {
self.cmd_shutdown();
Ready(Some(Ok(())))
}
ConnCommandKind::SeriesLookupResult(x) => match self.handle_series_lookup_result(x) {
Ok(()) => Ready(Some(Ok(()))),
Err(e) => Ready(Some(Err(e))),
},
}
ConnCommandKind::ChannelRemove(name) => {
self.cmd_channel_remove(name);
Ready(Some(Ok(())))
}
ConnCommandKind::CheckHealth => {
self.cmd_check_health();
Ready(Some(Ok(())))
}
ConnCommandKind::Shutdown => {
self.cmd_shutdown();
Ready(Some(Ok(())))
}
ConnCommandKind::SeriesLookupResult(x) => match self.handle_series_lookup_result(x) {
Ok(()) => Ready(Some(Ok(()))),
Err(e) => Ready(Some(Err(e))),
},
}
}
Ready(None) => {
error!("Command queue closed");
Ready(None)
}
Pending => Pending,
Ready(None) => {
error!("Command queue closed");
Ready(None)
}
Pending => Pending,
};
}
}
@@ -937,22 +941,23 @@ impl CaConn {
series: SeriesId,
) -> Result<(), Error> {
let tsnow = Instant::now();
let name = self.name_by_cid(cid).unwrap().to_string();
// TODO handle error better! Transition channel to Error state?
let scalar_type = ScalarType::from_ca_id(data_type)?;
let shape = Shape::from_ca_count(data_count)?;
trace2!("channel_to_evented {name:?} {scalar_type:?} {shape:?}");
self.stats.get_series_id_ok.inc();
if series.id() == 0 {
warn!("Weird series id: {series:?}");
warn!("unexpected {series:?}");
}
if data_type > 6 {
error!("data type of series unexpected: {}", data_type);
}
// TODO handle error better! Transition channel to Error state?
let scalar_type = ScalarType::from_ca_id(data_type)?;
let shape = Shape::from_ca_count(data_count)?;
let mut tb = ConnTimeBin::empty();
tb.setup_for(series.clone(), &scalar_type, &shape)?;
self.time_binners.insert(cid, tb);
let subid = self.subid_store.next();
self.cid_by_subid.insert(subid, cid);
let name = self.name_by_cid(cid).unwrap().to_string();
// TODO convert first to CaDbrType, set to `Time`, then convert to ix:
let data_type_asked = data_type + 14;
let msg = CaMsg {
@@ -1170,10 +1175,12 @@ impl CaConn {
let item_queue = &mut self.insert_item_queue;
let inserts_counter = &mut self.inserts_counter;
let extra_inserts_conf = &self.extra_inserts_conf;
if let Some(tb) = self.time_binners.get_mut(&cid) {
tb.push(ts, &ev.value)?;
} else {
// TODO count or report error
if false {
if let Some(tb) = self.time_binners.get_mut(&cid) {
tb.push(ts, &ev.value)?;
} else {
// TODO count or report error
}
}
#[cfg(DISABLED)]
match &ev.value.data {
@@ -1383,10 +1390,7 @@ impl CaConn {
let sid = k.sid;
// TODO handle error:
let name = self.name_by_cid(cid).unwrap().to_string();
debug!("CreateChanRes {name:?}");
if false && name.contains(".STAT") {
info!("Channel created for {}", name);
}
trace3!("CreateChanRes {name:?}");
if k.data_type > 6 {
error!("CreateChanRes with unexpected data_type {}", k.data_type);
}
@@ -1438,7 +1442,7 @@ impl CaConn {
do_wake_again = true;
}
CaMsgTy::EventAddRes(k) => {
trace!("got EventAddRes: {k:?}");
trace4!("got EventAddRes: {k:?}");
self.stats.caconn_recv_data.inc();
let res = Self::handle_event_add_res(self, k, tsnow);
let ts2 = Instant::now();
@@ -1509,17 +1513,19 @@ impl CaConn {
Break(Pending)
}
fn handle_conn_state(&mut self, cx: &mut Context) -> Result<Option<Poll<()>>, Error> {
fn handle_conn_state(&mut self, cx: &mut Context) -> Result<Poll<Option<()>>, Error> {
use Poll::*;
match &mut self.state {
CaConnState::Unconnected => {
trace4!("Unconnected");
let addr = self.remote_addr_dbg.clone();
trace!("create tcp connection to {:?}", (addr.ip(), addr.port()));
let fut = tokio::time::timeout(Duration::from_millis(1000), TcpStream::connect(addr));
self.state = CaConnState::Connecting(addr, Box::pin(fut));
Ok(None)
Ok(Ready(Some(())))
}
CaConnState::Connecting(ref addr, ref mut fut) => {
trace4!("Connecting");
match fut.poll_unpin(cx) {
Ready(connect_result) => {
match connect_result {
@@ -1535,7 +1541,7 @@ impl CaConn {
let proto = CaProto::new(tcp, self.remote_addr_dbg.clone(), self.opts.array_truncate);
self.state = CaConnState::Init;
self.proto = Some(proto);
Ok(None)
Ok(Ready(Some(())))
}
Ok(Err(_e)) => {
// TODO log with exponential backoff
@@ -1549,7 +1555,7 @@ impl CaConn {
let dt = self.backoff_next();
self.state = CaConnState::Wait(wait_fut(dt));
self.proto = None;
Ok(None)
Ok(Ready(Some(())))
}
Err(e) => {
// TODO log with exponential backoff
@@ -1564,14 +1570,15 @@ impl CaConn {
let dt = self.backoff_next();
self.state = CaConnState::Wait(wait_fut(dt));
self.proto = None;
Ok(None)
Ok(Ready(Some(())))
}
}
}
Pending => Ok(Some(Pending)),
Pending => Ok(Pending),
}
}
CaConnState::Init => {
trace4!("Init");
let hostname = self.local_epics_hostname.clone();
let proto = self.proto.as_mut().unwrap();
let msg = CaMsg { ty: CaMsgTy::Version };
@@ -1585,54 +1592,74 @@ impl CaConn {
};
proto.push_out(msg);
self.state = CaConnState::Listen;
Ok(None)
Ok(Ready(Some(())))
}
CaConnState::Listen => {
trace4!("Listen");
match {
let res = self.handle_conn_listen(cx);
res
} {
Ready(Some(Ok(()))) => Ok(Ready(Some(()))),
Ready(Some(Err(e))) => Err(e),
Ready(None) => Ok(Ready(Some(()))),
Pending => Ok(Pending),
}
}
CaConnState::Listen => match {
let res = self.handle_conn_listen(cx);
res
} {
Ready(Some(Ok(()))) => Ok(Some(Ready(()))),
Ready(Some(Err(e))) => Err(e),
Ready(None) => Ok(None),
Pending => Ok(Some(Pending)),
},
CaConnState::PeerReady => {
trace4!("PeerReady");
let res = self.handle_peer_ready(cx);
match res {
Ready(Some(Ok(()))) => Ok(None),
Ready(Some(Ok(()))) => Ok(Ready(Some(()))),
Ready(Some(Err(e))) => Err(e),
Ready(None) => Ok(None),
Pending => Ok(Some(Pending)),
Ready(None) => Ok(Ready(Some(()))),
Pending => Ok(Pending),
}
}
CaConnState::Wait(inst) => match inst.poll_unpin(cx) {
Ready(_) => {
self.state = CaConnState::Unconnected;
self.proto = None;
Ok(None)
CaConnState::Wait(inst) => {
trace4!("Wait");
match inst.poll_unpin(cx) {
Ready(_) => {
self.state = CaConnState::Unconnected;
self.proto = None;
Ok(Ready(Some(())))
}
Pending => Ok(Pending),
}
Pending => Ok(Some(Pending)),
},
CaConnState::Shutdown => Ok(None),
CaConnState::EndOfStream => Ok(None),
}
CaConnState::Shutdown => {
trace4!("Shutdown");
Ok(Ready(None))
}
CaConnState::EndOfStream => {
trace4!("EndOfStream");
Ok(Ready(None))
}
}
}
fn loop_inner(&mut self, cx: &mut Context) -> Result<Option<Poll<()>>, Error> {
fn loop_inner(&mut self, cx: &mut Context) -> Result<Poll<Option<()>>, Error> {
use Poll::*;
loop {
self.stats.caconn_loop2_count.inc();
if self.is_shutdown() {
break Ok(None);
}
if self.insert_item_queue.len() >= self.opts.insert_queue_max {
break Ok(None);
}
match self.handle_conn_state(cx)? {
Some(Ready(_)) => continue,
Some(Pending) => break Ok(Some(Pending)),
None => break Ok(None),
}
break if self.is_shutdown() {
Ok(Ready(None))
} else if self.insert_item_queue.len() >= self.opts.insert_queue_max {
warn!("======================================================= queue stall");
Ok(Ready(None))
} else {
match self.handle_conn_state(cx) {
Ok(x) => match x {
Ready(Some(())) => continue,
Ready(None) => {
error!("handle_conn_state yields {x:?}");
Err(Error::with_msg_no_trace("logic error"))
}
Pending => Ok(Pending),
},
Err(e) => Err(e),
}
};
}
}
@@ -1662,9 +1689,11 @@ impl CaConn {
fn handle_own_ticker_tick(self: Pin<&mut Self>, _cx: &mut Context) -> Result<(), Error> {
let this = self.get_mut();
for (_, tb) in this.time_binners.iter_mut() {
let iiq = &mut this.insert_item_queue;
tb.tick(iiq)?;
if false {
for (_, tb) in this.time_binners.iter_mut() {
let iiq = &mut this.insert_item_queue;
tb.tick(iiq)?;
}
}
Ok(())
}
@@ -1680,10 +1709,11 @@ impl CaConn {
break if sd.is_sending() {
match sd.poll_unpin(cx) {
Ready(Ok(())) => continue,
Ready(Err(e)) => Err(Error::with_msg_no_trace("can not send into channel")),
Ready(Err(_)) => Err(Error::with_msg_no_trace("can not send into channel")),
Pending => Ok(()),
}
} else if let Some(item) = self.channel_info_query_queue.pop_front() {
trace3!("send series query {item:?}");
let sd = &mut self.channel_info_query_sending;
sd.send(item);
continue;
@@ -1700,8 +1730,8 @@ impl Stream for CaConn {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
self.stats.caconn_poll_count.inc();
loop {
let mut have_pending = false;
let poll_ts1 = Instant::now();
let ret = loop {
break if let CaConnState::EndOfStream = self.state {
Ready(None)
} else if let Err(e) = self.as_mut().handle_own_ticker(cx) {
@@ -1724,64 +1754,42 @@ impl Stream for CaConn {
Ready(Some(Err(e)))
} else if let Ready(Some(Err(e))) = self.as_mut().handle_conn_command(cx) {
Ready(Some(Err(e)))
} else if let Some(item) = {
match self.loop_inner(cx) {
// TODO what does this mean: should we re-loop or yield something?
Ok(Some(Ready(()))) => None,
// This is the last step, so we yield Pending.
// But in general, this does not compose well when we would add another step.
Ok(Some(Pending)) => {
have_pending = true;
None
}
Ok(None) => None,
Err(e) => Some(Err(e)),
}
} {
Ready(Some(item))
} else {
// Ready(_) => self.stats.conn_stream_ready.inc(),
// Pending => self.stats.conn_stream_pending.inc(),
let _item = CaConnEvent {
ts: Instant::now(),
value: CaConnEventValue::None,
};
if self.is_shutdown() && self.queues_async_out_flushed() {
self.state = CaConnState::EndOfStream;
Ready(None)
} else if have_pending {
Pending
} else {
continue;
match self.loop_inner(cx) {
Ok(Ready(Some(()))) => continue,
Ok(Ready(None)) => {
// Ready(_) => self.stats.conn_stream_ready.inc(),
// Pending => self.stats.conn_stream_pending.inc(),
let _item = CaConnEvent {
ts: Instant::now(),
value: CaConnEventValue::None,
};
if self.is_shutdown() && self.queues_async_out_flushed() {
debug!("end of stream {}", self.remote_addr_dbg);
self.state = CaConnState::EndOfStream;
Ready(None)
} else {
continue;
}
}
Ok(Pending) => Pending,
Err(e) => {
error!("{e}");
self.state = CaConnState::EndOfStream;
Ready(Some(Err(e)))
}
}
};
}
}
}
pub struct PollTimer<INP> {
inp: INP,
}
impl<INP> PollTimer<INP> {
pub fn new(inp: INP) -> Self {
Self { inp }
}
}
impl<INP> Stream for PollTimer<INP>
where
INP: Stream + Unpin,
{
type Item = <INP as Stream>::Item;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let poll_ts1 = Instant::now();
let inp = &mut self.inp;
let ret = inp.poll_next_unpin(cx);
};
let poll_ts2 = Instant::now();
let dt = poll_ts2.saturating_duration_since(poll_ts1);
if dt > Duration::from_millis(40) {}
if dt > Duration::from_millis(80) {
warn!("long poll duration {:.0} ms", dt.as_secs_f32() * 1e3)
} else if dt > Duration::from_millis(40) {
info!("long poll duration {:.0} ms", dt.as_secs_f32() * 1e3)
} else if dt > Duration::from_millis(5) {
debug!("long poll duration {:.0} ms", dt.as_secs_f32() * 1e3)
}
ret
}
}

View File

@@ -1,3 +1,4 @@
use super::connset_input_merge::InputMerge;
use super::findioc::FindIocRes;
use super::statemap;
use super::statemap::ChannelState;
@@ -68,6 +69,33 @@ static SEARCH_REQ_RECV_COUNT: AtomicUsize = AtomicUsize::new(0);
static SEARCH_REQ_BATCH_SEND_COUNT: AtomicUsize = AtomicUsize::new(0);
static SEARCH_ANS_COUNT: AtomicUsize = AtomicUsize::new(0);
// Compile-time-gated trace macros: the `if false` lets the compiler still
// type-check the format arguments while eliding all logging cost.
// Flip the literal to `true` to enable the corresponding verbosity locally.
#[allow(unused)]
macro_rules! trace2 {
    ($($arg:tt)*) => {
        if false {
            trace!($($arg)*);
        }
    };
}
#[allow(unused)]
macro_rules! trace3 {
    ($($arg:tt)*) => {
        if false {
            trace!($($arg)*);
        }
    };
}
#[allow(unused)]
macro_rules! trace4 {
    ($($arg:tt)*) => {
        if false {
            trace!($($arg)*);
        }
    };
}
#[derive(Debug, PartialEq, Eq)]
pub struct CmdId(SocketAddrV4, usize);
@@ -122,7 +150,7 @@ pub enum ConnSetCmd {
ChannelAddWithAddr(ChannelAddWithAddr),
ChannelRemove(ChannelRemove),
IocAddrQueryResult(VecDeque<FindIocRes>),
CheckHealth,
CheckHealth(Instant),
Shutdown,
}
@@ -134,7 +162,7 @@ pub enum CaConnSetEvent {
#[derive(Debug, Clone)]
pub enum CaConnSetItem {
Healthy,
Healthy(Instant, Instant),
}
pub struct CaConnSetCtrl {
@@ -173,7 +201,7 @@ impl CaConnSetCtrl {
}
pub async fn check_health(&self) -> Result<(), Error> {
let cmd = ConnSetCmd::CheckHealth;
let cmd = ConnSetCmd::CheckHealth(Instant::now());
self.tx.send(CaConnSetEvent::ConnSetCmd(cmd)).await?;
Ok(())
}
@@ -212,7 +240,8 @@ pub struct CaConnSet {
ca_conn_ress: BTreeMap<SocketAddr, CaConnRes>,
channel_states: ChannelStateMap,
connset_tx: Sender<CaConnSetEvent>,
connset_rx: Receiver<CaConnSetEvent>,
// connset_rx: Receiver<CaConnSetEvent>,
connset_rx: crate::ca::connset_input_merge::InputMerge,
channel_info_query_tx: Sender<ChannelInfoQuery>,
storage_insert_tx: Sender<QueryItem>,
shutdown_stopping: bool,
@@ -231,17 +260,20 @@ impl CaConnSet {
channel_info_query_tx: Sender<ChannelInfoQuery>,
pgconf: Database,
) -> CaConnSetCtrl {
let (connset_inp_tx, connset_inp_rx) = async_channel::bounded(256);
let (connset_out_tx, connset_out_rx) = async_channel::bounded(256);
let (connset_tx, connset_rx) = async_channel::bounded(10000);
let (search_tx, ioc_finder_jh) = super::finder::start_finder(connset_tx.clone(), backend.clone(), pgconf);
let (find_ioc_res_tx, find_ioc_res_rx) = async_channel::bounded(10000);
let (search_tx, ioc_finder_jh) = super::finder::start_finder(find_ioc_res_tx.clone(), backend.clone(), pgconf);
let input_merge = InputMerge::new(todo!(), find_ioc_res_rx);
let connset = Self {
backend,
local_epics_hostname,
search_tx,
ca_conn_ress: BTreeMap::new(),
channel_states: ChannelStateMap::new(),
connset_tx: connset_tx.clone(),
connset_rx,
connset_tx: connset_inp_tx,
// connset_rx: find_ioc_res_rx,
connset_rx: todo!(),
channel_info_query_tx,
storage_insert_tx,
shutdown_stopping: false,
@@ -254,7 +286,7 @@ impl CaConnSet {
// TODO await on jh
let jh = tokio::spawn(CaConnSet::run(connset));
CaConnSetCtrl {
tx: connset_tx,
tx: connset_inp_tx,
rx: connset_out_rx,
jh,
}
@@ -262,10 +294,10 @@ impl CaConnSet {
async fn run(mut this: CaConnSet) -> Result<(), Error> {
loop {
let x = this.connset_rx.recv().await;
let x = this.connset_rx.next().await;
match x {
Ok(ev) => this.handle_event(ev).await?,
Err(_) => {
Some(ev) => this.handle_event(ev).await?,
None => {
if this.shutdown_stopping {
// all fine
break;
@@ -302,7 +334,7 @@ impl CaConnSet {
ConnSetCmd::ChannelRemove(x) => self.handle_remove_channel(x).await,
ConnSetCmd::IocAddrQueryResult(x) => self.handle_ioc_query_result(x).await,
ConnSetCmd::SeriesLookupResult(x) => self.handle_series_lookup_result(x).await,
ConnSetCmd::CheckHealth => self.handle_check_health().await,
ConnSetCmd::CheckHealth(ts1) => self.handle_check_health(ts1).await,
ConnSetCmd::Shutdown => self.handle_shutdown().await,
},
CaConnSetEvent::CaConnEvent((addr, ev)) => match ev.value {
@@ -322,7 +354,7 @@ impl CaConnSet {
&mut self,
res: Result<ChannelInfoResult, dbpg::seriesbychannel::Error>,
) -> Result<(), Error> {
debug!("handle_series_lookup_result {res:?}");
trace3!("handle_series_lookup_result {res:?}");
match res {
Ok(res) => {
let add = ChannelAddWithStatusId {
@@ -372,7 +404,7 @@ impl CaConnSet {
debug!("handle_add_channel but shutdown_stopping");
return Ok(());
}
debug!("handle_add_channel_with_status_id {add:?}");
trace3!("handle_add_channel_with_status_id {add:?}");
let ch = Channel::new(add.name.clone());
if let Some(chst) = self.channel_states.inner().get_mut(&ch) {
if let ChannelStateValue::Active(chst2) = &mut chst.value {
@@ -462,7 +494,7 @@ impl CaConnSet {
} = ast
{
if let Some(addr) = e.addr {
debug!("ioc found {e:?}");
trace3!("ioc found {e:?}");
let add = ChannelAddWithAddr {
backend: self.backend.clone(),
name: e.channel,
@@ -496,9 +528,10 @@ impl CaConnSet {
Ok(())
}
async fn handle_check_health(&mut self) -> Result<(), Error> {
async fn handle_check_health(&mut self, ts1: Instant) -> Result<(), Error> {
debug!("TODO handle_check_health");
let item = CaConnSetItem::Healthy;
let ts2 = Instant::now();
let item = CaConnSetItem::Healthy(ts1, ts2);
self.connset_out_tx.send(item).await?;
Ok(())
}
@@ -907,7 +940,6 @@ impl CaConnSet {
}
}
}
use atomic::Ordering::Release;
self.stats.channel_unknown_address.__set(unknown_address);
self.stats.channel_search_pending.__set(search_pending);
self.stats.channel_no_address.__set(no_address);

View File

@@ -0,0 +1,66 @@
use super::connset::CaConnSetEvent;
use super::findioc::FindIocRes;
use crate::ca::connset::ConnSetCmd;
use async_channel::Receiver;
use futures_util::StreamExt;
use std::collections::VecDeque;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
/// Merges two input channels into one `Stream` of `CaConnSetEvent`:
/// the main event channel (`inp1`) and the IOC address search results
/// (`inp2`). A field set to `None` marks a source that has been drained.
pub struct InputMerge {
    inp1: Option<Receiver<CaConnSetEvent>>,
    inp2: Option<Receiver<VecDeque<FindIocRes>>>,
}
impl InputMerge {
    /// Wraps the two receivers; both start out open.
    pub fn new(inp1: Receiver<CaConnSetEvent>, inp2: Receiver<VecDeque<FindIocRes>>) -> Self {
        Self {
            inp1: Some(inp1),
            inp2: Some(inp2),
        }
    }

    /// Closes the main event channel.
    // NOTE(review): only `inp1` is closed here; `inp2` (the IOC search
    // results channel) stays open — confirm this is intentional.
    pub fn close(&mut self) {
        if let Some(x) = self.inp1.as_ref() {
            x.close();
        }
    }
}
impl futures_util::Stream for InputMerge {
    type Item = CaConnSetEvent;

    /// Polls `inp2` (IOC search results) first, wrapping any batch as a
    /// `ConnSetCmd::IocAddrQueryResult`; otherwise falls back to `inp1`.
    ///
    /// NOTE(review): the stream terminates as soon as `inp1` is exhausted,
    /// even while `inp2` could still yield items — confirm this is intended
    /// (e.g. `inp1` closing signals shutdown).
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        use Poll::*;
        // Give priority to pending IOC address query results.
        if let Some(inp) = &mut self.inp2 {
            match inp.poll_next_unpin(cx) {
                Ready(Some(x)) => {
                    return Ready(Some(CaConnSetEvent::ConnSetCmd(ConnSetCmd::IocAddrQueryResult(x))));
                }
                Ready(None) => {
                    // Source drained; never poll it again.
                    self.inp2 = None;
                }
                Pending => {}
            }
        }
        // Fall back to the main event input.
        if let Some(inp) = &mut self.inp1 {
            match inp.poll_next_unpin(cx) {
                Ready(Some(x)) => Ready(Some(x)),
                Ready(None) => {
                    self.inp1 = None;
                    Ready(None)
                }
                Pending => Pending,
            }
        } else {
            Ready(None)
        }
    }
}

View File

@@ -85,7 +85,7 @@ fn transform_pgres(rows: Vec<PgRow>) -> VecDeque<FindIocRes> {
async fn finder_worker_single(
inp: Receiver<Vec<IocAddrQuery>>,
tx: Sender<CaConnSetEvent>,
tx: Sender<VecDeque<FindIocRes>>,
backend: String,
db: Database,
) -> Result<(), Error> {
@@ -164,11 +164,7 @@ async fn finder_worker_single(
error!("STILL NOT MATCHING LEN");
}
SEARCH_RES_3_COUNT.fetch_add(items.len(), atomic::Ordering::AcqRel);
let x = tx
.send(CaConnSetEvent::ConnSetCmd(
crate::ca::connset::ConnSetCmd::IocAddrQueryResult(items),
))
.await;
let x = tx.send(items).await;
match x {
Ok(_) => {}
Err(e) => {
@@ -191,7 +187,7 @@ async fn finder_worker_single(
async fn finder_worker(
qrx: Receiver<IocAddrQuery>,
tx: Sender<CaConnSetEvent>,
tx: Sender<VecDeque<FindIocRes>>,
backend: String,
db: Database,
) -> Result<(), Error> {
@@ -215,7 +211,7 @@ async fn finder_worker(
}
pub fn start_finder(
tx: Sender<CaConnSetEvent>,
tx: Sender<VecDeque<FindIocRes>>,
backend: String,
db: Database,
) -> (Sender<IocAddrQuery>, JoinHandle<Result<(), Error>>) {

View File

@@ -7,6 +7,7 @@ pub mod linuxhelper;
pub mod metrics;
pub mod netbuf;
pub mod patchcollect;
pub mod polltimer;
pub mod rt;
pub mod senderpolling;
#[cfg(test)]

38
netfetch/src/polltimer.rs Normal file
View File

@@ -0,0 +1,38 @@
use futures_util::Stream;
use futures_util::StreamExt;
use log::*;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
use std::time::Duration;
use std::time::Instant;
/// Stream adapter that measures the wall-clock duration of each
/// `poll_next` call on the inner stream and warns when it exceeds
/// `timeout_warn`.
pub struct PollTimer<INP> {
    inp: INP,
    timeout_warn: Duration,
}
impl<INP> PollTimer<INP> {
    /// Wraps `inp`; a warning is emitted for every single poll that takes
    /// longer than `timeout_warn`.
    pub fn new(inp: INP, timeout_warn: Duration) -> Self {
        Self { inp, timeout_warn }
    }
}
impl<INP> Stream for PollTimer<INP>
where
    INP: Stream + Unpin,
{
    type Item = <INP as Stream>::Item;

    /// Delegates to the inner stream's `poll_next`, timing the call and
    /// warning when it ran longer than the configured threshold.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        let started = Instant::now();
        let res = self.inp.poll_next_unpin(cx);
        let elapsed = Instant::now().saturating_duration_since(started);
        if elapsed > self.timeout_warn {
            warn!("long poll duration {:.0} ms", elapsed.as_secs_f32() * 1e3)
        }
        res
    }
}

View File

@@ -26,6 +26,15 @@ use std::any::Any;
use std::collections::VecDeque;
use std::time::SystemTime;
// Compile-time-gated trace macro: `if false` elides the logging while the
// compiler still type-checks the format arguments.
#[allow(unused)]
macro_rules! trace2 {
    ($($arg:tt)*) => {
        if false {
            trace!($($arg)*);
        }
    };
}
struct TickParams<'a> {
series: SeriesId,
acc: &'a mut Box<dyn Any + Send>,
@@ -77,7 +86,7 @@ impl ConnTimeBin {
match scalar_type {
I8 => {
type ST = i8;
info!("SCALAR {}", any::type_name::<ST>());
trace2!("SCALAR {}", any::type_name::<ST>());
let cont = Cont::<ST>::empty();
self.events_binner =
Some(cont.as_time_binnable_ref().time_binner_new(binrange, do_time_weight));
@@ -88,7 +97,7 @@ impl ConnTimeBin {
}
I16 => {
type ST = i16;
info!("SCALAR {}", std::any::type_name::<ST>());
trace2!("SCALAR {}", std::any::type_name::<ST>());
let cont = Cont::<ST>::empty();
self.events_binner =
Some(cont.as_time_binnable_ref().time_binner_new(binrange, do_time_weight));
@@ -99,7 +108,7 @@ impl ConnTimeBin {
}
I32 => {
type ST = i32;
info!("SCALAR {}", std::any::type_name::<ST>());
trace2!("SCALAR {}", std::any::type_name::<ST>());
let cont = Cont::<ST>::empty();
self.events_binner =
Some(cont.as_time_binnable_ref().time_binner_new(binrange, do_time_weight));
@@ -110,7 +119,7 @@ impl ConnTimeBin {
}
F32 => {
type ST = f32;
info!("SCALAR {}", std::any::type_name::<ST>());
trace2!("SCALAR {}", std::any::type_name::<ST>());
let cont = Cont::<ST>::empty();
self.events_binner =
Some(cont.as_time_binnable_ref().time_binner_new(binrange, do_time_weight));
@@ -121,7 +130,7 @@ impl ConnTimeBin {
}
F64 => {
type ST = f64;
info!("SCALAR {}", std::any::type_name::<ST>());
trace2!("SCALAR {}", std::any::type_name::<ST>());
let cont = Cont::<ST>::empty();
self.events_binner =
Some(cont.as_time_binnable_ref().time_binner_new(binrange, do_time_weight));