WIP refactor

Dominik Werder
2023-09-06 17:34:27 +02:00
parent 76915d5d82
commit ba9bb7e26c
13 changed files with 1028 additions and 891 deletions

View File

@@ -99,19 +99,16 @@ jobs:
# id: daqingest_version_set
# working-directory: ${{steps.wdset.outputs.gh}}/build/daqingest/target/release
- run: "echo 'version: [${{steps.daqingest_version_set.outputs.daqingest_version}}]'"
- run: "mkdir daqingest-${{steps.daqingest_version_set.outputs.daqingest_version}}"
- run: "cp ${{steps.wdset.outputs.gh}}/build/daqingest/target/release/daqingest daqingest-${{steps.daqingest_version_set.outputs.daqingest_version}}/daqingest"
- run: "tar -czf daqingest-${{steps.daqingest_version_set.outputs.daqingest_version}}.tar.gz daqingest-${{steps.daqingest_version_set.outputs.daqingest_version}}"
- run: "mkdir daqingest-${{steps.daqingest_version_set.outputs.daqingest_version}}-amd64-rhel7"
- run: "cp ${{steps.wdset.outputs.gh}}/build/daqingest/target/release/daqingest daqingest-${{steps.daqingest_version_set.outputs.daqingest_version}}-amd64-rhel7/daqingest"
- run: "tar -czf daqingest-${{steps.daqingest_version_set.outputs.daqingest_version}}-amd64-rhel7.tar.gz daqingest-${{steps.daqingest_version_set.outputs.daqingest_version}}-amd64-rhel7"
- uses: actions/upload-artifact@v3
with:
name: daqingest-${{steps.daqingest_version_set.outputs.daqingest_version}}
path: ${{steps.wdset.outputs.gh}}/build/daqingest/target/release/daqingest
- uses: actions/upload-artifact@v3
with:
name: daqingest-${{steps.daqingest_version_set.outputs.daqingest_version}}.tar.gz
path: daqingest-${{steps.daqingest_version_set.outputs.daqingest_version}}.tar.gz
- run: echo "{\"tag_name\":\"buildaction\", \"name\":\"daqingest-${{steps.daqingest_version_set.outputs.daqingest_version}}\", \"draft\":true, \"prerelease\":true}" > create-rel.json
- run: "curl -v -o rel.json -L -X POST -H content-type:application/json -H 'accept:application/vnd.github+json' -H 'authorization:bearer ${{secrets.github_token}}' -H 'x-github-api-version: 2022-11-28' -T create-rel.json https://api.github.com/repos/paulscherrerinstitute/daqingest/releases"
- run: "curl -v -o rel.json -L -X POST -H content-type:application/json -H 'accept:application/vnd.github+json' -H 'authorization:bearer ${{secrets.github_token}}' -H x-github-api-version:2022-11-28 -T create-rel.json https://api.github.com/repos/paulscherrerinstitute/daqingest/releases"
- run: cat rel.json
- run: "RELID=$(python -c 'import json; x=json.load(open(\"rel.json\")); print(x[\"id\"])') && curl -v -o relass.json -L -X POST -H content-type:application/octet-stream -H 'accept:application/vnd.github+json' -H 'authorization:Bearer ${{secrets.github_token}}' -H 'X-GitHub-Api-Version: 2022-11-28' -T ${{steps.wdset.outputs.gh}}/build/daqingest/target/release/daqingest https://uploads.github.com/repos/paulscherrerinstitute/daqingest/releases/$RELID/assets?name=daqingest-${{steps.daqingest_version_set.outputs.daqingest_version}}-amd64-rhel7"
- run: "RELID=$(python -c 'import json; x=json.load(open(\"rel.json\")); print(x[\"id\"])') && curl -v -o relass.json -L -X POST -H content-type:application/octet-stream -H 'accept:application/vnd.github+json' -H 'authorization:Bearer ${{secrets.github_token}}' -H x-github-api-version:2022-11-28 -T ${{steps.wdset.outputs.gh}}/build/daqingest/target/release/daqingest https://uploads.github.com/repos/paulscherrerinstitute/daqingest/releases/$RELID/assets?name=daqingest-${{steps.daqingest_version_set.outputs.daqingest_version}}-amd64-rhel7"
- run: "RELID=$(python -c 'import json; x=json.load(open(\"rel.json\")); print(x[\"id\"])') && curl -v -o relass.json -L -X POST -H content-type:application/octet-stream -H 'accept:application/vnd.github+json' -H 'authorization:Bearer ${{secrets.github_token}}' -H x-github-api-version:2022-11-28 -T daqingest-${{steps.daqingest_version_set.outputs.daqingest_version}}-amd64-rhel7.tar.gz https://uploads.github.com/repos/paulscherrerinstitute/daqingest/releases/$RELID/assets?name=daqingest-${{steps.daqingest_version_set.outputs.daqingest_version}}-amd64-rhel7.tar.gz"
- run: cat relass.json

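Note on the release steps above: the inline python -c one-liner exists only to pull the numeric release id out of rel.json so that the asset upload can target uploads.github.com. A hedged sketch of the same bookkeeping in Rust (hypothetical helpers, assuming serde_json; not part of this repo):

fn create_release_payload(version: &str) -> String {
    // Mirrors the create-rel.json payload echoed above.
    serde_json::json!({
        "tag_name": "buildaction",
        "name": format!("daqingest-{version}"),
        "draft": true,
        "prerelease": true,
    })
    .to_string()
}

fn release_id(rel_json: &str) -> Option<i64> {
    // rel.json is the response of POST .../releases; only "id" is needed.
    let v: serde_json::Value = serde_json::from_str(rel_json).ok()?;
    v["id"].as_i64()
}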
View File

@@ -28,6 +28,5 @@ scywr = { path = "../scywr" }
dbpg = { path = "../dbpg" }
series = { path = "../series" }
netfetch = { path = "../netfetch" }
batchtools = { path = "../batchtools" }
ingest-bsread = { path = "../ingest-bsread" }
ingest-linux = { path = "../ingest-linux" }

View File

@@ -51,126 +51,12 @@ use tokio::task::JoinHandle;
use tracing::info_span;
use tracing::Instrument;
const SEARCH_BATCH_MAX: usize = 256;
const CURRENT_SEARCH_PENDING_MAX: usize = SEARCH_BATCH_MAX * 4;
const SEARCH_DB_PIPELINE_LEN: usize = 4;
const FINDER_JOB_QUEUE_LEN_MAX: usize = 10;
const FINDER_IN_FLIGHT_MAX: usize = 800;
const FINDER_BATCH_SIZE: usize = 8;
const CHECK_CHANS_PER_TICK: usize = 10000;
const CA_CONN_INSERT_QUEUE_MAX: usize = 256;
const CHANNEL_STATUS_DUMMY_SCALAR_TYPE: i32 = i32::MIN + 1;
const UNKNOWN_ADDRESS_STAY: Duration = Duration::from_millis(2000);
const NO_ADDRESS_STAY: Duration = Duration::from_millis(20000);
const SEARCH_PENDING_TIMEOUT: Duration = Duration::from_millis(30000);
const SEARCH_PENDING_TIMEOUT_WARN: Duration = Duration::from_millis(8000);
const FINDER_TIMEOUT: Duration = Duration::from_millis(100);
const CHANNEL_CHECK_INTERVAL: Duration = Duration::from_millis(5000);
const PRINT_ACTIVE_INTERVAL: Duration = Duration::from_millis(60000);
const PRINT_STATUS_INTERVAL: Duration = Duration::from_millis(20000);
const DO_ASSIGN_TO_CA_CONN: bool = true;
static SEARCH_REQ_MARK_COUNT: AtomicUsize = AtomicUsize::new(0);
static SEARCH_REQ_SEND_COUNT: AtomicUsize = AtomicUsize::new(0);
static SEARCH_REQ_RECV_COUNT: AtomicUsize = AtomicUsize::new(0);
static SEARCH_REQ_BATCH_SEND_COUNT: AtomicUsize = AtomicUsize::new(0);
static SEARCH_REQ_BATCH_RECV_COUNT: AtomicUsize = AtomicUsize::new(0);
static SEARCH_RES_0_COUNT: AtomicUsize = AtomicUsize::new(0);
static SEARCH_RES_1_COUNT: AtomicUsize = AtomicUsize::new(0);
static SEARCH_RES_2_COUNT: AtomicUsize = AtomicUsize::new(0);
static SEARCH_RES_3_COUNT: AtomicUsize = AtomicUsize::new(0);
static SEARCH_ANS_COUNT: AtomicUsize = AtomicUsize::new(0);
#[derive(Clone, Debug, Serialize)]
pub enum ConnectionStateValue {
Unconnected,
Connected {
//#[serde(with = "serde_Instant")]
since: SystemTime,
},
}
#[derive(Clone, Debug, Serialize)]
pub struct ConnectionState {
//#[serde(with = "serde_Instant")]
updated: SystemTime,
value: ConnectionStateValue,
}
#[derive(Clone, Debug, Serialize)]
pub enum WithAddressState {
Unassigned {
//#[serde(with = "serde_Instant")]
assign_at: SystemTime,
},
Assigned(ConnectionState),
}
#[derive(Clone, Debug, Serialize)]
pub enum WithStatusSeriesIdStateInner {
UnknownAddress {
since: SystemTime,
},
SearchPending {
//#[serde(with = "serde_Instant")]
since: SystemTime,
did_send: bool,
},
WithAddress {
addr: SocketAddrV4,
state: WithAddressState,
},
NoAddress {
since: SystemTime,
},
}
#[derive(Clone, Debug, Serialize)]
pub struct WithStatusSeriesIdState {
inner: WithStatusSeriesIdStateInner,
}
#[derive(Clone, Debug)]
pub enum ActiveChannelState {
Init {
since: SystemTime,
},
WaitForStatusSeriesId {
since: SystemTime,
rx: Receiver<Result<Existence<SeriesId>, dbpg::seriesbychannel::Error>>,
},
WithStatusSeriesId {
status_series_id: ChannelStatusSeriesId,
state: WithStatusSeriesIdState,
},
}
#[derive(Debug)]
pub enum ChannelStateValue {
Active(ActiveChannelState),
ToRemove { addr: Option<SocketAddrV4> },
}
#[derive(Debug)]
pub struct ChannelState {
value: ChannelStateValue,
}
#[derive(Debug)]
pub enum CaConnStateValue {
Fresh,
HadFeedback,
Shutdown { since: Instant },
}
#[derive(Debug)]
pub struct CaConnState {
last_feedback: Instant,
value: CaConnStateValue,
}
pub struct DaemonOpts {
backend: String,
local_epics_hostname: String,
@@ -193,13 +79,8 @@ impl DaemonOpts {
pub struct Daemon {
opts: DaemonOpts,
connection_states: BTreeMap<SocketAddrV4, CaConnState>,
channel_states: BTreeMap<Channel, ChannelState>,
tx: Sender<DaemonEvent>,
rx: Receiver<DaemonEvent>,
chan_check_next: Option<Channel>,
search_tx: Sender<String>,
ioc_finder_jh: JoinHandle<Result<(), Error>>,
insert_queue_counter: Arc<AtomicUsize>,
count_unknown_address: usize,
count_search_pending: usize,
@@ -226,8 +107,6 @@ impl Daemon {
.map_err(|e| Error::with_msg_no_trace(e.to_string()))?;
let datastore = Arc::new(datastore);
let (daemon_ev_tx, daemon_ev_rx) = async_channel::bounded(32);
let (search_tx, ioc_finder_jh) =
finder::start_finder(daemon_ev_tx.clone(), opts.backend().into(), opts.pgconf.clone());
// TODO keep join handles and await later
let (channel_info_query_tx, ..) = dbpg::seriesbychannel::start_lookup_workers(4, &opts.pgconf)
@@ -242,8 +121,11 @@ impl Daemon {
let common_insert_item_queue_2 = rx;
let conn_set_ctrl = CaConnSet::start(
opts.backend.clone(),
opts.local_epics_hostname.clone(),
common_insert_item_queue.sender().unwrap().inner().clone(),
channel_info_query_tx.clone(),
opts.pgconf.clone(),
);
let ingest_commons = IngestCommons {
@@ -313,13 +195,8 @@ impl Daemon {
let ret = Self {
opts,
connection_states: BTreeMap::new(),
channel_states: BTreeMap::new(),
tx: daemon_ev_tx,
rx: daemon_ev_rx,
chan_check_next: None,
search_tx,
ioc_finder_jh,
insert_queue_counter,
count_unknown_address: 0,
count_search_pending: 0,
@@ -349,324 +226,6 @@ impl Daemon {
!self.shutting_down
}
fn check_connection_states(&mut self) -> Result<(), Error> {
let tsnow = Instant::now();
for (k, v) in &mut self.connection_states {
match v.value {
CaConnStateValue::Fresh => {
// TODO check for delta t since last issued status command.
if tsnow.duration_since(v.last_feedback) > Duration::from_millis(20000) {
error!("TODO Fresh timeout send connection-close for {k:?}");
self.stats.ca_conn_status_feedback_timeout_inc();
v.value = CaConnStateValue::Shutdown { since: tsnow };
}
}
CaConnStateValue::HadFeedback => {
// TODO check for delta t since last issued status command.
if tsnow.duration_since(v.last_feedback) > Duration::from_millis(20000) {
error!("TODO HadFeedback timeout send connection-close for {k:?}");
self.stats.ca_conn_status_feedback_timeout_inc();
v.value = CaConnStateValue::Shutdown { since: tsnow };
}
}
CaConnStateValue::Shutdown { since } => {
if tsnow.saturating_duration_since(since) > Duration::from_millis(10000) {
self.stats.critical_error_inc();
error!("Shutdown of CaConn to {} failed", k);
}
}
}
}
Ok(())
}
fn update_channel_state_counts(&mut self) -> (u64,) {
let mut unknown_address_count = 0;
let mut with_address_count = 0;
let mut search_pending_count = 0;
let mut no_address_count = 0;
for (_ch, st) in &self.channel_states {
match &st.value {
ChannelStateValue::Active(st2) => match st2 {
ActiveChannelState::Init { .. } => {
unknown_address_count += 1;
}
ActiveChannelState::WaitForStatusSeriesId { .. } => {
unknown_address_count += 1;
}
ActiveChannelState::WithStatusSeriesId { state, .. } => match &state.inner {
WithStatusSeriesIdStateInner::UnknownAddress { .. } => {
unknown_address_count += 1;
}
WithStatusSeriesIdStateInner::SearchPending { .. } => {
search_pending_count += 1;
}
WithStatusSeriesIdStateInner::WithAddress { state, .. } => match state {
WithAddressState::Unassigned { .. } => {
with_address_count += 1;
}
WithAddressState::Assigned(_) => {
with_address_count += 1;
}
},
WithStatusSeriesIdStateInner::NoAddress { .. } => {
no_address_count += 1;
}
},
},
ChannelStateValue::ToRemove { .. } => {
unknown_address_count += 1;
}
}
}
self.stats
.channel_unknown_address
.store(unknown_address_count, atomic::Ordering::Release);
self.stats
.channel_with_address
.store(with_address_count, atomic::Ordering::Release);
self.stats
.channel_search_pending
.store(search_pending_count, atomic::Ordering::Release);
self.stats
.channel_no_address
.store(no_address_count, atomic::Ordering::Release);
(search_pending_count,)
}
fn channel_state_counts_2(&mut self) {
self.count_unknown_address = 0;
self.count_search_pending = 0;
self.count_search_sent = 0;
self.count_no_address = 0;
self.count_unassigned = 0;
self.count_assigned = 0;
for (_ch, st) in &self.channel_states {
match &st.value {
ChannelStateValue::Active(st) => match st {
ActiveChannelState::Init { .. } => {}
ActiveChannelState::WaitForStatusSeriesId { .. } => {}
ActiveChannelState::WithStatusSeriesId { state, .. } => match &state.inner {
WithStatusSeriesIdStateInner::UnknownAddress { .. } => {
self.count_unknown_address += 1;
}
WithStatusSeriesIdStateInner::SearchPending { did_send, .. } => {
self.count_search_pending += 1;
if *did_send {
self.count_search_sent += 1;
}
}
WithStatusSeriesIdStateInner::WithAddress { state, .. } => match state {
WithAddressState::Unassigned { .. } => {
self.count_unassigned += 1;
}
WithAddressState::Assigned(_) => {
self.count_assigned += 1;
}
},
WithStatusSeriesIdStateInner::NoAddress { .. } => {
self.count_no_address += 1;
}
},
},
ChannelStateValue::ToRemove { .. } => {}
}
}
}
async fn check_channel_states(&mut self) -> Result<(), Error> {
let (mut search_pending_count,) = self.update_channel_state_counts();
let k = self.chan_check_next.take();
let it = if let Some(last) = k {
trace!("check_chans start at {:?}", last);
self.channel_states.range_mut(last..)
} else {
self.channel_states.range_mut(..)
};
let tsnow = SystemTime::now();
let mut attempt_series_search = true;
for (i, (ch, st)) in it.enumerate() {
match &mut st.value {
ChannelStateValue::Active(st2) => match st2 {
ActiveChannelState::Init { since: _ } => {
let (tx, rx) = async_channel::bounded(1);
let q = ChannelInfoQuery {
backend: self.ingest_commons.backend.clone(),
channel: ch.id().into(),
scalar_type: CHANNEL_STATUS_DUMMY_SCALAR_TYPE,
shape_dims: Vec::new(),
tx,
};
if attempt_series_search {
match self.channel_info_query_tx.try_send(q) {
Ok(()) => {
*st2 = ActiveChannelState::WaitForStatusSeriesId { since: tsnow, rx };
}
Err(e) => match e {
_ => {
attempt_series_search = false;
}
},
}
}
}
ActiveChannelState::WaitForStatusSeriesId { since, rx } => {
let dt = tsnow.duration_since(*since).unwrap_or(Duration::ZERO);
if dt > Duration::from_millis(5000) {
warn!("timeout can not get status series id for {ch:?}");
*st2 = ActiveChannelState::Init { since: tsnow };
} else {
match rx.try_recv() {
Ok(x) => match x {
Ok(x) => {
//info!("received status series id: {x:?}");
*st2 = ActiveChannelState::WithStatusSeriesId {
status_series_id: ChannelStatusSeriesId::new(x.into_inner().id()),
state: WithStatusSeriesIdState {
inner: WithStatusSeriesIdStateInner::UnknownAddress { since: tsnow },
},
};
}
Err(e) => {
error!("could not get a status series id {ch:?} {e}");
}
},
Err(_) => {
// TODO should maybe not attempt receive on each channel check.
}
}
}
}
ActiveChannelState::WithStatusSeriesId {
status_series_id,
state,
} => match &mut state.inner {
WithStatusSeriesIdStateInner::UnknownAddress { since } => {
let dt = tsnow.duration_since(*since).unwrap_or(Duration::ZERO);
if dt > UNKNOWN_ADDRESS_STAY {
//info!("UnknownAddress {} {:?}", i, ch);
if (search_pending_count as usize) < CURRENT_SEARCH_PENDING_MAX {
search_pending_count += 1;
state.inner = WithStatusSeriesIdStateInner::SearchPending {
since: tsnow,
did_send: false,
};
SEARCH_REQ_MARK_COUNT.fetch_add(1, atomic::Ordering::AcqRel);
}
}
}
WithStatusSeriesIdStateInner::SearchPending { since, did_send: _ } => {
//info!("SearchPending {} {:?}", i, ch);
let dt = tsnow.duration_since(*since).unwrap_or(Duration::ZERO);
if dt > SEARCH_PENDING_TIMEOUT {
info!("Search timeout for {ch:?}");
state.inner = WithStatusSeriesIdStateInner::NoAddress { since: tsnow };
search_pending_count -= 1;
}
}
WithStatusSeriesIdStateInner::WithAddress { addr, state } => {
//info!("WithAddress {} {:?}", i, ch);
use WithAddressState::*;
match state {
Unassigned { assign_at } => {
if DO_ASSIGN_TO_CA_CONN && *assign_at <= tsnow {
let backend = self.opts.backend().into();
let addr_v4 = SocketAddr::V4(*addr);
let name = ch.id().into();
let cssid = status_series_id.clone();
let local_epics_hostname = self.opts.local_epics_hostname.clone();
// This operation is meant to complete very quickly
self.connset_ctrl
.add_channel(backend, addr_v4, name, cssid, local_epics_hostname)
.slow_warn(500)
.instrument(info_span!("add_channel_to_addr"))
.await?;
let cs = ConnectionState {
updated: tsnow,
value: ConnectionStateValue::Unconnected,
};
*state = WithAddressState::Assigned(cs);
self.connection_states
.entry(*addr)
.and_modify(|_| {
// TODO may be count for metrics.
// Nothing else to do.
})
.or_insert_with(|| {
let t = CaConnState {
last_feedback: Instant::now(),
value: CaConnStateValue::Fresh,
};
t
});
// TODO move await out of here
if let Some(tx) = self.ingest_commons.insert_item_queue.sender() {
let item = QueryItem::ChannelStatus(ChannelStatusItem {
ts: tsnow,
series: SeriesId::new(status_series_id.id()),
status: ChannelStatus::AssignedToAddress,
});
match tx.send(item).await {
Ok(_) => {}
Err(_) => {
// TODO feed into throttled log, or count as unlogged
}
}
}
}
}
Assigned(_) => {
// TODO check if channel is healthy and alive
}
}
}
WithStatusSeriesIdStateInner::NoAddress { since } => {
let dt = tsnow.duration_since(*since).unwrap_or(Duration::ZERO);
if dt > NO_ADDRESS_STAY {
state.inner = WithStatusSeriesIdStateInner::UnknownAddress { since: tsnow };
}
}
},
},
ChannelStateValue::ToRemove { .. } => {
// TODO if assigned to some address,
}
}
if i >= CHECK_CHANS_PER_TICK {
self.chan_check_next = Some(ch.clone());
break;
}
}
for (ch, st) in &mut self.channel_states {
if let ChannelStateValue::Active(ActiveChannelState::WithStatusSeriesId {
status_series_id: _,
state,
}) = &mut st.value
{
if let WithStatusSeriesIdStateInner::SearchPending { since: _, did_send } = &mut state.inner {
if *did_send == false {
match self.search_tx.try_send(ch.id().into()) {
Ok(()) => {
*did_send = true;
SEARCH_REQ_SEND_COUNT.fetch_add(1, atomic::Ordering::AcqRel);
}
Err(e) => match e {
async_channel::TrySendError::Full(_) => {}
async_channel::TrySendError::Closed(_) => {
error!("Finder channel closed");
// TODO recover from this.
panic!();
}
},
}
}
}
}
}
self.channel_state_counts_2();
Ok(())
}
async fn check_caconn_chans(&mut self) -> Result<(), Error> {
if self.caconn_last_channel_check.elapsed() > CHANNEL_CHECK_INTERVAL {
self.connset_ctrl.check_health().await?;
@@ -709,8 +268,10 @@ impl Daemon {
warn!("Received SIGTERM");
SIGTERM.store(2, atomic::Ordering::Release);
}
self.check_connection_states()?;
self.check_channel_states().await?;
warn!("TODO let CaConnSet check health");
// TODO
// self.check_connection_states()?;
// self.check_channel_states().await?;
let dt = ts1.elapsed();
if dt > Duration::from_millis(500) {
info!("slow check_chans {}ms", dt.as_secs_f32() * 1e3);
@@ -733,38 +294,24 @@ impl Daemon {
self.count_assigned,
self.insert_queue_counter.load(atomic::Ordering::Acquire),
);
if false {
info!(
"{:5} {:5} {:5} {:5} {:5} {:5} {:5} {:5} {:5} {:5}",
SEARCH_REQ_MARK_COUNT.load(atomic::Ordering::Acquire),
SEARCH_REQ_SEND_COUNT.load(atomic::Ordering::Acquire),
SEARCH_REQ_RECV_COUNT.load(atomic::Ordering::Acquire),
SEARCH_REQ_BATCH_SEND_COUNT.load(atomic::Ordering::Acquire),
SEARCH_REQ_BATCH_RECV_COUNT.load(atomic::Ordering::Acquire),
SEARCH_RES_0_COUNT.load(atomic::Ordering::Acquire),
SEARCH_RES_1_COUNT.load(atomic::Ordering::Acquire),
SEARCH_RES_2_COUNT.load(atomic::Ordering::Acquire),
SEARCH_RES_3_COUNT.load(atomic::Ordering::Acquire),
SEARCH_ANS_COUNT.load(atomic::Ordering::Acquire),
);
}
}
Ok(())
}
fn handle_channel_add(&mut self, ch: Channel) -> Result<(), Error> {
if !self.channel_states.contains_key(&ch) {
let st = ChannelState {
value: ChannelStateValue::Active(ActiveChannelState::Init {
since: SystemTime::now(),
}),
};
self.channel_states.insert(ch, st);
}
async fn handle_channel_add(&mut self, ch: Channel) -> Result<(), Error> {
self.connset_ctrl
.add_channel(
self.opts.backend.clone(),
ch.id().into(),
self.opts.local_epics_hostname.clone(),
)
.await?;
Ok(())
}
fn handle_channel_remove(&mut self, ch: Channel) -> Result<(), Error> {
async fn handle_channel_remove(&mut self, ch: Channel) -> Result<(), Error> {
warn!("TODO handle_channel_remove");
#[cfg(DISABLED)]
if let Some(k) = self.channel_states.get_mut(&ch) {
match &k.value {
ChannelStateValue::Active(j) => match j {
@@ -801,9 +348,11 @@ impl Daemon {
}
async fn handle_search_done(&mut self, item: Result<VecDeque<FindIocRes>, Error>) -> Result<(), Error> {
warn!("TODO handle_search_done");
//debug!("handle SearchDone: {res:?}");
let allow_create_new_connections = self.allow_create_new_connections();
let tsnow = SystemTime::now();
// let allow_create_new_connections = self.allow_create_new_connections();
// let tsnow = SystemTime::now();
#[cfg(DISABLED)]
match item {
Ok(ress) => {
SEARCH_ANS_COUNT.fetch_add(ress.len(), atomic::Ordering::AcqRel);
@@ -916,6 +465,12 @@ impl Daemon {
Ok(())
}
async fn handle_ca_conn_done(&mut self, conn_addr: SocketAddrV4) -> Result<(), Error> {
warn!("TODO handle_ca_conn_done {conn_addr:?}");
Ok(())
}
#[cfg(DISABLED)]
async fn handle_ca_conn_done(&mut self, conn_addr: SocketAddrV4) -> Result<(), Error> {
info!("handle_ca_conn_done {conn_addr:?}");
self.connection_states.remove(&conn_addr);
@@ -984,6 +539,8 @@ impl Daemon {
use netfetch::ca::conn::ConnCommandResultKind::*;
match &item.kind {
CheckHealth => {
todo!("TODO collect the CaConn health check in CaConnSet");
#[cfg(DISABLED)]
if let Some(st) = self.connection_states.get_mut(&addr) {
self.stats.ca_conn_status_feedback_recv_inc();
st.last_feedback = Instant::now();
@@ -1006,6 +563,11 @@ impl Daemon {
}
}
async fn handle_shutdown(&mut self) -> Result<(), Error> {
todo!("handle_shutdown");
}
#[cfg(DISABLED)]
async fn handle_shutdown(&mut self) -> Result<(), Error> {
warn!("received shutdown event");
if self.shutting_down {
@@ -1041,8 +603,8 @@ impl Daemon {
let _ = ts1.elapsed();
ret
}
ChannelAdd(ch) => self.handle_channel_add(ch),
ChannelRemove(ch) => self.handle_channel_remove(ch),
ChannelAdd(ch) => self.handle_channel_add(ch).await,
ChannelRemove(ch) => self.handle_channel_remove(ch).await,
SearchDone(item) => self.handle_search_done(item).await,
CaConnEvent(addr, item) => self.handle_ca_conn_event(addr, item).await,
Shutdown => self.handle_shutdown().await,
@@ -1108,8 +670,6 @@ impl Daemon {
}
}
}
warn!("TODO wait for IOC finder properly");
let _ = &self.ioc_finder_jh;
warn!("TODO wait for insert workers");
let _ = &self.insert_workers_jh;
info!("daemon done");

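The search half of the per-channel state machine deleted above is re-homed in CaConnSet (see connset.rs further down). A minimal sketch of the timeout-driven transitions, with illustrative types rather than the daemon's own:

use std::net::SocketAddrV4;
use std::time::{Duration, SystemTime};

const UNKNOWN_ADDRESS_STAY: Duration = Duration::from_millis(2000);
const NO_ADDRESS_STAY: Duration = Duration::from_millis(20000);
const SEARCH_PENDING_TIMEOUT: Duration = Duration::from_millis(30000);

#[allow(dead_code)]
enum SearchState {
    UnknownAddress { since: SystemTime },
    SearchPending { since: SystemTime },
    WithAddress { addr: SocketAddrV4 },
    NoAddress { since: SystemTime },
}

fn tick(st: &mut SearchState, now: SystemTime) {
    let age = |since: SystemTime| now.duration_since(since).unwrap_or(Duration::ZERO);
    let next = match st {
        SearchState::UnknownAddress { since } if age(*since) > UNKNOWN_ADDRESS_STAY => {
            // Old enough: mark for an IOC search on this tick.
            Some(SearchState::SearchPending { since: now })
        }
        SearchState::SearchPending { since } if age(*since) > SEARCH_PENDING_TIMEOUT => {
            // No search answer in time: back off before trying again.
            Some(SearchState::NoAddress { since: now })
        }
        SearchState::NoAddress { since } if age(*since) > NO_ADDRESS_STAY => {
            Some(SearchState::UnknownAddress { since: now })
        }
        _ => None,
    };
    if let Some(n) = next {
        *st = n;
    }
}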
View File

@@ -1,340 +1 @@
use super::FINDER_BATCH_SIZE;
use super::FINDER_IN_FLIGHT_MAX;
use super::FINDER_TIMEOUT;
use crate::daemon::CURRENT_SEARCH_PENDING_MAX;
use crate::daemon::FINDER_JOB_QUEUE_LEN_MAX;
use crate::daemon::SEARCH_BATCH_MAX;
use crate::daemon::SEARCH_DB_PIPELINE_LEN;
use crate::daemon::SEARCH_REQ_BATCH_RECV_COUNT;
use crate::daemon::SEARCH_RES_0_COUNT;
use crate::daemon::SEARCH_RES_1_COUNT;
use crate::daemon::SEARCH_RES_2_COUNT;
use crate::daemon::SEARCH_RES_3_COUNT;
use async_channel::Receiver;
use async_channel::Sender;
use dbpg::conn::make_pg_client;
use dbpg::postgres::Row as PgRow;
use err::Error;
use futures_util::FutureExt;
use futures_util::StreamExt;
use log::*;
use netfetch::ca::findioc::FindIocRes;
use netfetch::ca::findioc::FindIocStream;
use netfetch::daemon_common::DaemonEvent;
use netpod::Database;
use std::collections::HashMap;
use std::collections::VecDeque;
use std::net::SocketAddrV4;
use std::sync::atomic;
use std::time::Duration;
use std::time::Instant;
use taskrun::tokio;
use tokio::task::JoinHandle;
#[allow(unused)]
macro_rules! debug_batch {
// (D$($arg:tt)*) => ();
($($arg:tt)*) => (if false {
debug!($($arg)*);
});
}
#[allow(unused)]
macro_rules! trace_batch {
// (D$($arg:tt)*) => ();
($($arg:tt)*) => (if false {
trace!($($arg)*);
});
}
fn transform_pgres(rows: Vec<PgRow>) -> VecDeque<FindIocRes> {
let mut ret = VecDeque::new();
for row in rows {
let ch: Result<String, _> = row.try_get(0);
if let Ok(ch) = ch {
if let Some(addr) = row.get::<_, Option<String>>(1) {
let addr = addr.parse().ok();
let item = FindIocRes {
channel: ch,
response_addr: None,
addr,
dt: Duration::from_millis(0),
};
ret.push_back(item);
} else {
let item = FindIocRes {
channel: ch,
response_addr: None,
addr: None,
dt: Duration::from_millis(0),
};
ret.push_back(item);
}
} else if let Err(e) = ch {
error!("bad string from pg: {e:?}");
}
}
ret
}
async fn finder_worker_single(
inp: Receiver<Vec<String>>,
tx: Sender<DaemonEvent>,
backend: String,
db: Database,
) -> Result<(), Error> {
let pg = make_pg_client(&db)
.await
.map_err(|e| Error::with_msg_no_trace(e.to_string()))?;
let sql = concat!(
"with q1 as (select * from unnest($2::text[]) as unn (ch))",
" select distinct on (tt.facility, tt.channel) tt.channel, tt.addr",
" from ioc_by_channel_log tt join q1 on tt.channel = q1.ch and tt.facility = $1 and tt.addr is not null",
" order by tt.facility, tt.channel, tsmod desc",
);
let qu_select_multi = pg
.prepare(sql)
.await
.map_err(|e| Error::with_msg_no_trace(e.to_string()))?;
let mut resdiff = 0;
loop {
match inp.recv().await {
Ok(batch) => {
SEARCH_REQ_BATCH_RECV_COUNT.fetch_add(batch.len(), atomic::Ordering::AcqRel);
let ts1 = Instant::now();
debug_batch!("run query batch len {}", batch.len());
let qres = pg.query(&qu_select_multi, &[&backend, &batch]).await;
let dt = ts1.elapsed();
debug_batch!(
"done query batch len {}: {} {:.3}ms",
batch.len(),
qres.is_ok(),
dt.as_secs_f32() * 1e3
);
if dt > Duration::from_millis(5000) {
let mut out = String::from("[");
for s in &batch {
if out.len() > 1 {
out.push_str(", ");
}
out.push('\'');
out.push_str(s);
out.push('\'');
}
out.push(']');
eprintln!("VERY SLOW QUERY\n{out}");
}
match qres {
Ok(rows) => {
if rows.len() > batch.len() {
error!("MORE RESULTS THAN INPUT");
} else if rows.len() < batch.len() {
resdiff += batch.len() - rows.len();
}
let nbatch = batch.len();
trace_batch!("received results {} resdiff {}", rows.len(), resdiff);
SEARCH_RES_0_COUNT.fetch_add(rows.len(), atomic::Ordering::AcqRel);
let items = transform_pgres(rows);
let names: HashMap<_, _> = items.iter().map(|x| (&x.channel, true)).collect();
let mut to_add = Vec::new();
for s in batch {
if !names.contains_key(&s) {
let item = FindIocRes {
channel: s,
response_addr: None,
addr: None,
dt: Duration::from_millis(0),
};
to_add.push(item);
}
}
SEARCH_RES_1_COUNT.fetch_add(items.len(), atomic::Ordering::AcqRel);
SEARCH_RES_2_COUNT.fetch_add(to_add.len(), atomic::Ordering::AcqRel);
let mut items = items;
items.extend(to_add.into_iter());
if items.len() != nbatch {
error!("STILL NOT MATCHING LEN");
}
SEARCH_RES_3_COUNT.fetch_add(items.len(), atomic::Ordering::AcqRel);
let x = tx.send(DaemonEvent::SearchDone(Ok(items))).await;
match x {
Ok(_) => {}
Err(e) => {
error!("finder sees: {e}");
break;
}
}
}
Err(e) => {
error!("finder sees error: {e}");
tokio::time::sleep(Duration::from_millis(1000)).await;
}
}
}
Err(_e) => break,
}
}
Ok(())
}
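The query above resolves a whole batch in one round-trip by joining the input names through unnest($2::text[]), and the worker then pads the answers so that every requested channel yields exactly one FindIocRes. A compacted sketch of that pattern, assuming a connected tokio_postgres::Client (the repo goes through dbpg::conn::make_pg_client) and the table layout implied by the SQL above:

use std::collections::HashSet;

async fn lookup_batch(
    pg: &tokio_postgres::Client,
    facility: &str,
    names: &[String],
) -> Result<Vec<(String, Option<String>)>, tokio_postgres::Error> {
    let sql = concat!(
        "with q1 as (select * from unnest($2::text[]) as unn (ch)) ",
        "select distinct on (tt.facility, tt.channel) tt.channel, tt.addr ",
        "from ioc_by_channel_log tt join q1 on tt.channel = q1.ch ",
        "and tt.facility = $1 and tt.addr is not null ",
        "order by tt.facility, tt.channel, tsmod desc"
    );
    let rows = pg.query(sql, &[&facility, &names]).await?;
    let mut out: Vec<(String, Option<String>)> = rows
        .iter()
        .map(|r| (r.get::<_, String>(0), r.get::<_, Option<String>>(1)))
        .collect();
    // Pad: channels absent from the result set become "no address found".
    let found: HashSet<&str> = out.iter().map(|(ch, _)| ch.as_str()).collect();
    let missing: Vec<(String, Option<String>)> = names
        .iter()
        .filter(|n| !found.contains(n.as_str()))
        .map(|n| (n.clone(), None))
        .collect();
    out.extend(missing);
    Ok(out)
}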
async fn finder_worker(
qrx: Receiver<String>,
tx: Sender<DaemonEvent>,
backend: String,
db: Database,
) -> Result<(), Error> {
// TODO do something with join handle
let (batch_rx, _jh) = batchtools::batcher::batch(
SEARCH_BATCH_MAX,
Duration::from_millis(200),
SEARCH_DB_PIPELINE_LEN,
qrx,
);
for _ in 0..SEARCH_DB_PIPELINE_LEN {
// TODO use join handle
tokio::spawn(finder_worker_single(
batch_rx.clone(),
tx.clone(),
backend.clone(),
db.clone(),
));
}
Ok(())
}
pub fn start_finder(
tx: Sender<DaemonEvent>,
backend: String,
db: Database,
) -> (Sender<String>, tokio::task::JoinHandle<Result<(), Error>>) {
let (qtx, qrx) = async_channel::bounded(CURRENT_SEARCH_PENDING_MAX);
let jh = taskrun::spawn(finder_worker(qrx, tx, backend, db));
(qtx, jh)
}
struct OptFut<F> {
fut: Option<F>,
}
impl<F> OptFut<F> {
fn empty() -> Self {
Self { fut: None }
}
fn new(fut: F) -> Self {
Self { fut: Some(fut) }
}
fn is_enabled(&self) -> bool {
self.fut.is_some()
}
}
impl<F> futures_util::Future for OptFut<F>
where
F: futures_util::Future + std::marker::Unpin,
{
type Output = <F as futures_util::Future>::Output;
fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context) -> std::task::Poll<Self::Output> {
match self.fut.as_mut() {
Some(fut) => fut.poll_unpin(cx),
None => std::task::Poll::Pending,
}
}
}
#[allow(unused)]
fn start_finder_ca(tx: Sender<DaemonEvent>, tgts: Vec<SocketAddrV4>) -> (Sender<String>, JoinHandle<()>) {
let (qtx, qrx) = async_channel::bounded(32);
let (atx, arx) = async_channel::bounded(32);
let ioc_finder_fut = async move {
let mut finder = FindIocStream::new(tgts, FINDER_TIMEOUT, FINDER_IN_FLIGHT_MAX, FINDER_BATCH_SIZE);
let fut_tick_dur = Duration::from_millis(100);
let mut finder_more = true;
let mut finder_fut = OptFut::new(finder.next());
let mut qrx_fut = OptFut::new(qrx.recv());
let mut qrx_more = true;
let mut fut_tick = Box::pin(tokio::time::sleep(fut_tick_dur));
let mut asend = OptFut::empty();
loop {
tokio::select! {
_ = &mut asend, if asend.is_enabled() => {
asend = OptFut::empty();
}
r1 = &mut finder_fut, if finder_fut.is_enabled() => {
finder_fut = OptFut::empty();
match r1 {
Some(item) => {
asend = OptFut::new(atx.send(item));
}
None => {
// TODO the finder has stopped; stop polling it
warn!("Finder has stopped");
finder_more = false;
}
}
if qrx_more && finder.job_queue_len() < FINDER_JOB_QUEUE_LEN_MAX {
qrx_fut = OptFut::new(qrx.recv());
}
if finder_more {
finder_fut = OptFut::new(finder.next());
}
fut_tick = Box::pin(tokio::time::sleep(fut_tick_dur));
}
r2 = &mut qrx_fut, if qrx_fut.is_enabled() => {
qrx_fut = OptFut::empty();
match r2 {
Ok(item) => {
finder.push(item);
}
Err(e) => {
// TODO input is done... ignore from here on.
error!("Finder input channel error {e}");
qrx_more = false;
}
}
if qrx_more && finder.job_queue_len() < FINDER_JOB_QUEUE_LEN_MAX {
qrx_fut = OptFut::new(qrx.recv());
}
if finder_more {
finder_fut = OptFut::new(finder.next());
} else {
finder_fut = OptFut::empty();
}
fut_tick = Box::pin(tokio::time::sleep(fut_tick_dur));
}
_ = &mut fut_tick => {
if qrx_more && finder.job_queue_len() < FINDER_JOB_QUEUE_LEN_MAX {
qrx_fut = OptFut::new(qrx.recv());
}
if finder_more {
finder_fut = OptFut::new(finder.next());
} else {
finder_fut = OptFut::empty();
}
fut_tick = Box::pin(tokio::time::sleep(fut_tick_dur));
}
else => {
error!("all branches are disabled");
break;
}
};
}
};
let ioc_finder_jh = taskrun::spawn(ioc_finder_fut);
taskrun::spawn({
async move {
while let Ok(item) = arx.recv().await {
match tx.send(DaemonEvent::SearchDone(item)).await {
Ok(_) => {}
Err(e) => {
error!("search res fwd {e}");
}
}
}
warn!("search res fwd nput broken");
}
});
(qtx, ioc_finder_jh)
}

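start_finder_ca above leans on OptFut together with tokio::select! branch guards: an empty OptFut polls as Pending forever, so the if ...is_enabled() precondition is what keeps a disabled branch from wedging the loop, and a completed future must be replaced before the next iteration. A self-contained sketch of that pattern (it assumes, as the code above does, that async_channel's Recv future is Unpin):

use futures_util::FutureExt;

struct OptFut<F> {
    fut: Option<F>,
}

impl<F> OptFut<F> {
    fn new(fut: F) -> Self {
        Self { fut: Some(fut) }
    }
    fn is_enabled(&self) -> bool {
        self.fut.is_some()
    }
}

impl<F> futures_util::Future for OptFut<F>
where
    F: futures_util::Future + Unpin,
{
    type Output = F::Output;
    fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context) -> std::task::Poll<Self::Output> {
        match self.fut.as_mut() {
            Some(fut) => fut.poll_unpin(cx),
            // Empty: report Pending; the select! guard must keep us unpolled.
            None => std::task::Poll::Pending,
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = async_channel::bounded::<u32>(4);
    tokio::spawn(async move {
        for i in 0..3 {
            tx.send(i).await.unwrap();
        }
    });
    let mut recv_fut = OptFut::new(rx.recv());
    loop {
        tokio::select! {
            r = &mut recv_fut, if recv_fut.is_enabled() => {
                match r {
                    Ok(x) => {
                        println!("got {x}");
                        // Re-arm: a finished future must not be polled again.
                        recv_fut = OptFut::new(rx.recv());
                    }
                    Err(_) => break, // all senders dropped
                }
            }
            else => break,
        }
    }
}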
View File

@@ -1,13 +1,16 @@
use async_channel::Receiver;
use async_channel::SendError;
use async_channel::Sender;
use err::thiserror;
use err::ThisError;
use futures_util::Future;
use futures_util::StreamExt;
use log::*;
use md5::Digest;
use netpod::Database;
use series::series::Existence;
use series::SeriesId;
use std::pin::Pin;
use std::time::Duration;
use std::time::Instant;
use taskrun::tokio;
@@ -50,29 +53,23 @@ impl From<crate::err::Error> for Error {
}
}
pub type BoxedSend = Pin<Box<dyn Future<Output = Result<(), ()>> + Send>>;
pub trait CanSendChannelInfoResult: Sync {
fn make_send(&self, item: Result<Existence<SeriesId>, Error>) -> BoxedSend;
}
pub struct ChannelInfoQuery {
pub backend: String,
pub channel: String,
pub scalar_type: i32,
pub shape_dims: Vec<i32>,
pub tx: Sender<Result<Existence<SeriesId>, Error>>,
}
impl ChannelInfoQuery {
pub fn dummy(&self) -> Self {
Self {
backend: String::new(),
channel: String::new(),
scalar_type: -1,
shape_dims: Vec::new(),
tx: self.tx.clone(),
}
}
pub tx: Pin<Box<dyn CanSendChannelInfoResult + Send>>,
}
struct ChannelInfoResult {
series: Existence<SeriesId>,
tx: Sender<Result<Existence<SeriesId>, Error>>,
tx: Pin<Box<dyn CanSendChannelInfoResult + Send>>,
// only for trace:
channel: String,
}
@@ -285,7 +282,8 @@ impl Worker {
let res4 = res3?;
for r in res4 {
trace3!("try to send result for {} {:?}", r.channel, r.series);
match r.tx.send(Ok(r.series)).await {
let fut = r.tx.make_send(Ok(r.series));
match fut.await {
Ok(()) => {}
Err(_e) => {
warn!("can not deliver result");

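The change above swaps the concrete Sender stored in ChannelInfoQuery for a boxed callback object, so the lookup worker no longer needs to know where results go; each caller supplies its own route back. A minimal sketch of the same shape with simplified types (CanSendResult and ChannelReply are illustrative names, not this crate's API):

use std::future::Future;
use std::pin::Pin;

type BoxedSend = Pin<Box<dyn Future<Output = Result<(), ()>> + Send>>;

trait CanSendResult: Sync {
    fn make_send(&self, item: u64) -> BoxedSend;
}

// One possible reply route: a plain bounded channel behind the trait object.
struct ChannelReply {
    tx: async_channel::Sender<u64>,
}

impl CanSendResult for ChannelReply {
    fn make_send(&self, item: u64) -> BoxedSend {
        let tx = self.tx.clone();
        Box::pin(async move { tx.send(item).await.map_err(|_| ()) })
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = async_channel::bounded(1);
    // The lookup worker would only ever see the trait object.
    let reply: Pin<Box<dyn CanSendResult + Send>> = Box::pin(ChannelReply { tx });
    reply.make_send(42).await.unwrap();
    assert_eq!(rx.recv().await.unwrap(), 42);
}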
View File

@@ -125,6 +125,10 @@ impl BsreadClient {
let evtset: Vec<_> = fr.data.iter().map(|&x| x != 0).collect();
let scalar_type = ScalarType::BOOL;
let shape = Shape::Wave(256);
if true {
todo!("TODO bsreadclient try to fetch series id");
}
#[cfg(DISABLED)]
if self.tmp_evtset_series.is_none() {
debug!("try to fetch series id");
let (tx, rx) = async_channel::bounded(8);

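A sketch of the gating idiom used here: DISABLED is never passed via --cfg, so the attribute compiles the old block out while keeping it in the tree, and wrapping todo!() in if true avoids unreachable-code warnings on whatever follows:

fn fetch_series_id_wip() {
    if true {
        todo!("wire up the new series-id lookup");
    }
    // Compiled out unless built with --cfg DISABLED, which never happens.
    #[cfg(DISABLED)]
    {
        // old implementation kept for reference while refactoring
    }
}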
View File

@@ -43,3 +43,4 @@ items_2 = { path = "../../daqbuffer/crates/items_2" }
streams = { path = "../../daqbuffer/crates/streams" }
taskrun = { path = "../../daqbuffer/crates/taskrun" }
bitshuffle = { path = "../../daqbuffer/crates/bitshuffle" }
batchtools = { path = "../batchtools" }

View File

@@ -1,8 +1,10 @@
pub mod conn;
pub mod connset;
pub mod finder;
pub mod findioc;
pub mod proto;
pub mod search;
pub mod statemap;
use self::connset::CaConnSetCtrl;
use crate::ca::connset::CaConnSet;

View File

@@ -9,6 +9,7 @@ use crate::ca::proto::EventAdd;
use crate::senderpolling::SenderPolling;
use crate::timebin::ConnTimeBin;
use async_channel::Sender;
use dbpg::seriesbychannel::CanSendChannelInfoResult;
use dbpg::seriesbychannel::ChannelInfoQuery;
use err::Error;
use futures_util::stream::FuturesUnordered;
@@ -306,6 +307,7 @@ fn info_store_msp_from_time(ts: SystemTime) -> u32 {
#[derive(Debug)]
pub enum ConnCommandKind {
SeriesLookupResult(Result<Existence<SeriesId>, dbpg::seriesbychannel::Error>),
ChannelAdd(String, ChannelStatusSeriesId),
ChannelRemove(String),
CheckHealth,
@@ -319,6 +321,13 @@ pub struct ConnCommand {
}
impl ConnCommand {
pub fn series_lookup(qu: Result<Existence<SeriesId>, dbpg::seriesbychannel::Error>) -> Self {
Self {
id: Self::make_id(),
kind: ConnCommandKind::SeriesLookupResult(qu),
}
}
pub fn channel_add(name: String, cssid: ChannelStatusSeriesId) -> Self {
Self {
id: Self::make_id(),
@@ -389,6 +398,21 @@ enum ChannelSetOp {
Remove,
}
struct SendSeriesLookup {
tx: Sender<ConnCommand>,
}
impl CanSendChannelInfoResult for SendSeriesLookup {
fn make_send(
&self,
item: Result<Existence<SeriesId>, dbpg::seriesbychannel::Error>,
) -> dbpg::seriesbychannel::BoxedSend {
let tx = self.tx.clone();
let fut = async move { tx.send(ConnCommand::series_lookup(item)).await.map_err(|_| ()) };
Box::pin(fut)
}
}
struct ChannelOpsResources<'a> {
channel_set_ops: &'a StdMutex<BTreeMap<String, ChannelSetOp>>,
channels: &'a mut BTreeMap<Cid, ChannelState>,
@@ -639,6 +663,7 @@ impl CaConn {
self.cmd_shutdown();
Ready(Some(Ok(())))
}
ConnCommandKind::SeriesLookupResult(_) => todo!("TODO handle SeriesLookupResult"),
}
}
Ready(None) => {
@@ -962,24 +987,28 @@ impl CaConn {
let _ = cx;
loop {
break if let Some(mut entry) = self.series_lookup_schedule.first_entry() {
let dummy = entry.get().dummy();
let query = std::mem::replace(entry.get_mut(), dummy);
match self.channel_info_query_tx.try_send(query) {
Ok(()) => {
entry.remove();
continue;
}
Err(e) => match e {
async_channel::TrySendError::Full(_) => {
warn!("series lookup channel full");
*entry.get_mut() = e.into_inner();
}
async_channel::TrySendError::Closed(_) => {
warn!("series lookup channel closed");
// *entry.get_mut() = e.into_inner();
todo!("emit_series_lookup");
#[cfg(DISABLED)]
{
let dummy = entry.get().dummy();
let query = std::mem::replace(entry.get_mut(), dummy);
match self.channel_info_query_tx.try_send(query) {
Ok(()) => {
entry.remove();
continue;
}
},
Err(e) => match e {
async_channel::TrySendError::Full(_) => {
warn!("series lookup channel full");
*entry.get_mut() = e.into_inner();
}
async_channel::TrySendError::Closed(_) => {
warn!("series lookup channel closed");
// *entry.get_mut() = e.into_inner();
entry.remove();
}
},
}
}
} else {
()
@@ -1439,29 +1468,32 @@ impl CaConn {
*ch_s = ChannelState::FetchingSeriesId(created_state);
// TODO handle error in different way. Should most likely not abort.
if !self.series_lookup_schedule.contains_key(&cid) {
let (tx, rx) = async_channel::bounded(1);
let tx = SendSeriesLookup {
tx: self.conn_command_tx.clone(),
};
let query = ChannelInfoQuery {
backend: self.backend.clone(),
channel: name.clone(),
scalar_type: scalar_type.to_scylla_i32(),
shape_dims: shape.to_scylla_vec(),
tx,
tx: Box::pin(tx),
};
self.series_lookup_schedule.insert(cid, query);
let fut = async move {
match rx.recv().await {
Ok(item) => match item {
Ok(item) => Ok((cid, sid, k.data_type, k.data_count, item)),
Err(e) => Err(Error::with_msg_no_trace(e.to_string())),
},
Err(e) => {
// TODO count only
error!("can not receive series lookup result for {name} {e}");
Err(Error::with_msg_no_trace("can not receive lookup result"))
}
}
};
self.series_lookup_futs.push(Box::pin(fut));
todo!("TODO discover the series lookup from main command queue");
// let fut = async move {
// match rx.recv().await {
// Ok(item) => match item {
// Ok(item) => Ok((cid, sid, k.data_type, k.data_count, item)),
// Err(e) => Err(Error::with_msg_no_trace(e.to_string())),
// },
// Err(e) => {
// // TODO count only
// error!("can not receive series lookup result for {name} {e}");
// Err(Error::with_msg_no_trace("can not receive lookup result"))
// }
// }
// };
// self.series_lookup_futs.push(Box::pin(fut));
} else {
// TODO count only
warn!("series lookup for {name} already in progress");

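The disabled block above records the intended backpressure handling: take the query out of the map entry via mem::replace with a dummy, try_send it, and on Full put it back with e.into_inner() so it is retried on a later poll instead of blocking the state machine. A small generic sketch of that non-blocking enqueue (illustrative helper, not repo API):

fn try_enqueue<T>(tx: &async_channel::Sender<T>, slot: &mut Option<T>) -> bool {
    if let Some(item) = slot.take() {
        match tx.try_send(item) {
            Ok(()) => true,
            Err(async_channel::TrySendError::Full(item)) => {
                // Queue full: put the item back and retry on the next tick.
                *slot = Some(item);
                false
            }
            Err(async_channel::TrySendError::Closed(_item)) => {
                // Receiver gone; the caller decides how to recover.
                false
            }
        }
    } else {
        false
    }
}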
View File

@@ -1,34 +1,77 @@
use super::conn::CaConnEvent;
use super::conn::ConnCommand;
use super::findioc::FindIocRes;
use super::statemap;
use super::statemap::ChannelState;
use crate::ca::conn::CaConn;
use crate::ca::conn::CaConnEvent;
use crate::ca::conn::CaConnEventValue;
use crate::ca::conn::CaConnOpts;
use crate::ca::conn::ConnCommand;
use crate::ca::statemap::CaConnState;
use crate::ca::statemap::ConnectionState;
use crate::ca::statemap::ConnectionStateValue;
use crate::ca::statemap::WithAddressState;
use crate::daemon_common::Channel;
use crate::errconv::ErrConv;
use crate::rt::JoinHandle;
use crate::rt::TokMx;
use async_channel::Receiver;
use async_channel::Sender;
use atomic::AtomicUsize;
use dbpg::seriesbychannel::BoxedSend;
use dbpg::seriesbychannel::CanSendChannelInfoResult;
use dbpg::seriesbychannel::ChannelInfoQuery;
use err::Error;
use futures_util::FutureExt;
use futures_util::StreamExt;
use netpod::log::*;
use log::*;
use netpod::Database;
use netpod::Shape;
use scywr::iteminsertqueue::ChannelStatusItem;
use scywr::iteminsertqueue::QueryItem;
use series::series::Existence;
use series::ChannelStatusSeriesId;
use series::SeriesId;
use statemap::ActiveChannelState;
use statemap::CaConnStateValue;
use statemap::ChannelStateMap;
use statemap::ChannelStateValue;
use statemap::WithStatusSeriesIdState;
use statemap::WithStatusSeriesIdStateInner;
use statemap::CHANNEL_STATUS_DUMMY_SCALAR_TYPE;
use stats::CaConnSetStats;
use stats::CaConnStats;
use std::collections::BTreeMap;
use std::collections::VecDeque;
use std::net::SocketAddr;
use std::net::SocketAddrV4;
use std::sync::atomic;
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;
use std::time::SystemTime;
use taskrun::tokio;
const DO_ASSIGN_TO_CA_CONN: bool = true;
const CHECK_CHANS_PER_TICK: usize = 10000;
pub const SEARCH_BATCH_MAX: usize = 256;
pub const CURRENT_SEARCH_PENDING_MAX: usize = SEARCH_BATCH_MAX * 4;
const UNKNOWN_ADDRESS_STAY: Duration = Duration::from_millis(2000);
const NO_ADDRESS_STAY: Duration = Duration::from_millis(20000);
const SEARCH_PENDING_TIMEOUT: Duration = Duration::from_millis(30000);
const SEARCH_PENDING_TIMEOUT_WARN: Duration = Duration::from_millis(8000);
// TODO put all these into metrics
static SEARCH_REQ_MARK_COUNT: AtomicUsize = AtomicUsize::new(0);
static SEARCH_REQ_SEND_COUNT: AtomicUsize = AtomicUsize::new(0);
static SEARCH_REQ_RECV_COUNT: AtomicUsize = AtomicUsize::new(0);
static SEARCH_REQ_BATCH_SEND_COUNT: AtomicUsize = AtomicUsize::new(0);
static SEARCH_ANS_COUNT: AtomicUsize = AtomicUsize::new(0);
#[derive(Debug, PartialEq, Eq)]
pub struct CmdId(SocketAddrV4, usize);
pub struct CaConnRes {
state: CaConnState,
sender: Sender<ConnCommand>,
stats: Arc<CaConnStats>,
// TODO await on jh
@@ -41,18 +84,37 @@ impl CaConnRes {
}
}
#[derive(Debug, Clone)]
pub struct ChannelAddWithAddr {
backend: String,
name: String,
local_epics_hostname: String,
cssid: ChannelStatusSeriesId,
addr: SocketAddr,
}
#[derive(Debug, Clone)]
pub struct ChannelAddWithStatusId {
backend: String,
name: String,
local_epics_hostname: String,
cssid: ChannelStatusSeriesId,
}
#[derive(Debug, Clone)]
pub struct ChannelAdd {
backend: String,
name: String,
addr: SocketAddr,
cssid: ChannelStatusSeriesId,
local_epics_hostname: String,
}
#[derive(Debug)]
pub enum ConnSetCmd {
SeriesLookupResult(Result<Existence<SeriesId>, dbpg::seriesbychannel::Error>),
ChannelAdd(ChannelAdd),
ChannelAddWithStatusId(ChannelAddWithStatusId),
ChannelAddWithAddr(ChannelAddWithAddr),
IocAddrQueryResult(VecDeque<FindIocRes>),
CheckHealth,
Shutdown,
}
@@ -69,19 +131,10 @@ pub struct CaConnSetCtrl {
}
impl CaConnSetCtrl {
pub async fn add_channel(
&self,
backend: String,
addr: SocketAddr,
name: String,
cssid: ChannelStatusSeriesId,
local_epics_hostname: String,
) -> Result<(), Error> {
pub async fn add_channel(&self, backend: String, name: String, local_epics_hostname: String) -> Result<(), Error> {
let cmd = ChannelAdd {
backend,
name,
addr,
cssid,
local_epics_hostname,
};
let cmd = ConnSetCmd::ChannelAdd(cmd);
@@ -102,28 +155,65 @@ impl CaConnSetCtrl {
}
}
#[derive(Debug)]
pub struct IocAddrQuery {
pub name: String,
}
struct SeriesLookupSender {
tx: Sender<CaConnSetEvent>,
}
impl CanSendChannelInfoResult for SeriesLookupSender {
fn make_send(&self, item: Result<Existence<SeriesId>, dbpg::seriesbychannel::Error>) -> BoxedSend {
let tx = self.tx.clone();
let fut = async move {
tx.send(CaConnSetEvent::ConnSetCmd(ConnSetCmd::SeriesLookupResult(item)))
.await
.map_err(|_| ())
};
Box::pin(fut)
}
}
pub struct CaConnSet {
backend: String,
local_epics_hostname: String,
search_tx: Sender<IocAddrQuery>,
ca_conn_ress: BTreeMap<SocketAddr, CaConnRes>,
channel_states: ChannelStateMap,
connset_tx: Sender<CaConnSetEvent>,
connset_rx: Receiver<CaConnSetEvent>,
channel_info_query_tx: Sender<ChannelInfoQuery>,
storage_insert_tx: Sender<QueryItem>,
shutdown: bool,
chan_check_next: Option<Channel>,
stats: CaConnSetStats,
}
impl CaConnSet {
pub fn start(
backend: String,
local_epics_hostname: String,
storage_insert_tx: Sender<QueryItem>,
channel_info_query_tx: Sender<ChannelInfoQuery>,
pgconf: Database,
) -> CaConnSetCtrl {
let (connset_tx, connset_rx) = async_channel::bounded(10000);
let (search_tx, ioc_finder_jh) = super::finder::start_finder(connset_tx.clone(), backend.clone(), pgconf);
let connset = Self {
backend,
local_epics_hostname,
search_tx,
ca_conn_ress: BTreeMap::new(),
channel_states: ChannelStateMap::new(),
connset_tx: connset_tx.clone(),
connset_rx,
channel_info_query_tx,
storage_insert_tx,
shutdown: false,
chan_check_next: None,
stats: CaConnSetStats::new(),
};
// TODO await on jh
let jh = tokio::spawn(CaConnSet::run(connset));
@@ -150,7 +240,40 @@ impl CaConnSet {
async fn handle_event(&mut self, ev: CaConnSetEvent) -> Result<(), Error> {
match ev {
CaConnSetEvent::ConnSetCmd(cmd) => match cmd {
ConnSetCmd::ChannelAdd(x) => self.add_channel_to_addr(x).await,
ConnSetCmd::ChannelAdd(x) => self.handle_add_channel(x).await,
ConnSetCmd::ChannelAddWithStatusId(x) => self.handle_add_channel_with_status_id(x).await,
ConnSetCmd::IocAddrQueryResult(res) => {
for e in res {
if let Some(addr) = e.addr {
let ch = Channel::new(e.channel.clone());
if let Some(chst) = self.channel_states.inner().get(&ch) {
if let ChannelStateValue::Active(ast) = &chst.value {
if let ActiveChannelState::WithStatusSeriesId {
status_series_id,
state,
} = ast
{
let add = ChannelAddWithAddr {
backend: self.backend.clone(),
name: e.channel,
addr: SocketAddr::V4(addr),
cssid: status_series_id.clone(),
local_epics_hostname: self.local_epics_hostname.clone(),
};
// NOTE: `add` is built but not dispatched yet (WIP); presumably it should
// go through ConnSetCmd::ChannelAddWithAddr / handle_add_channel_with_addr.
} else {
warn!("TODO got address but no longer active");
}
} else {
warn!("TODO got address but no longer active");
}
} else {
warn!("ioc addr lookup done but channel no longer here");
}
}
}
Ok(())
}
ConnSetCmd::ChannelAddWithAddr(x) => self.handle_add_channel_with_addr(x).await,
ConnSetCmd::CheckHealth => {
error!("TODO implement check health");
Ok(())
@@ -160,6 +283,7 @@ impl CaConnSet {
self.shutdown = true;
Ok(())
}
ConnSetCmd::SeriesLookupResult(_) => todo!(),
},
CaConnSetEvent::CaConnEvent((addr, ev)) => match ev.value {
CaConnEventValue::None => Ok(()),
@@ -174,7 +298,53 @@ impl CaConnSet {
}
}
async fn add_channel_to_addr(&mut self, add: ChannelAdd) -> Result<(), Error> {
async fn handle_add_channel(&mut self, add: ChannelAdd) -> Result<(), Error> {
// TODO should I add the transition through ActiveChannelState::Init as well?
let ch = Channel::new(add.name.clone());
let _st = self.channel_states.inner().entry(ch).or_insert_with(|| ChannelState {
value: ChannelStateValue::Active(ActiveChannelState::WaitForStatusSeriesId {
since: SystemTime::now(),
}),
});
let item = ChannelInfoQuery {
backend: add.backend,
channel: add.name,
scalar_type: CHANNEL_STATUS_DUMMY_SCALAR_TYPE,
shape_dims: Vec::new(),
tx: Box::pin(SeriesLookupSender {
tx: self.connset_tx.clone(),
}),
};
self.channel_info_query_tx.send(item).await?;
Ok(())
}
async fn handle_add_channel_with_status_id(&mut self, add: ChannelAddWithStatusId) -> Result<(), Error> {
let ch = Channel::new(add.name.clone());
if let Some(chst) = self.channel_states.inner().get_mut(&ch) {
if let ChannelStateValue::Active(chst2) = &mut chst.value {
if let ActiveChannelState::WaitForStatusSeriesId { .. } = chst2 {
*chst2 = ActiveChannelState::WithStatusSeriesId {
status_series_id: add.cssid,
state: WithStatusSeriesIdState {
inner: WithStatusSeriesIdStateInner::NoAddress {
since: SystemTime::now(),
},
},
};
} else {
warn!("TODO have a status series id but no more channel");
}
} else {
warn!("TODO have a status series id but no more channel");
}
} else {
warn!("TODO have a status series id but no more channel");
}
Ok(())
}
async fn handle_add_channel_with_addr(&mut self, add: ChannelAddWithAddr) -> Result<(), Error> {
if !self.ca_conn_ress.contains_key(&add.addr) {
let c = self.create_ca_conn(add.clone())?;
self.ca_conn_ress.insert(add.addr, c);
@@ -185,7 +355,7 @@ impl CaConnSet {
Ok(())
}
fn create_ca_conn(&self, add: ChannelAdd) -> Result<CaConnRes, Error> {
fn create_ca_conn(&self, add: ChannelAddWithAddr) -> Result<CaConnRes, Error> {
// TODO should we save this as event?
let opts = CaConnOpts::default();
let addr = add.addr;
@@ -207,6 +377,7 @@ impl CaConnSet {
let conn_item_tx = self.connset_tx.clone();
let jh = tokio::spawn(Self::ca_conn_item_merge(conn, conn_item_tx, addr_v4));
let ca_conn_res = CaConnRes {
state: CaConnState::new(CaConnStateValue::Fresh),
sender: conn_tx,
stats: conn_stats,
jh,
@@ -354,4 +525,221 @@ impl CaConnSet {
Ok(false)
}
}
fn check_connection_states(&mut self) -> Result<(), Error> {
let tsnow = Instant::now();
for (addr, val) in &mut self.ca_conn_ress {
let state = &mut val.state;
let v = &mut state.value;
match v {
CaConnStateValue::Fresh => {
// TODO check for delta t since last issued status command.
if tsnow.duration_since(state.last_feedback) > Duration::from_millis(20000) {
error!("TODO Fresh timeout send connection-close for {addr}");
// TODO collect in metrics
// self.stats.ca_conn_status_feedback_timeout_inc();
// TODO send shutdown to this CaConn, check that we've received
// a 'shutdown' state from it. (see below)
*v = CaConnStateValue::Shutdown { since: tsnow };
}
}
CaConnStateValue::HadFeedback => {
// TODO check for delta t since last issued status command.
if tsnow.duration_since(state.last_feedback) > Duration::from_millis(20000) {
error!("TODO HadFeedback timeout send connection-close for {addr}");
// TODO collect in metrics
// self.stats.ca_conn_status_feedback_timeout_inc();
*v = CaConnStateValue::Shutdown { since: tsnow };
}
}
CaConnStateValue::Shutdown { since } => {
if tsnow.saturating_duration_since(*since) > Duration::from_millis(10000) {
// TODO collect in metrics as severe error, this would be a bug.
// self.stats.critical_error_inc();
error!("Shutdown of CaConn failed for {addr}");
}
}
}
}
Ok(())
}
async fn check_channel_states(&mut self) -> Result<(), Error> {
let (mut search_pending_count,) = self.update_channel_state_counts();
let k = self.chan_check_next.take();
let it = if let Some(last) = k {
trace!("check_chans start at {:?}", last);
self.channel_states.inner().range_mut(last..)
} else {
self.channel_states.inner().range_mut(..)
};
let tsnow = SystemTime::now();
let mut attempt_series_search = true;
for (i, (ch, st)) in it.enumerate() {
match &mut st.value {
ChannelStateValue::Active(st2) => match st2 {
ActiveChannelState::Init { since: _ } => {
todo!()
}
ActiveChannelState::WaitForStatusSeriesId { since } => {
let dt = tsnow.duration_since(*since).unwrap_or(Duration::ZERO);
if dt > Duration::from_millis(5000) {
warn!("timeout can not get status series id for {ch:?}");
*st2 = ActiveChannelState::Init { since: tsnow };
} else {
// TODO
}
}
ActiveChannelState::WithStatusSeriesId {
status_series_id,
state,
} => match &mut state.inner {
WithStatusSeriesIdStateInner::UnknownAddress { since } => {
let dt = tsnow.duration_since(*since).unwrap_or(Duration::ZERO);
if dt > UNKNOWN_ADDRESS_STAY {
//info!("UnknownAddress {} {:?}", i, ch);
if (search_pending_count as usize) < CURRENT_SEARCH_PENDING_MAX {
search_pending_count += 1;
state.inner = WithStatusSeriesIdStateInner::SearchPending {
since: tsnow,
did_send: false,
};
SEARCH_REQ_MARK_COUNT.fetch_add(1, atomic::Ordering::AcqRel);
}
}
}
WithStatusSeriesIdStateInner::SearchPending { since, did_send: _ } => {
//info!("SearchPending {} {:?}", i, ch);
let dt = tsnow.duration_since(*since).unwrap_or(Duration::ZERO);
if dt > SEARCH_PENDING_TIMEOUT {
info!("Search timeout for {ch:?}");
state.inner = WithStatusSeriesIdStateInner::NoAddress { since: tsnow };
search_pending_count -= 1;
}
}
WithStatusSeriesIdStateInner::WithAddress { addr: addr_v4, state } => {
//info!("WithAddress {} {:?}", i, ch);
use WithAddressState::*;
match state {
Unassigned { assign_at } => {
// TODO do I need this case anymore?
#[cfg(DISABLED)]
if DO_ASSIGN_TO_CA_CONN && *assign_at <= tsnow {
let backend = self.backend.clone();
let addr = SocketAddr::V4(*addr_v4);
let name = ch.id().into();
let cssid = status_series_id.clone();
let local_epics_hostname = self.local_epics_hostname.clone();
// This operation is meant to complete very quickly
let add = ChannelAdd {
backend: backend,
name: name,
addr,
cssid,
local_epics_hostname,
};
self.handle_add_channel(add).await?;
let cs = ConnectionState {
updated: tsnow,
value: ConnectionStateValue::Unconnected,
};
// TODO if a matching CaConn does not yet exist, it gets created via the
// command sent through the channel, so we cannot await it here. It would
// therefore be good to keep a separate, synchronously updated status entry
// alongside ca_conn_ress.
*state = WithAddressState::Assigned(cs);
let item = QueryItem::ChannelStatus(ChannelStatusItem {
ts: tsnow,
series: SeriesId::new(status_series_id.id()),
status: scywr::iteminsertqueue::ChannelStatus::AssignedToAddress,
});
match self.storage_insert_tx.send(item).await {
Ok(_) => {}
Err(_) => {
// TODO feed into throttled log, or count as unlogged
}
}
}
}
Assigned(_) => {
// TODO check if channel is healthy and alive
}
}
}
WithStatusSeriesIdStateInner::NoAddress { since } => {
let dt = tsnow.duration_since(*since).unwrap_or(Duration::ZERO);
if dt > NO_ADDRESS_STAY {
state.inner = WithStatusSeriesIdStateInner::UnknownAddress { since: tsnow };
}
}
},
},
ChannelStateValue::ToRemove { .. } => {
// TODO if assigned to some address,
}
}
if i >= CHECK_CHANS_PER_TICK {
self.chan_check_next = Some(ch.clone());
break;
}
}
Ok(())
}
fn update_channel_state_counts(&mut self) -> (u64,) {
let mut unknown_address_count = 0;
let mut search_pending_count = 0;
let mut search_pending_did_send_count = 0;
let mut unassigned_count = 0;
let mut assigned_count = 0;
let mut no_address_count = 0;
for (_ch, st) in self.channel_states.inner().iter() {
match &st.value {
ChannelStateValue::Active(st2) => match st2 {
ActiveChannelState::Init { .. } => {
unknown_address_count += 1;
}
ActiveChannelState::WaitForStatusSeriesId { .. } => {
unknown_address_count += 1;
}
ActiveChannelState::WithStatusSeriesId { state, .. } => match &state.inner {
WithStatusSeriesIdStateInner::UnknownAddress { .. } => {
unknown_address_count += 1;
}
WithStatusSeriesIdStateInner::SearchPending { did_send, .. } => {
if *did_send {
search_pending_did_send_count += 1;
} else {
search_pending_count += 1;
}
}
WithStatusSeriesIdStateInner::WithAddress { state, .. } => match state {
WithAddressState::Unassigned { .. } => {
unassigned_count += 1;
}
WithAddressState::Assigned(_) => {
assigned_count += 1;
}
},
WithStatusSeriesIdStateInner::NoAddress { .. } => {
no_address_count += 1;
}
},
},
ChannelStateValue::ToRemove { .. } => {
unknown_address_count += 1;
}
}
}
use atomic::Ordering::Release;
self.stats.channel_unknown_address.store(unknown_address_count, Release);
self.stats.channel_search_pending.store(search_pending_count, Release);
self.stats
.search_pending_did_send
.store(search_pending_did_send_count, Release);
self.stats.unassigned.store(unassigned_count, Release);
self.stats.assigned.store(assigned_count, Release);
self.stats.channel_no_address.store(no_address_count, Release);
(search_pending_count,)
}
}

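CaConnSet::start above follows a plain actor shape: all mutable state is owned by a task spawned over a bounded event channel, and callers keep only the CaConnSetCtrl handle. A minimal sketch of that shape with illustrative names (Cmd, Ctrl, Actor are not the real types; assumes a running tokio runtime):

use async_channel::{Receiver, Sender};

enum Cmd {
    Add(String),
    Shutdown,
}

#[derive(Clone)]
struct Ctrl {
    tx: Sender<Cmd>,
}

impl Ctrl {
    async fn add(&self, name: String) -> Result<(), async_channel::SendError<Cmd>> {
        self.tx.send(Cmd::Add(name)).await
    }
}

struct Actor {
    rx: Receiver<Cmd>,
    names: Vec<String>,
}

impl Actor {
    fn start() -> Ctrl {
        let (tx, rx) = async_channel::bounded(10000);
        let actor = Actor { rx, names: Vec::new() };
        // The task owns all mutable state, so no locks are needed.
        tokio::spawn(actor.run());
        Ctrl { tx }
    }

    async fn run(mut self) {
        while let Ok(cmd) = self.rx.recv().await {
            match cmd {
                Cmd::Add(name) => self.names.push(name),
                Cmd::Shutdown => break,
            }
        }
    }
}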
netfetch/src/ca/finder.rs (new file, 355 additions)
View File

@@ -0,0 +1,355 @@
use super::connset::CaConnSetEvent;
use super::connset::IocAddrQuery;
use super::connset::CURRENT_SEARCH_PENDING_MAX;
use super::connset::SEARCH_BATCH_MAX;
use crate::ca::findioc::FindIocRes;
use crate::ca::findioc::FindIocStream;
use crate::daemon_common::DaemonEvent;
use async_channel::Receiver;
use async_channel::Sender;
use dbpg::conn::make_pg_client;
use dbpg::postgres::Row as PgRow;
use err::Error;
use futures_util::FutureExt;
use futures_util::StreamExt;
use log::*;
use netpod::Database;
use std::collections::HashMap;
use std::collections::VecDeque;
use std::net::SocketAddrV4;
use std::sync::atomic;
use std::sync::atomic::AtomicUsize;
use std::time::Duration;
use std::time::Instant;
use taskrun::tokio;
use tokio::task::JoinHandle;
const SEARCH_DB_PIPELINE_LEN: usize = 4;
const FINDER_JOB_QUEUE_LEN_MAX: usize = 10;
const FINDER_BATCH_SIZE: usize = 8;
const FINDER_IN_FLIGHT_MAX: usize = 800;
const FINDER_TIMEOUT: Duration = Duration::from_millis(100);
// TODO pull out into a stats
static SEARCH_REQ_BATCH_RECV_COUNT: AtomicUsize = AtomicUsize::new(0);
static SEARCH_RES_0_COUNT: AtomicUsize = AtomicUsize::new(0);
static SEARCH_RES_1_COUNT: AtomicUsize = AtomicUsize::new(0);
static SEARCH_RES_2_COUNT: AtomicUsize = AtomicUsize::new(0);
static SEARCH_RES_3_COUNT: AtomicUsize = AtomicUsize::new(0);
#[allow(unused)]
macro_rules! debug_batch {
// (D$($arg:tt)*) => ();
($($arg:tt)*) => (if false {
debug!($($arg)*);
});
}
#[allow(unused)]
macro_rules! trace_batch {
// (D$($arg:tt)*) => ();
($($arg:tt)*) => (if false {
trace!($($arg)*);
});
}
#[derive(Debug)]
pub struct IocAddrQueryResult {}
fn transform_pgres(rows: Vec<PgRow>) -> VecDeque<FindIocRes> {
let mut ret = VecDeque::new();
for row in rows {
let ch: Result<String, _> = row.try_get(0);
if let Ok(ch) = ch {
if let Some(addr) = row.get::<_, Option<String>>(1) {
let addr = addr.parse().ok();
let item = FindIocRes {
channel: ch,
response_addr: None,
addr,
dt: Duration::from_millis(0),
};
ret.push_back(item);
} else {
let item = FindIocRes {
channel: ch,
response_addr: None,
addr: None,
dt: Duration::from_millis(0),
};
ret.push_back(item);
}
} else if let Err(e) = ch {
error!("bad string from pg: {e:?}");
}
}
ret
}
async fn finder_worker_single(
inp: Receiver<Vec<IocAddrQuery>>,
tx: Sender<CaConnSetEvent>,
backend: String,
db: Database,
) -> Result<(), Error> {
let pg = make_pg_client(&db)
.await
.map_err(|e| Error::with_msg_no_trace(e.to_string()))?;
let sql = concat!(
"with q1 as (select * from unnest($2::text[]) as unn (ch))",
" select distinct on (tt.facility, tt.channel) tt.channel, tt.addr",
" from ioc_by_channel_log tt join q1 on tt.channel = q1.ch and tt.facility = $1 and tt.addr is not null",
" order by tt.facility, tt.channel, tsmod desc",
);
let qu_select_multi = pg
.prepare(sql)
.await
.map_err(|e| Error::with_msg_no_trace(e.to_string()))?;
let mut resdiff = 0;
loop {
match inp.recv().await {
Ok(batch) => {
SEARCH_REQ_BATCH_RECV_COUNT.fetch_add(batch.len(), atomic::Ordering::AcqRel);
let ts1 = Instant::now();
debug_batch!("run query batch len {}", batch.len());
let names: Vec<_> = batch.iter().map(|x| x.name.as_str()).collect();
let qres = pg.query(&qu_select_multi, &[&backend, &names]).await;
let dt = ts1.elapsed();
debug_batch!(
"done query batch len {}: {} {:.3}ms",
batch.len(),
qres.is_ok(),
dt.as_secs_f32() * 1e3
);
if dt > Duration::from_millis(5000) {
let mut out = String::from("[");
for e in &batch {
if out.len() > 1 {
out.push_str(", ");
}
out.push('\'');
out.push_str(&e.name);
out.push('\'');
}
out.push(']');
eprintln!("VERY SLOW QUERY\n{out}");
}
match qres {
Ok(rows) => {
if rows.len() > batch.len() {
error!("got more results ({}) than inputs ({})", rows.len(), batch.len());
} else if rows.len() < batch.len() {
resdiff += batch.len() - rows.len();
}
let nbatch = batch.len();
trace_batch!("received results {} resdiff {}", rows.len(), resdiff);
SEARCH_RES_0_COUNT.fetch_add(rows.len(), atomic::Ordering::AcqRel);
let mut items = transform_pgres(rows);
let names: HashSet<_> = items.iter().map(|x| &x.channel).collect();
let mut to_add = Vec::new();
for e in batch {
let s = e.name;
if !names.contains(&s) {
let item = FindIocRes {
channel: s,
response_addr: None,
addr: None,
dt: Duration::from_millis(0),
};
to_add.push(item);
}
}
SEARCH_RES_1_COUNT.fetch_add(items.len(), atomic::Ordering::AcqRel);
SEARCH_RES_2_COUNT.fetch_add(to_add.len(), atomic::Ordering::AcqRel);
items.extend(to_add);
if items.len() != nbatch {
error!("STILL NOT MATCHING LEN");
}
SEARCH_RES_3_COUNT.fetch_add(items.len(), atomic::Ordering::AcqRel);
let x = tx
.send(CaConnSetEvent::ConnSetCmd(
crate::ca::connset::ConnSetCmd::IocAddrQueryResult(items),
))
.await;
match x {
Ok(_) => {}
Err(e) => {
error!("finder sees: {e}");
break;
}
}
}
Err(e) => {
error!("finder sees error: {e}");
tokio::time::sleep(Duration::from_millis(1000)).await;
}
}
}
Err(_e) => break,
}
}
Ok(())
}
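// Collect incoming single queries into batches (bounded by size and time)
// and fan them out to SEARCH_DB_PIPELINE_LEN parallel DB workers.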
async fn finder_worker(
qrx: Receiver<IocAddrQuery>,
tx: Sender<CaConnSetEvent>,
backend: String,
db: Database,
) -> Result<(), Error> {
// TODO do something with join handle
let (batch_rx, _jh) = batchtools::batcher::batch(
SEARCH_BATCH_MAX,
Duration::from_millis(200),
SEARCH_DB_PIPELINE_LEN,
qrx,
);
for _ in 0..SEARCH_DB_PIPELINE_LEN {
// TODO use join handle
tokio::spawn(finder_worker_single(
batch_rx.clone(),
tx.clone(),
backend.clone(),
db.clone(),
));
}
Ok(())
}
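// Entry point for the conn set: returns the sender for address queries and
// the join handle of the batching worker.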
pub fn start_finder(
tx: Sender<CaConnSetEvent>,
backend: String,
db: Database,
) -> (Sender<IocAddrQuery>, JoinHandle<Result<(), Error>>) {
let (qtx, qrx) = async_channel::bounded(CURRENT_SEARCH_PENDING_MAX);
let jh = taskrun::spawn(finder_worker(qrx, tx, backend, db));
(qtx, jh)
}
struct OptFut<F> {
fut: Option<F>,
}
impl<F> OptFut<F> {
fn empty() -> Self {
Self { fut: None }
}
fn new(fut: F) -> Self {
Self { fut: Some(fut) }
}
fn is_enabled(&self) -> bool {
self.fut.is_some()
}
}
impl<F> futures_util::Future for OptFut<F>
where
F: futures_util::Future + std::marker::Unpin,
{
type Output = <F as futures_util::Future>::Output;
fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context) -> std::task::Poll<Self::Output> {
match self.fut.as_mut() {
Some(fut) => fut.poll_unpin(cx),
None => std::task::Poll::Pending,
}
}
}
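// Alternative finder which searches for IOCs over the CA protocol itself
// (UDP search via FindIocStream) instead of the Postgres cache.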
#[allow(unused)]
fn start_finder_ca(tx: Sender<DaemonEvent>, tgts: Vec<SocketAddrV4>) -> (Sender<String>, JoinHandle<()>) {
let (qtx, qrx) = async_channel::bounded(32);
let (atx, arx) = async_channel::bounded(32);
let ioc_finder_fut = async move {
let mut finder = FindIocStream::new(tgts, FINDER_TIMEOUT, FINDER_IN_FLIGHT_MAX, FINDER_BATCH_SIZE);
let fut_tick_dur = Duration::from_millis(100);
let mut finder_more = true;
let mut finder_fut = OptFut::new(finder.next());
let mut qrx_fut = OptFut::new(qrx.recv());
let mut qrx_more = true;
let mut fut_tick = Box::pin(tokio::time::sleep(fut_tick_dur));
let mut asend = OptFut::empty();
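// Manual event loop: each branch re-arms only the futures that can still
// make progress, and a periodic tick re-checks in case everything idled.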
loop {
tokio::select! {
_ = &mut asend, if asend.is_enabled() => {
asend = OptFut::empty();
}
r1 = &mut finder_fut, if finder_fut.is_enabled() => {
finder_fut = OptFut::empty();
match r1 {
Some(item) => {
asend = OptFut::new(atx.send(item));
}
None => {
// TODO the finder stream has ended, stop polling it
warn!("Finder has stopped");
finder_more = false;
}
}
if qrx_more && finder.job_queue_len() < FINDER_JOB_QUEUE_LEN_MAX {
qrx_fut = OptFut::new(qrx.recv());
}
if finder_more {
finder_fut = OptFut::new(finder.next());
}
fut_tick = Box::pin(tokio::time::sleep(fut_tick_dur));
}
r2 = &mut qrx_fut, if qrx_fut.is_enabled() => {
qrx_fut = OptFut::empty();
match r2 {
Ok(item) => {
finder.push(item);
}
Err(e) => {
// TODO the input channel is closed, ignore it from here on
error!("Finder input channel error {e}");
qrx_more = false;
}
}
if qrx_more && finder.job_queue_len() < FINDER_JOB_QUEUE_LEN_MAX {
qrx_fut = OptFut::new(qrx.recv());
}
if finder_more {
finder_fut = OptFut::new(finder.next());
} else {
finder_fut = OptFut::empty();
}
fut_tick = Box::pin(tokio::time::sleep(fut_tick_dur));
}
_ = &mut fut_tick => {
if qrx_more && finder.job_queue_len() < FINDER_JOB_QUEUE_LEN_MAX {
qrx_fut = OptFut::new(qrx.recv());
}
if finder_more {
finder_fut = OptFut::new(finder.next());
} else {
finder_fut = OptFut::empty();
}
fut_tick = Box::pin(tokio::time::sleep(fut_tick_dur));
}
else => {
error!("all branches are disabled");
break;
}
};
}
};
let ioc_finder_jh = taskrun::spawn(ioc_finder_fut);
taskrun::spawn({
async move {
while let Ok(item) = arx.recv().await {
match tx.send(DaemonEvent::SearchDone(item)).await {
Ok(_) => {}
Err(e) => {
error!("search res fwd {e}");
}
}
}
warn!("search res fwd nput broken");
}
});
(qtx, ioc_finder_jh)
}

123
netfetch/src/ca/statemap.rs Normal file

@@ -0,0 +1,123 @@
use crate::daemon_common::Channel;
use async_channel::Receiver;
use serde::Serialize;
use series::series::Existence;
use series::ChannelStatusSeriesId;
use series::SeriesId;
use std::collections::BTreeMap;
use std::net::SocketAddrV4;
use std::time::Instant;
use std::time::SystemTime;
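// Sentinel ("dummy") scalar type id recorded for channel status series.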
pub const CHANNEL_STATUS_DUMMY_SCALAR_TYPE: i32 = i32::MIN + 1;
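// Liveness of a CaConn: Fresh until the first feedback arrives, then
// HadFeedback; Shutdown records since when the connection is shut down.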
#[derive(Debug)]
pub enum CaConnStateValue {
Fresh,
HadFeedback,
Shutdown { since: Instant },
}
#[derive(Debug)]
pub struct CaConnState {
pub last_feedback: Instant,
pub value: CaConnStateValue,
}
impl CaConnState {
pub fn new(value: CaConnStateValue) -> Self {
Self {
last_feedback: Instant::now(),
value,
}
}
}
#[derive(Clone, Debug, Serialize)]
pub enum ConnectionStateValue {
Unconnected,
Connected {
//#[serde(with = "serde_Instant")]
since: SystemTime,
},
}
#[derive(Clone, Debug, Serialize)]
pub struct ConnectionState {
//#[serde(with = "serde_Instant")]
pub updated: SystemTime,
pub value: ConnectionStateValue,
}
#[derive(Clone, Debug, Serialize)]
pub enum WithAddressState {
Unassigned {
//#[serde(with = "serde_Instant")]
assign_at: SystemTime,
},
Assigned(ConnectionState),
}
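// Address lookup state machine for a channel that already has its status
// series id: UnknownAddress -> SearchPending -> WithAddress or NoAddress.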
#[derive(Clone, Debug, Serialize)]
pub enum WithStatusSeriesIdStateInner {
UnknownAddress {
since: SystemTime,
},
SearchPending {
//#[serde(with = "serde_Instant")]
since: SystemTime,
did_send: bool,
},
WithAddress {
addr: SocketAddrV4,
state: WithAddressState,
},
NoAddress {
since: SystemTime,
},
}
#[derive(Clone, Debug, Serialize)]
pub struct WithStatusSeriesIdState {
pub inner: WithStatusSeriesIdStateInner,
}
#[derive(Clone, Debug)]
pub enum ActiveChannelState {
Init {
since: SystemTime,
},
WaitForStatusSeriesId {
since: SystemTime,
},
WithStatusSeriesId {
status_series_id: ChannelStatusSeriesId,
state: WithStatusSeriesIdState,
},
}
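// A channel is either actively managed or marked for removal; the address it
// was assigned to (if any) is kept, presumably so the removal can be routed
// to the owning connection.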
#[derive(Debug)]
pub enum ChannelStateValue {
Active(ActiveChannelState),
ToRemove { addr: Option<SocketAddrV4> },
}
#[derive(Debug)]
pub struct ChannelState {
pub value: ChannelStateValue,
}
#[derive(Debug)]
pub struct ChannelStateMap {
map: BTreeMap<Channel, ChannelState>,
}
impl ChannelStateMap {
pub fn new() -> Self {
Self { map: BTreeMap::new() }
}
pub fn inner(&mut self) -> &mut BTreeMap<Channel, ChannelState> {
&mut self.map
}
}


@@ -207,6 +207,23 @@ impl IntervalEma {
}
}
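// Counters for the conn-set channel state machine, generated by stats_proc;
// the diff(...) clause presumably derives a type for deltas between two
// snapshots of these counters.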
stats_proc::stats_struct!((
stats_struct(
name(CaConnSetStats),
counters(
channel_unknown_address,
channel_with_address,
channel_search_pending,
search_pending_did_send,
channel_no_address,
unassigned,
assigned,
),
),
// agg(name(CaConnSetStatsAgg), parent(CaConnSetStats)),
diff(name(CaConnSetStatsDiff), input(CaConnSetStats)),
));
stats_proc::stats_struct!((
stats_struct(
name(CaConnStats),