More detailed channel config
This commit is contained in:
@@ -55,9 +55,8 @@ async fn main_run_inner(opts: DaqIngestOpts) -> Result<(), Error> {
|
||||
}
|
||||
ChannelAccess::CaIngest(k) => {
|
||||
info!("daqingest version {}", clap::crate_version!());
|
||||
let (conf, channels) = parse_config(k.config.into()).await?;
|
||||
todo!();
|
||||
// daqingest::daemon::run(conf, channels).await?
|
||||
let (conf, channels_config) = parse_config(k.config.into()).await?;
|
||||
daqingest::daemon::run(conf, channels_config).await?
|
||||
}
|
||||
},
|
||||
#[cfg(feature = "bsread")]
|
||||
|
||||
@@ -10,6 +10,8 @@ use netfetch::ca::connset::CaConnSet;
|
||||
use netfetch::ca::connset::CaConnSetCtrl;
|
||||
use netfetch::ca::connset::CaConnSetItem;
|
||||
use netfetch::conf::CaIngestOpts;
|
||||
use netfetch::conf::ChannelConfig;
|
||||
use netfetch::conf::ChannelsConfig;
|
||||
use netfetch::daemon_common::Channel;
|
||||
use netfetch::daemon_common::DaemonEvent;
|
||||
use netfetch::metrics::StatsSet;
|
||||
@@ -276,9 +278,13 @@ impl Daemon {
|
||||
}
|
||||
self.stats.handle_timer_tick_count.inc();
|
||||
let tsnow = SystemTime::now();
|
||||
if SIGINT.load(atomic::Ordering::Acquire) == 1 {
|
||||
warn!("Received SIGINT");
|
||||
SIGINT.store(2, atomic::Ordering::Release);
|
||||
{
|
||||
let n = SIGINT.load(atomic::Ordering::Acquire);
|
||||
let m = SIGINT_CONFIRM.load(atomic::Ordering::Acquire);
|
||||
if m != n {
|
||||
warn!("Received SIGINT");
|
||||
SIGINT_CONFIRM.store(n, atomic::Ordering::Release);
|
||||
}
|
||||
}
|
||||
if SIGTERM.load(atomic::Ordering::Acquire) == 1 {
|
||||
warn!("Received SIGTERM");
|
||||
@@ -306,16 +312,20 @@ impl Daemon {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn handle_channel_add(&mut self, ch: Channel, restx: netfetch::ca::conn::CmdResTx) -> Result<(), Error> {
|
||||
async fn handle_channel_add(
|
||||
&mut self,
|
||||
ch_cfg: ChannelConfig,
|
||||
restx: netfetch::ca::conn::CmdResTx,
|
||||
) -> Result<(), Error> {
|
||||
// debug!("handle_channel_add {ch:?}");
|
||||
self.connset_ctrl
|
||||
.add_channel(self.ingest_opts.backend().into(), ch.id().into(), restx)
|
||||
.add_channel(self.ingest_opts.backend().into(), ch_cfg, restx)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn handle_channel_remove(&mut self, ch: Channel) -> Result<(), Error> {
|
||||
self.connset_ctrl.remove_channel(ch.id().into()).await?;
|
||||
self.connset_ctrl.remove_channel(ch.name().into()).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -445,7 +455,7 @@ impl Daemon {
|
||||
};
|
||||
let dt = ts1.elapsed();
|
||||
if dt > Duration::from_millis(200) {
|
||||
warn!("handle_event slow {}ms {}", dt.as_secs_f32() * 1e3, item_summary);
|
||||
warn!("handle_event slow {} ms {}", dt.as_secs_f32() * 1e3, item_summary);
|
||||
}
|
||||
ret
|
||||
}
|
||||
@@ -531,13 +541,16 @@ impl Daemon {
|
||||
}
|
||||
|
||||
static SIGINT: AtomicUsize = AtomicUsize::new(0);
|
||||
static SIGINT_CONFIRM: AtomicUsize = AtomicUsize::new(0);
|
||||
static SIGTERM: AtomicUsize = AtomicUsize::new(0);
|
||||
static SHUTDOWN_SENT: AtomicUsize = AtomicUsize::new(0);
|
||||
|
||||
fn handler_sigint(_a: libc::c_int, _b: *const libc::siginfo_t, _c: *const libc::c_void) {
|
||||
std::process::exit(13);
|
||||
SIGINT.store(1, atomic::Ordering::Release);
|
||||
let _ = ingest_linux::signal::unset_signal_handler(libc::SIGINT);
|
||||
let n = SIGINT.fetch_add(1, atomic::Ordering::AcqRel);
|
||||
if n >= 2 {
|
||||
let _ = ingest_linux::signal::unset_signal_handler(libc::SIGINT);
|
||||
std::process::exit(13);
|
||||
}
|
||||
}
|
||||
|
||||
fn handler_sigterm(_a: libc::c_int, _b: *const libc::siginfo_t, _c: *const libc::c_void) {
|
||||
@@ -545,8 +558,9 @@ fn handler_sigterm(_a: libc::c_int, _b: *const libc::siginfo_t, _c: *const libc:
|
||||
let _ = ingest_linux::signal::unset_signal_handler(libc::SIGTERM);
|
||||
}
|
||||
|
||||
pub async fn run(opts: CaIngestOpts, channels: Option<Vec<String>>) -> Result<(), Error> {
|
||||
pub async fn run(opts: CaIngestOpts, channels_config: Option<ChannelsConfig>) -> Result<(), Error> {
|
||||
info!("start up {opts:?}");
|
||||
debug!("channels_config {channels_config:?}");
|
||||
ingest_linux::signal::set_signal_handler(libc::SIGINT, handler_sigint).map_err(Error::from_string)?;
|
||||
ingest_linux::signal::set_signal_handler(libc::SIGTERM, handler_sigterm).map_err(Error::from_string)?;
|
||||
|
||||
@@ -567,10 +581,11 @@ pub async fn run(opts: CaIngestOpts, channels: Option<Vec<String>>) -> Result<()
|
||||
//let metrics_agg_fut = metrics_agg_task(ingest_commons.clone(), local_stats.clone(), store_stats.clone());
|
||||
//let metrics_agg_jh = tokio::spawn(metrics_agg_fut);
|
||||
|
||||
let mut channels = channels;
|
||||
if opts.test_bsread_addr.is_some() {
|
||||
channels = None;
|
||||
}
|
||||
let channels_config = if opts.test_bsread_addr.is_some() {
|
||||
None
|
||||
} else {
|
||||
channels_config
|
||||
};
|
||||
|
||||
let insert_frac = Arc::new(AtomicU64::new(opts.insert_frac()));
|
||||
let store_workers_rate = Arc::new(AtomicU64::new(opts.store_workers_rate()));
|
||||
@@ -616,13 +631,15 @@ pub async fn run(opts: CaIngestOpts, channels: Option<Vec<String>>) -> Result<()
|
||||
|
||||
let daemon_jh = taskrun::spawn(daemon.daemon());
|
||||
|
||||
if let Some(channels) = channels {
|
||||
debug!("will configure {} channels", channels.len());
|
||||
if let Some(channels_config) = channels_config {
|
||||
debug!("will configure {} channels", channels_config.len());
|
||||
let mut thr_msg = ThrottleTrace::new(Duration::from_millis(1000));
|
||||
let mut i = 0;
|
||||
for s in &channels {
|
||||
let ch = Channel::new(s.into());
|
||||
match tx.send(DaemonEvent::ChannelAdd(ch, async_channel::bounded(1).0)).await {
|
||||
for ch_cfg in channels_config.channels() {
|
||||
match tx
|
||||
.send(DaemonEvent::ChannelAdd(ch_cfg.clone(), async_channel::bounded(1).0))
|
||||
.await
|
||||
{
|
||||
Ok(()) => {}
|
||||
Err(e) => {
|
||||
error!("{e}");
|
||||
@@ -632,7 +649,7 @@ pub async fn run(opts: CaIngestOpts, channels: Option<Vec<String>>) -> Result<()
|
||||
thr_msg.trigger("daemon sent ChannelAdd", &[&i as &_]);
|
||||
i += 1;
|
||||
}
|
||||
debug!("{} configured channels applied", channels.len());
|
||||
debug!("{} configured channels applied", channels_config.len());
|
||||
}
|
||||
daemon_jh.await.map_err(|e| Error::with_msg_no_trace(e.to_string()))??;
|
||||
info!("Daemon joined.");
|
||||
|
||||
@@ -3,6 +3,7 @@ use super::proto::CaEventValue;
|
||||
use super::proto::ReadNotify;
|
||||
use super::ExtraInsertsConf;
|
||||
use crate::ca::proto::EventCancel;
|
||||
use crate::conf::ChannelConfig;
|
||||
use crate::senderpolling::SenderPolling;
|
||||
use crate::throttletrace::ThrottleTrace;
|
||||
use async_channel::Receiver;
|
||||
@@ -126,7 +127,7 @@ impl err::ToErr for Error {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
#[derive(Clone, Debug, Serialize)]
|
||||
pub enum ChannelConnectedInfo {
|
||||
Disconnected,
|
||||
Connecting,
|
||||
@@ -134,7 +135,7 @@ pub enum ChannelConnectedInfo {
|
||||
Error,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
#[derive(Clone, Debug, Serialize)]
|
||||
pub struct ChannelStateInfo {
|
||||
pub cssid: ChannelStatusSeriesId,
|
||||
pub addr: SocketAddrV4,
|
||||
@@ -154,6 +155,7 @@ pub struct ChannelStateInfo {
|
||||
// #[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub item_recv_ivl_ema: Option<f32>,
|
||||
pub interest_score: f32,
|
||||
pub conf: ChannelConfig,
|
||||
}
|
||||
|
||||
mod ser_instant {
|
||||
@@ -360,8 +362,14 @@ enum ChannelState {
|
||||
Ended(ChannelStatusSeriesId),
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct ChannelConf {
|
||||
conf: ChannelConfig,
|
||||
state: ChannelState,
|
||||
}
|
||||
|
||||
impl ChannelState {
|
||||
fn to_info(&self, cssid: ChannelStatusSeriesId, addr: SocketAddrV4) -> ChannelStateInfo {
|
||||
fn to_info(&self, cssid: ChannelStatusSeriesId, addr: SocketAddrV4, conf: ChannelConfig) -> ChannelStateInfo {
|
||||
let channel_connected_info = match self {
|
||||
ChannelState::Init(..) => ChannelConnectedInfo::Disconnected,
|
||||
ChannelState::Creating { .. } => ChannelConnectedInfo::Connecting,
|
||||
@@ -423,6 +431,7 @@ impl ChannelState {
|
||||
recv_bytes,
|
||||
item_recv_ivl_ema,
|
||||
interest_score,
|
||||
conf,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -557,7 +566,7 @@ pub type CmdResTx = Sender<Result<(), Error>>;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum ConnCommandKind {
|
||||
ChannelAdd(String, ChannelStatusSeriesId),
|
||||
ChannelAdd(ChannelConfig, ChannelStatusSeriesId),
|
||||
ChannelClose(String),
|
||||
Shutdown,
|
||||
}
|
||||
@@ -569,10 +578,10 @@ pub struct ConnCommand {
|
||||
}
|
||||
|
||||
impl ConnCommand {
|
||||
pub fn channel_add(name: String, cssid: ChannelStatusSeriesId) -> Self {
|
||||
pub fn channel_add(conf: ChannelConfig, cssid: ChannelStatusSeriesId) -> Self {
|
||||
Self {
|
||||
id: Self::make_id(),
|
||||
kind: ConnCommandKind::ChannelAdd(name, cssid),
|
||||
kind: ConnCommandKind::ChannelAdd(conf, cssid),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -698,12 +707,11 @@ pub struct CaConn {
|
||||
proto: Option<CaProto>,
|
||||
cid_store: CidStore,
|
||||
subid_store: SubidStore,
|
||||
channels: HashMap<Cid, ChannelState>,
|
||||
channels: HashMap<Cid, ChannelConf>,
|
||||
// btree because require order:
|
||||
cid_by_name: BTreeMap<String, Cid>,
|
||||
cid_by_subid: HashMap<Subid, Cid>,
|
||||
cid_by_sid: HashMap<Sid, Cid>,
|
||||
name_by_cid: HashMap<Cid, String>,
|
||||
channel_status_emit_last: Instant,
|
||||
tick_last_writer: Instant,
|
||||
init_state_count: u64,
|
||||
@@ -716,7 +724,6 @@ pub struct CaConn {
|
||||
conn_command_rx: Pin<Box<Receiver<ConnCommand>>>,
|
||||
conn_backoff: f32,
|
||||
conn_backoff_beg: f32,
|
||||
extra_inserts_conf: ExtraInsertsConf,
|
||||
ioc_ping_last: Instant,
|
||||
ioc_ping_next: Instant,
|
||||
ioc_ping_start: Option<Instant>,
|
||||
@@ -773,7 +780,6 @@ impl CaConn {
|
||||
cid_by_name: BTreeMap::new(),
|
||||
cid_by_subid: HashMap::new(),
|
||||
cid_by_sid: HashMap::new(),
|
||||
name_by_cid: HashMap::new(),
|
||||
channel_status_emit_last: tsnow,
|
||||
tick_last_writer: tsnow,
|
||||
insert_item_queue: VecDeque::new(),
|
||||
@@ -785,7 +791,6 @@ impl CaConn {
|
||||
conn_command_rx: Box::pin(cq_rx),
|
||||
conn_backoff: 0.02,
|
||||
conn_backoff_beg: 0.02,
|
||||
extra_inserts_conf: ExtraInsertsConf::new(),
|
||||
ioc_ping_last: tsnow,
|
||||
ioc_ping_next: tsnow + Self::ioc_ping_ivl_rng(&mut rng),
|
||||
ioc_ping_start: None,
|
||||
@@ -928,51 +933,7 @@ impl CaConn {
|
||||
// }
|
||||
}
|
||||
|
||||
fn cmd_find_channel(&self, pattern: &str) {
|
||||
let res = if let Ok(re) = regex::Regex::new(&pattern) {
|
||||
self.name_by_cid
|
||||
.values()
|
||||
.filter(|x| re.is_match(x))
|
||||
.map(ToString::to_string)
|
||||
.collect()
|
||||
} else {
|
||||
Vec::new()
|
||||
};
|
||||
// TODO return the result
|
||||
}
|
||||
|
||||
fn cmd_channel_state(&self, name: String) {
|
||||
let res = match self.cid_by_name(&name) {
|
||||
Some(cid) => match self.channels.get(&cid) {
|
||||
Some(state) => Some(state.to_info(state.cssid(), self.remote_addr_dbg.clone())),
|
||||
None => None,
|
||||
},
|
||||
None => None,
|
||||
};
|
||||
let msg = (self.remote_addr_dbg.clone(), res);
|
||||
if msg.1.is_some() {
|
||||
info!("Sending back {msg:?}");
|
||||
}
|
||||
// TODO return the result
|
||||
}
|
||||
|
||||
fn cmd_channel_states_all(&self) {
|
||||
let res: Vec<_> = self
|
||||
.channels
|
||||
.iter()
|
||||
.map(|(cid, state)| {
|
||||
// let name = self
|
||||
// .name_by_cid
|
||||
// .get(cid)
|
||||
// .map_or("--unknown--".into(), |x| x.to_string());
|
||||
state.to_info(state.cssid(), self.remote_addr_dbg.clone())
|
||||
})
|
||||
.collect();
|
||||
let msg = (self.remote_addr_dbg.clone(), res);
|
||||
// TODO return the result
|
||||
}
|
||||
|
||||
fn cmd_channel_add(&mut self, name: String, cssid: ChannelStatusSeriesId) {
|
||||
fn cmd_channel_add(&mut self, name: ChannelConfig, cssid: ChannelStatusSeriesId) {
|
||||
self.channel_add(name, cssid);
|
||||
}
|
||||
|
||||
@@ -987,16 +948,6 @@ impl CaConn {
|
||||
self.trigger_shutdown(ChannelStatusClosedReason::ShutdownCommand);
|
||||
}
|
||||
|
||||
fn cmd_extra_inserts_conf(&mut self, extra_inserts_conf: ExtraInsertsConf) {
|
||||
self.extra_inserts_conf = extra_inserts_conf;
|
||||
// TODO return the result
|
||||
}
|
||||
|
||||
fn cmd_save_conn_info(&mut self) {
|
||||
let res = self.emit_channel_info_insert_items();
|
||||
// TODO return the result
|
||||
}
|
||||
|
||||
fn handle_conn_command(&mut self, cx: &mut Context) -> Result<Poll<Option<()>>, Error> {
|
||||
use Poll::*;
|
||||
self.stats.loop3_count.inc();
|
||||
@@ -1008,8 +959,8 @@ impl CaConn {
|
||||
Ready(Some(a)) => {
|
||||
trace3!("handle_conn_command received a command {}", self.remote_addr_dbg);
|
||||
match a.kind {
|
||||
ConnCommandKind::ChannelAdd(name, cssid) => {
|
||||
self.cmd_channel_add(name, cssid);
|
||||
ConnCommandKind::ChannelAdd(conf, cssid) => {
|
||||
self.cmd_channel_add(conf, cssid);
|
||||
Ok(Ready(Some(())))
|
||||
}
|
||||
ConnCommandKind::ChannelClose(name) => {
|
||||
@@ -1064,7 +1015,8 @@ impl CaConn {
|
||||
// Store the writer with the channel state.
|
||||
// Create a monitor for the channel.
|
||||
// NOTE: must store the Writer even if not yet in Evented, we could also transition to Polled!
|
||||
if let Some(chst) = self.channels.get_mut(&cid) {
|
||||
if let Some(conf) = self.channels.get_mut(&cid) {
|
||||
let chst = &mut conf.state;
|
||||
if let ChannelState::MakingSeriesWriter(st2) = chst {
|
||||
self.stats.get_series_id_ok.inc();
|
||||
{
|
||||
@@ -1075,7 +1027,7 @@ impl CaConn {
|
||||
});
|
||||
self.insert_item_queue.push_back(item);
|
||||
}
|
||||
let name = self.name_by_cid.get(&st2.channel.cid).map(|x| x.as_str()).unwrap_or("");
|
||||
let name = conf.conf.name();
|
||||
if name.starts_with("TEST:PEAKING:") {
|
||||
let created_state = WritableState {
|
||||
tsbeg: self.poll_tsnow,
|
||||
@@ -1140,16 +1092,20 @@ impl CaConn {
|
||||
self.stats.clone()
|
||||
}
|
||||
|
||||
pub fn channel_add(&mut self, channel: String, cssid: ChannelStatusSeriesId) {
|
||||
if self.cid_by_name(&channel).is_some() {
|
||||
pub fn channel_add(&mut self, conf: ChannelConfig, cssid: ChannelStatusSeriesId) {
|
||||
if self.cid_by_name(conf.name()).is_some() {
|
||||
// TODO count for metrics
|
||||
return;
|
||||
}
|
||||
let cid = self.cid_by_name_or_insert(&channel);
|
||||
let cid = self.cid_by_name_or_insert(conf.name());
|
||||
if self.channels.contains_key(&cid) {
|
||||
error!("logic error channel already exists {channel}");
|
||||
error!("logic error channel already exists {conf:?}");
|
||||
} else {
|
||||
self.channels.insert(cid, ChannelState::Init(cssid));
|
||||
let conf = ChannelConf {
|
||||
conf,
|
||||
state: ChannelState::Init(cssid),
|
||||
};
|
||||
self.channels.insert(cid, conf);
|
||||
// TODO do not count, use separate queue for those channels.
|
||||
self.init_state_count += 1;
|
||||
}
|
||||
@@ -1169,7 +1125,6 @@ impl CaConn {
|
||||
|
||||
fn channel_remove_by_cid(&mut self, cid: Cid) {
|
||||
self.cid_by_name.retain(|_, v| *v != cid);
|
||||
self.name_by_cid.remove(&cid);
|
||||
self.channels.remove(&cid);
|
||||
}
|
||||
|
||||
@@ -1183,13 +1138,12 @@ impl CaConn {
|
||||
} else {
|
||||
let cid = self.cid_store.next();
|
||||
self.cid_by_name.insert(name.into(), cid);
|
||||
self.name_by_cid.insert(cid, name.into());
|
||||
cid
|
||||
}
|
||||
}
|
||||
|
||||
fn name_by_cid(&self, cid: Cid) -> Option<&str> {
|
||||
self.name_by_cid.get(&cid).map(|x| x.as_str())
|
||||
self.channels.get(&cid).map(|x| x.conf.name())
|
||||
}
|
||||
|
||||
fn backoff_next(&mut self) -> u64 {
|
||||
@@ -1205,7 +1159,8 @@ impl CaConn {
|
||||
fn channel_state_on_shutdown(&mut self, channel_reason: ChannelStatusClosedReason) {
|
||||
// TODO can I reuse emit_channel_info_insert_items ?
|
||||
trace!("channel_state_on_shutdown channels {}", self.channels.len());
|
||||
for (_cid, chst) in &mut self.channels {
|
||||
for (_cid, conf) in &mut self.channels {
|
||||
let chst = &mut conf.state;
|
||||
match chst {
|
||||
ChannelState::Init(cssid) => {
|
||||
*chst = ChannelState::Ended(cssid.clone());
|
||||
@@ -1269,7 +1224,8 @@ impl CaConn {
|
||||
}
|
||||
let mut alive_count = 0;
|
||||
let mut not_alive_count = 0;
|
||||
for (_, st) in &self.channels {
|
||||
for (_, conf) in &self.channels {
|
||||
let st = &conf.state;
|
||||
match st {
|
||||
ChannelState::Writable(st2) => {
|
||||
if tsnow.duration_since(st2.channel.ts_alive_last) >= Duration::from_millis(10000) {
|
||||
@@ -1290,7 +1246,8 @@ impl CaConn {
|
||||
|
||||
fn emit_channel_info_insert_items(&mut self) -> Result<(), Error> {
|
||||
let timenow = self.tmp_ts_poll;
|
||||
for (_, st) in &mut self.channels {
|
||||
for (_, conf) in &mut self.channels {
|
||||
let st = &mut conf.state;
|
||||
match st {
|
||||
ChannelState::Init(..) => {
|
||||
// TODO need last-save-ts for this state.
|
||||
@@ -1334,7 +1291,7 @@ impl CaConn {
|
||||
return Ok(());
|
||||
};
|
||||
let ch_s = if let Some(x) = self.channels.get_mut(&cid) {
|
||||
x
|
||||
&mut x.state
|
||||
} else {
|
||||
// TODO return better as error and let caller decide (with more structured errors)
|
||||
// TODO
|
||||
@@ -1377,7 +1334,7 @@ impl CaConn {
|
||||
return Ok(());
|
||||
};
|
||||
let ch_s = if let Some(x) = self.channels.get_mut(&cid) {
|
||||
x
|
||||
&mut x.state
|
||||
} else {
|
||||
// TODO return better as error and let caller decide (with more structured errors)
|
||||
warn!("TODO handle_event_add_res can not find channel for {cid:?} {subid:?}");
|
||||
@@ -1481,7 +1438,7 @@ impl CaConn {
|
||||
return Ok(());
|
||||
};
|
||||
let ch_s = if let Some(x) = self.channels.get_mut(&cid) {
|
||||
x
|
||||
&mut x.state
|
||||
} else {
|
||||
// TODO return better as error and let caller decide (with more structured errors)
|
||||
warn!("TODO handle_event_add_res can not find channel for {cid:?} {subid:?}");
|
||||
@@ -1532,7 +1489,7 @@ impl CaConn {
|
||||
let ioid = Ioid(ev.ioid);
|
||||
if let Some(cid) = self.read_ioids.get(&ioid) {
|
||||
let ch_s = if let Some(x) = self.channels.get_mut(cid) {
|
||||
x
|
||||
&mut x.state
|
||||
} else {
|
||||
warn!("handle_read_notify_res can not find channel for {cid:?} {ioid:?}");
|
||||
return Ok(());
|
||||
@@ -1621,7 +1578,7 @@ impl CaConn {
|
||||
let series = writer.sid();
|
||||
// TODO should attach these counters already to Writable state.
|
||||
let ts_local = {
|
||||
let epoch = stnow.duration_since(std::time::UNIX_EPOCH).unwrap();
|
||||
let epoch = stnow.duration_since(std::time::UNIX_EPOCH).unwrap_or(Duration::ZERO);
|
||||
epoch.as_secs() * SEC + epoch.subsec_nanos() as u64
|
||||
};
|
||||
let ts = value.ts;
|
||||
@@ -1722,7 +1679,8 @@ impl CaConn {
|
||||
*/
|
||||
fn handle_handshake(&mut self, cx: &mut Context) -> Poll<Option<Result<(), Error>>> {
|
||||
use Poll::*;
|
||||
match self.proto.as_mut().unwrap().poll_next_unpin(cx) {
|
||||
let proto = self.proto.as_mut().ok_or_else(|| Error::NoProtocol)?;
|
||||
match proto.poll_next_unpin(cx) {
|
||||
Ready(Some(k)) => match k {
|
||||
Ok(k) => match k {
|
||||
CaItem::Empty => {
|
||||
@@ -1770,16 +1728,16 @@ impl CaConn {
|
||||
if self.init_state_count == 0 {
|
||||
return Ok(());
|
||||
}
|
||||
let keys: Vec<Cid> = self.channels.keys().map(|x| *x).collect();
|
||||
let channels = &mut self.channels;
|
||||
let proto = self.proto.as_mut().ok_or_else(|| Error::NoProtocol)?;
|
||||
let keys: Vec<Cid> = channels.keys().map(|x| *x).collect();
|
||||
for cid in keys {
|
||||
match self.channels.get(&cid).unwrap() {
|
||||
let conf = channels.get(&cid).ok_or_else(|| Error::UnknownCid(cid))?;
|
||||
let st = &conf.state;
|
||||
match st {
|
||||
ChannelState::Init(cssid) => {
|
||||
let cssid = cssid.clone();
|
||||
let name = self.name_by_cid(cid).ok_or_else(|| Error::UnknownCid(cid));
|
||||
let name = match name {
|
||||
Ok(k) => k.to_string(),
|
||||
Err(e) => return Err(e),
|
||||
};
|
||||
let name = conf.conf.name();
|
||||
let msg = CaMsg::from_ty_ts(
|
||||
CaMsgTy::CreateChan(CreateChan {
|
||||
cid: cid.0,
|
||||
@@ -1788,10 +1746,10 @@ impl CaConn {
|
||||
tsnow,
|
||||
);
|
||||
do_wake_again = true;
|
||||
self.proto.as_mut().unwrap().push_out(msg);
|
||||
// TODO handle not-found error:
|
||||
let ch_s = self.channels.get_mut(&cid).unwrap();
|
||||
*ch_s = ChannelState::Creating(CreatingState {
|
||||
proto.push_out(msg);
|
||||
// TODO handle not-found error, just count and continue?
|
||||
let ch_s = channels.get_mut(&cid).ok_or_else(|| Error::UnknownCid(cid))?;
|
||||
ch_s.state = ChannelState::Creating(CreatingState {
|
||||
tsbeg: tsnow,
|
||||
cssid,
|
||||
cid,
|
||||
@@ -1810,8 +1768,9 @@ impl CaConn {
|
||||
fn check_channels_state_poll(&mut self, tsnow: Instant, cx: &mut Context) -> Result<(), Error> {
|
||||
let mut do_wake_again = false;
|
||||
let channels = &mut self.channels;
|
||||
for (_k, v) in channels {
|
||||
match v {
|
||||
for (_k, conf) in channels {
|
||||
let chst = &mut conf.state;
|
||||
match chst {
|
||||
ChannelState::Init(_) => {}
|
||||
ChannelState::Creating(_) => {}
|
||||
ChannelState::MakingSeriesWriter(_) => {}
|
||||
@@ -1924,7 +1883,8 @@ impl CaConn {
|
||||
// The channel status must be "Fail" so that ConnSet can decide to re-search.
|
||||
// TODO how to transition the channel state? Any invariants or simply write to the map?
|
||||
let cid = Cid(msg.cid);
|
||||
if let Some(name) = self.name_by_cid.get(&cid) {
|
||||
if let Some(conf) = self.channels.get(&cid) {
|
||||
let name = conf.conf.name();
|
||||
debug!("queue event to notive channel create fail {name}");
|
||||
let item = CaConnEvent {
|
||||
ts: tsnow,
|
||||
@@ -1987,18 +1947,15 @@ impl CaConn {
|
||||
fn handle_create_chan_res(&mut self, k: proto::CreateChanRes, tsnow: Instant) -> Result<(), Error> {
|
||||
let cid = Cid(k.cid);
|
||||
let sid = Sid(k.sid);
|
||||
let channels = &mut self.channels;
|
||||
let name_by_cid = &self.name_by_cid;
|
||||
// TODO handle cid-not-found which can also indicate peer error.
|
||||
let name = if let Some(x) = name_by_cid.get(&cid) {
|
||||
x.to_string()
|
||||
let conf = if let Some(x) = self.channels.get_mut(&cid) {
|
||||
x
|
||||
} else {
|
||||
return Err(Error::NoNameForCid(cid));
|
||||
// TODO handle not-found error: just count for metrics?
|
||||
warn!("CreateChanRes {:?} unknown", cid);
|
||||
return Ok(());
|
||||
};
|
||||
trace!("handle_create_chan_res {k:?} {name:?}");
|
||||
// TODO handle not-found error:
|
||||
let ch_s = channels.get_mut(&cid).unwrap();
|
||||
let cssid = match ch_s {
|
||||
let chst = &mut conf.state;
|
||||
let cssid = match chst {
|
||||
ChannelState::Creating(st) => st.cssid.clone(),
|
||||
_ => {
|
||||
// TODO handle in better way:
|
||||
@@ -2043,11 +2000,11 @@ impl CaConn {
|
||||
account_count: 0,
|
||||
account_bytes: 0,
|
||||
};
|
||||
*ch_s = ChannelState::MakingSeriesWriter(MakingSeriesWriterState { tsbeg: tsnow, channel });
|
||||
*chst = ChannelState::MakingSeriesWriter(MakingSeriesWriterState { tsbeg: tsnow, channel });
|
||||
let job = EstablishWorkerJob::new(
|
||||
JobId(cid.0 as _),
|
||||
self.backend.clone(),
|
||||
name.into(),
|
||||
conf.conf.name().into(),
|
||||
scalar_type,
|
||||
shape,
|
||||
self.writer_tx.clone(),
|
||||
@@ -2260,10 +2217,10 @@ impl CaConn {
|
||||
|
||||
fn emit_channel_status(&mut self) -> Result<(), Error> {
|
||||
let mut channel_statuses = BTreeMap::new();
|
||||
for e in self.channels.iter() {
|
||||
let ch = &e.1;
|
||||
let chinfo = ch.to_info(ch.cssid(), self.remote_addr_dbg);
|
||||
channel_statuses.insert(ch.cssid(), chinfo);
|
||||
for (_, conf) in self.channels.iter() {
|
||||
let chst = &conf.state;
|
||||
let chinfo = chst.to_info(chst.cssid(), self.remote_addr_dbg, conf.conf.clone());
|
||||
channel_statuses.insert(chst.cssid(), chinfo);
|
||||
}
|
||||
trace!("emit_channel_status {}", channel_statuses.len());
|
||||
let val = ChannelStatusPartial { channel_statuses };
|
||||
@@ -2287,7 +2244,8 @@ impl CaConn {
|
||||
let stnow = self.tmp_ts_poll;
|
||||
let ts_sec = stnow.duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs();
|
||||
let ts_sec_snap = ts_sec / EMIT_ACCOUNTING_SNAP * EMIT_ACCOUNTING_SNAP;
|
||||
for (_k, st0) in self.channels.iter_mut() {
|
||||
for (_k, chconf) in self.channels.iter_mut() {
|
||||
let st0 = &mut chconf.state;
|
||||
match st0 {
|
||||
ChannelState::Writable(st1) => {
|
||||
let ch = &mut st1.channel;
|
||||
@@ -2317,8 +2275,9 @@ impl CaConn {
|
||||
}
|
||||
|
||||
fn tick_writers(&mut self) -> Result<(), Error> {
|
||||
for (k, st) in &mut self.channels {
|
||||
if let ChannelState::Writable(st2) = st {
|
||||
for (_, chconf) in &mut self.channels {
|
||||
let chst = &mut chconf.state;
|
||||
if let ChannelState::Writable(st2) = chst {
|
||||
st2.writer.tick(&mut self.insert_item_queue)?;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -6,6 +6,7 @@ use crate::ca::statemap::CaConnState;
|
||||
use crate::ca::statemap::MaybeWrongAddressState;
|
||||
use crate::ca::statemap::WithAddressState;
|
||||
use crate::conf::CaIngestOpts;
|
||||
use crate::conf::ChannelConfig;
|
||||
use crate::daemon_common::Channel;
|
||||
use crate::errconv::ErrConv;
|
||||
use crate::rt::JoinHandle;
|
||||
@@ -61,7 +62,7 @@ use std::collections::VecDeque;
|
||||
use std::net::SocketAddr;
|
||||
use std::net::SocketAddrV4;
|
||||
use std::pin::Pin;
|
||||
use std::sync::atomic;
|
||||
|
||||
use std::sync::Arc;
|
||||
use std::task::Context;
|
||||
use std::task::Poll;
|
||||
@@ -138,7 +139,7 @@ impl CaConnRes {
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct ChannelAddWithAddr {
|
||||
backend: String,
|
||||
name: String,
|
||||
ch_cfg: ChannelConfig,
|
||||
cssid: ChannelStatusSeriesId,
|
||||
addr: SocketAddr,
|
||||
}
|
||||
@@ -146,17 +147,23 @@ pub struct ChannelAddWithAddr {
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct ChannelAddWithStatusId {
|
||||
backend: String,
|
||||
name: String,
|
||||
ch_cfg: ChannelConfig,
|
||||
cssid: ChannelStatusSeriesId,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct ChannelAdd {
|
||||
backend: String,
|
||||
name: String,
|
||||
ch_cfg: ChannelConfig,
|
||||
restx: crate::ca::conn::CmdResTx,
|
||||
}
|
||||
|
||||
impl ChannelAdd {
|
||||
pub fn name(&self) -> &str {
|
||||
&self.ch_cfg.name()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct ChannelRemove {
|
||||
name: String,
|
||||
@@ -246,10 +253,10 @@ impl CaConnSetCtrl {
|
||||
pub async fn add_channel(
|
||||
&self,
|
||||
backend: String,
|
||||
name: String,
|
||||
ch_cfg: ChannelConfig,
|
||||
restx: crate::ca::conn::CmdResTx,
|
||||
) -> Result<(), Error> {
|
||||
let cmd = ChannelAdd { backend, name, restx };
|
||||
let cmd = ChannelAdd { backend, ch_cfg, restx };
|
||||
let cmd = ConnSetCmd::ChannelAdd(cmd);
|
||||
self.tx.send(CaConnSetEvent::ConnSetCmd(cmd)).await?;
|
||||
Ok(())
|
||||
@@ -519,13 +526,10 @@ impl CaConnSet {
|
||||
trace3!("handle_add_channel but shutdown_stopping");
|
||||
return Ok(());
|
||||
}
|
||||
trace3!("handle_add_channel {}", cmd.name);
|
||||
if trigger.contains(&cmd.name.as_str()) {
|
||||
debug!("handle_add_channel {cmd:?}");
|
||||
}
|
||||
trace3!("handle_add_channel {:?}", cmd);
|
||||
self.stats.channel_add().inc();
|
||||
// TODO should I add the transition through ActiveChannelState::Init as well?
|
||||
let ch = Channel::new(cmd.name.clone());
|
||||
let ch = Channel::new(cmd.name().into());
|
||||
let _st = if let Some(e) = self.channel_states.get_mut(&ch) {
|
||||
e
|
||||
} else {
|
||||
@@ -533,14 +537,16 @@ impl CaConnSet {
|
||||
value: ChannelStateValue::Active(ActiveChannelState::WaitForStatusSeriesId {
|
||||
since: SystemTime::now(),
|
||||
}),
|
||||
config: cmd.ch_cfg.clone(),
|
||||
};
|
||||
self.channel_states.insert(ch.clone(), item);
|
||||
self.channel_states.get_mut(&ch).unwrap()
|
||||
};
|
||||
let channel_name = cmd.name().into();
|
||||
let tx = self.channel_info_res_tx.as_ref().get_ref().clone();
|
||||
let item = ChannelInfoQuery {
|
||||
backend: cmd.backend,
|
||||
channel: cmd.name,
|
||||
channel: channel_name,
|
||||
kind: SeriesKind::ChannelStatus,
|
||||
scalar_type: ScalarType::ChannelStatus,
|
||||
shape: Shape::Scalar,
|
||||
@@ -571,34 +577,44 @@ impl CaConnSet {
|
||||
}
|
||||
match res {
|
||||
Ok(res) => {
|
||||
let cssid = ChannelStatusSeriesId::new(res.series.to_series().id());
|
||||
self.channel_by_cssid
|
||||
.insert(cssid.clone(), Channel::new(res.channel.clone()));
|
||||
let add = ChannelAddWithStatusId {
|
||||
backend: res.backend,
|
||||
name: res.channel,
|
||||
cssid,
|
||||
};
|
||||
self.handle_add_channel_with_status_id(add)?;
|
||||
let channel = Channel::new(res.channel.clone());
|
||||
// TODO must not depend on purely informative `self.channel_state`
|
||||
if let Some(st) = self.channel_states.get_mut(&channel) {
|
||||
let cssid = ChannelStatusSeriesId::new(res.series.to_series().id());
|
||||
self.channel_by_cssid
|
||||
.insert(cssid.clone(), Channel::new(res.channel.clone()));
|
||||
let add = ChannelAddWithStatusId {
|
||||
backend: res.backend,
|
||||
ch_cfg: st.config.clone(),
|
||||
cssid,
|
||||
};
|
||||
self.handle_add_channel_with_status_id(add)?;
|
||||
Ok(())
|
||||
} else {
|
||||
// TODO count for metrics
|
||||
warn!("received series id for unknown channel");
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("TODO handle error {e}");
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn handle_add_channel_with_status_id(&mut self, cmd: ChannelAddWithStatusId) -> Result<(), Error> {
|
||||
trace3!("handle_add_channel_with_status_id {}", cmd.name);
|
||||
let name = cmd.ch_cfg.name();
|
||||
trace3!("handle_add_channel_with_status_id {}", name);
|
||||
if self.shutdown_stopping {
|
||||
debug!("handle_add_channel but shutdown_stopping");
|
||||
return Ok(());
|
||||
}
|
||||
self.stats.channel_status_series_found().inc();
|
||||
if trigger.contains(&cmd.name.as_str()) {
|
||||
if trigger.contains(&name) {
|
||||
debug!("handle_add_channel_with_status_id {cmd:?}");
|
||||
}
|
||||
let ch = Channel::new(cmd.name.clone());
|
||||
let ch = Channel::new(name.into());
|
||||
if let Some(chst) = self.channel_states.get_mut(&ch) {
|
||||
if let ChannelStateValue::Active(chst2) = &mut chst.value {
|
||||
if let ActiveChannelState::WaitForStatusSeriesId { since } = chst2 {
|
||||
@@ -614,7 +630,7 @@ impl CaConnSet {
|
||||
since: SystemTime::now(),
|
||||
},
|
||||
});
|
||||
let qu = IocAddrQuery::cached(cmd.name);
|
||||
let qu = IocAddrQuery::cached(name.into());
|
||||
self.find_ioc_query_queue.push_back(qu);
|
||||
self.stats.ioc_search_start().inc();
|
||||
} else {
|
||||
@@ -633,6 +649,7 @@ impl CaConnSet {
|
||||
}
|
||||
|
||||
fn handle_add_channel_with_addr(&mut self, cmd: ChannelAddWithAddr) -> Result<(), Error> {
|
||||
let name = cmd.ch_cfg.name();
|
||||
if self.shutdown_stopping {
|
||||
trace3!("handle_add_channel but shutdown_stopping");
|
||||
return Ok(());
|
||||
@@ -642,10 +659,10 @@ impl CaConnSet {
|
||||
} else {
|
||||
return Err(Error::with_msg_no_trace("ipv4 for epics"));
|
||||
};
|
||||
if trigger.contains(&cmd.name.as_str()) {
|
||||
if trigger.contains(&name) {
|
||||
debug!("handle_add_channel_with_addr {cmd:?}");
|
||||
}
|
||||
let ch = Channel::new(cmd.name.clone());
|
||||
let ch = Channel::new(name.into());
|
||||
if let Some(chst) = self.channel_states.get_mut(&ch) {
|
||||
if let ChannelStateValue::Active(ast) = &mut chst.value {
|
||||
if let ActiveChannelState::WithStatusSeriesId(st3) = ast {
|
||||
@@ -673,7 +690,7 @@ impl CaConnSet {
|
||||
self.ca_conn_ress.insert(addr, c);
|
||||
}
|
||||
let conn_ress = self.ca_conn_ress.get_mut(&addr).unwrap();
|
||||
let cmd = ConnCommand::channel_add(cmd.name, cmd.cssid);
|
||||
let cmd = ConnCommand::channel_add(cmd.ch_cfg, cmd.cssid);
|
||||
conn_ress.cmd_queue.push_back(cmd);
|
||||
}
|
||||
}
|
||||
@@ -728,7 +745,7 @@ impl CaConnSet {
|
||||
}
|
||||
for res in results {
|
||||
let ch = Channel::new(res.channel.clone());
|
||||
if trigger.contains(&ch.id()) {
|
||||
if trigger.contains(&ch.name()) {
|
||||
trace!("handle_ioc_query_result {res:?}");
|
||||
}
|
||||
if let Some(chst) = self.channel_states.get_mut(&ch) {
|
||||
@@ -739,7 +756,7 @@ impl CaConnSet {
|
||||
trace!("ioc found {res:?}");
|
||||
let cmd = ChannelAddWithAddr {
|
||||
backend: self.backend.clone(),
|
||||
name: res.channel,
|
||||
ch_cfg: chst.config.clone(),
|
||||
addr: SocketAddr::V4(addr),
|
||||
cssid: st2.cssid.clone(),
|
||||
};
|
||||
@@ -793,8 +810,8 @@ impl CaConnSet {
|
||||
let channels_ca_conn_set = self
|
||||
.channel_states
|
||||
.iter()
|
||||
.filter(|(k, _)| reg1.is_match(k.id()))
|
||||
.map(|(k, v)| (k.id().to_string(), v.clone()))
|
||||
.filter(|(k, _)| reg1.is_match(k.name()))
|
||||
.map(|(k, v)| (k.name().to_string(), v.clone()))
|
||||
.collect();
|
||||
let item = ChannelStatusesResponse { channels_ca_conn_set };
|
||||
if req.tx.try_send(item).is_err() {
|
||||
@@ -942,9 +959,9 @@ impl CaConnSet {
|
||||
} = &mut st3.inner
|
||||
{
|
||||
if SocketAddr::V4(*addr_ch) == addr {
|
||||
if trigger.contains(&ch.id()) {
|
||||
if trigger.contains(&ch.name()) {
|
||||
self.connect_fail_count += 1;
|
||||
debug!(" connect fail, maybe wrong address for {} {}", addr, ch.id());
|
||||
debug!(" connect fail, maybe wrong address for {} {}", addr, ch.name());
|
||||
}
|
||||
if self.connect_fail_count > 400 {
|
||||
std::process::exit(1);
|
||||
@@ -1290,7 +1307,7 @@ impl CaConnSet {
|
||||
} else {
|
||||
search_pending_count += 1;
|
||||
st3.inner = WithStatusSeriesIdStateInner::AddrSearchPending { since: stnow };
|
||||
let qu = IocAddrQuery::uncached(ch.id().into());
|
||||
let qu = IocAddrQuery::uncached(ch.name().into());
|
||||
self.find_ioc_query_queue.push_back(qu);
|
||||
self.stats.ioc_search_start().inc();
|
||||
}
|
||||
@@ -1317,7 +1334,7 @@ impl CaConnSet {
|
||||
assigned_without_health_update += 1;
|
||||
let cmd = ChannelAddWithAddr {
|
||||
backend: self.backend.clone(),
|
||||
name: ch.id().into(),
|
||||
ch_cfg: st.config.clone(),
|
||||
cssid: st3.cssid.clone(),
|
||||
addr: SocketAddr::V4(*addr_v4),
|
||||
};
|
||||
@@ -1358,12 +1375,12 @@ impl CaConnSet {
|
||||
if st4.since + st4.backoff_dt < stnow {
|
||||
if search_pending_count < CURRENT_SEARCH_PENDING_MAX as _ {
|
||||
trace!("try again channel after MaybeWrongAddress");
|
||||
if trigger.contains(&ch.id()) {
|
||||
debug!("issue ioc search for {}", ch.id());
|
||||
if trigger.contains(&ch.name()) {
|
||||
debug!("issue ioc search for {}", ch.name());
|
||||
}
|
||||
search_pending_count += 1;
|
||||
st3.inner = WithStatusSeriesIdStateInner::AddrSearchPending { since: stnow };
|
||||
let qu = IocAddrQuery::uncached(ch.id().into());
|
||||
let qu = IocAddrQuery::uncached(ch.name().into());
|
||||
self.find_ioc_query_queue.push_back(qu);
|
||||
self.stats.ioc_search_start().inc();
|
||||
}
|
||||
@@ -1385,10 +1402,10 @@ impl CaConnSet {
|
||||
}
|
||||
for (addr, ch) in cmd_remove_channel {
|
||||
if let Some(g) = self.ca_conn_ress.get_mut(&addr) {
|
||||
let cmd = ConnCommand::channel_close(ch.id().into());
|
||||
let cmd = ConnCommand::channel_close(ch.name().into());
|
||||
g.cmd_queue.push_back(cmd);
|
||||
}
|
||||
let cmd = ChannelRemove { name: ch.id().into() };
|
||||
let cmd = ChannelRemove { name: ch.name().into() };
|
||||
self.handle_remove_channel(cmd)?;
|
||||
}
|
||||
for cmd in cmd_add_channel {
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use crate::ca::conn::ChannelStateInfo;
|
||||
use crate::conf::ChannelConfig;
|
||||
use crate::daemon_common::Channel;
|
||||
use dashmap::DashMap;
|
||||
use serde::Serialize;
|
||||
@@ -133,6 +134,7 @@ pub enum ChannelStateValue {
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
pub struct ChannelState {
|
||||
pub value: ChannelStateValue,
|
||||
pub config: ChannelConfig,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
|
||||
@@ -188,7 +188,7 @@ pub async fn parse_config(config: PathBuf) -> Result<(CaIngestOpts, Option<Chann
|
||||
let mut file = OpenOptions::new().read(true).open(config).await?;
|
||||
let mut buf = Vec::new();
|
||||
file.read_to_end(&mut buf).await?;
|
||||
let conf: CaIngestOpts = serde_yaml::from_slice(&buf).map_err(|e| Error::with_msg_no_trace(format!("{:?}", e)))?;
|
||||
let conf: CaIngestOpts = serde_yaml::from_slice(&buf).map_err(Error::from_string)?;
|
||||
drop(file);
|
||||
let re_p = regex::Regex::new(&conf.whitelist.clone().unwrap_or("--nothing-ur9nc23ur98c--".into()))?;
|
||||
let re_n = regex::Regex::new(&conf.blacklist.clone().unwrap_or("--nothing-ksm2u98rcm28--".into()))?;
|
||||
@@ -223,10 +223,16 @@ async fn parse_config_dir(dir: &Path) -> Result<ChannelsConfig, Error> {
|
||||
} else {
|
||||
break;
|
||||
};
|
||||
|
||||
// TODO parse the yml file at this path and compile a merged configuration
|
||||
e.path();
|
||||
todo!();
|
||||
let fnp = e.path();
|
||||
let fns = fnp.to_str().unwrap();
|
||||
if fns.ends_with(".yml") || fns.ends_with(".yaml") {
|
||||
let buf = tokio::fs::read(e.path()).await?;
|
||||
let conf: BTreeMap<String, ChannelConfigParse> =
|
||||
serde_yaml::from_slice(&buf).map_err(Error::from_string)?;
|
||||
ret.push_from_parsed(&conf);
|
||||
} else {
|
||||
debug!("ignore channel config file {:?}", e.path());
|
||||
}
|
||||
}
|
||||
Ok(ret)
|
||||
}
|
||||
@@ -271,23 +277,39 @@ pub struct ChannelConfigParse {
|
||||
archiving_configuration: IngestConfigArchiving,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct IngestConfigArchiving {
|
||||
#[serde(default, with = "serde_replication_bool")]
|
||||
#[serde(default, skip_serializing_if = "bool_is_false")]
|
||||
#[serde(with = "serde_replication_bool")]
|
||||
replication: bool,
|
||||
#[serde(default, with = "serde_option_channel_read_config")]
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
#[serde(with = "serde_option_channel_read_config")]
|
||||
short_term: Option<ChannelReadConfig>,
|
||||
#[serde(default, with = "serde_option_channel_read_config")]
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
#[serde(with = "serde_option_channel_read_config")]
|
||||
medium_term: Option<ChannelReadConfig>,
|
||||
#[serde(default, with = "serde_option_channel_read_config")]
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
#[serde(with = "serde_option_channel_read_config")]
|
||||
long_term: Option<ChannelReadConfig>,
|
||||
}
|
||||
|
||||
fn bool_is_false(x: &bool) -> bool {
|
||||
*x == false
|
||||
}
|
||||
|
||||
mod serde_replication_bool {
|
||||
use serde::de;
|
||||
use serde::Deserializer;
|
||||
use serde::Serializer;
|
||||
use std::fmt;
|
||||
|
||||
pub fn serialize<S>(v: &bool, ser: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: Serializer,
|
||||
{
|
||||
ser.serialize_bool(*v)
|
||||
}
|
||||
|
||||
pub fn deserialize<'de, D>(de: D) -> Result<bool, D::Error>
|
||||
where
|
||||
D: Deserializer<'de>,
|
||||
@@ -333,9 +355,23 @@ mod serde_option_channel_read_config {
|
||||
use super::ChannelReadConfig;
|
||||
use serde::de;
|
||||
use serde::Deserializer;
|
||||
use serde::Serializer;
|
||||
use std::fmt;
|
||||
use std::time::Duration;
|
||||
|
||||
pub fn serialize<S>(v: &Option<ChannelReadConfig>, ser: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: Serializer,
|
||||
{
|
||||
match v {
|
||||
Some(x) => match x {
|
||||
ChannelReadConfig::Monitor => ser.serialize_str("Monitor"),
|
||||
ChannelReadConfig::Poll(n) => ser.serialize_u32(n.as_secs() as u32),
|
||||
},
|
||||
None => ser.serialize_none(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn deserialize<'de, D>(de: D) -> Result<Option<ChannelReadConfig>, D::Error>
|
||||
where
|
||||
D: Deserializer<'de>,
|
||||
@@ -391,7 +427,7 @@ mod serde_option_channel_read_config {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub enum ChannelReadConfig {
|
||||
Monitor,
|
||||
Poll(Duration),
|
||||
@@ -421,6 +457,7 @@ CH-03:
|
||||
let x: BTreeMap<String, ChannelConfigParse> = serde_yaml::from_str(inp).unwrap();
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct ChannelsConfig {
|
||||
channels: Vec<ChannelConfig>,
|
||||
}
|
||||
@@ -429,6 +466,24 @@ impl ChannelsConfig {
|
||||
fn new() -> Self {
|
||||
Self { channels: Vec::new() }
|
||||
}
|
||||
|
||||
pub fn len(&self) -> usize {
|
||||
self.channels.len()
|
||||
}
|
||||
|
||||
pub fn channels(&self) -> &Vec<ChannelConfig> {
|
||||
&self.channels
|
||||
}
|
||||
|
||||
fn push_from_parsed(&mut self, rhs: &BTreeMap<String, ChannelConfigParse>) {
|
||||
for (k, v) in rhs.iter() {
|
||||
let item = ChannelConfig {
|
||||
name: k.into(),
|
||||
arch: v.archiving_configuration.clone(),
|
||||
};
|
||||
self.channels.push(item);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<BTreeMap<String, ChannelConfigParse>> for ChannelsConfig {
|
||||
@@ -444,7 +499,26 @@ impl From<BTreeMap<String, ChannelConfigParse>> for ChannelsConfig {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
pub struct ChannelConfig {
|
||||
name: String,
|
||||
arch: IngestConfigArchiving,
|
||||
}
|
||||
|
||||
impl ChannelConfig {
|
||||
pub fn st_monitor<S: Into<String>>(name: S) -> Self {
|
||||
Self {
|
||||
name: name.into(),
|
||||
arch: IngestConfigArchiving {
|
||||
replication: true,
|
||||
short_term: Some(ChannelReadConfig::Monitor),
|
||||
medium_term: None,
|
||||
long_term: None,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
pub fn name(&self) -> &str {
|
||||
&self.name
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,26 +1,27 @@
|
||||
use crate::ca::connset::CaConnSetItem;
|
||||
use crate::conf::ChannelConfig;
|
||||
use async_channel::Sender;
|
||||
use serde::Serialize;
|
||||
|
||||
#[derive(Clone, Debug, Serialize, PartialEq, PartialOrd, Eq, Ord, Hash)]
|
||||
pub struct Channel {
|
||||
id: String,
|
||||
name: String,
|
||||
}
|
||||
|
||||
impl Channel {
|
||||
pub fn new(id: String) -> Self {
|
||||
Self { id }
|
||||
pub fn new(name: String) -> Self {
|
||||
Self { name }
|
||||
}
|
||||
|
||||
pub fn id(&self) -> &str {
|
||||
&self.id
|
||||
pub fn name(&self) -> &str {
|
||||
&self.name
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum DaemonEvent {
|
||||
TimerTick(u32, Sender<u32>),
|
||||
ChannelAdd(Channel, crate::ca::conn::CmdResTx),
|
||||
ChannelAdd(ChannelConfig, crate::ca::conn::CmdResTx),
|
||||
ChannelRemove(Channel),
|
||||
CaConnSetItem(CaConnSetItem),
|
||||
Shutdown,
|
||||
|
||||
@@ -6,6 +6,7 @@ use crate::ca::connset::ChannelStatusesRequest;
|
||||
use crate::ca::connset::ChannelStatusesResponse;
|
||||
use crate::ca::connset::ConnSetCmd;
|
||||
use crate::ca::statemap::ChannelState;
|
||||
use crate::conf::ChannelConfig;
|
||||
use crate::daemon_common::DaemonEvent;
|
||||
use async_channel::Receiver;
|
||||
use async_channel::Sender;
|
||||
@@ -140,9 +141,10 @@ async fn find_channel(
|
||||
|
||||
async fn channel_add_inner(params: HashMap<String, String>, dcom: Arc<DaemonComm>) -> Result<(), Error> {
|
||||
if let Some(name) = params.get("name") {
|
||||
let ch = crate::daemon_common::Channel::new(name.into());
|
||||
// let ch = crate::daemon_common::Channel::new(name.into());
|
||||
let ch_cfg = ChannelConfig::st_monitor(name);
|
||||
let (tx, rx) = async_channel::bounded(1);
|
||||
let ev = DaemonEvent::ChannelAdd(ch, tx);
|
||||
let ev = DaemonEvent::ChannelAdd(ch_cfg, tx);
|
||||
dcom.tx.send(ev).await?;
|
||||
match rx.recv().await {
|
||||
Ok(Ok(())) => Ok(()),
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
use crate::ca::connset::CaConnSetEvent;
|
||||
use crate::ca::connset::ChannelStatusesRequest;
|
||||
use crate::ca::connset::ConnSetCmd;
|
||||
use crate::conf::ChannelConfig;
|
||||
use async_channel::Sender;
|
||||
use serde::Serialize;
|
||||
use std::collections::BTreeMap;
|
||||
@@ -16,7 +17,7 @@ pub struct ChannelStates {
|
||||
struct ChannelState {
|
||||
ioc_address: Option<SocketAddr>,
|
||||
connection: ConnectionState,
|
||||
archive_settings: ArchiveSettings,
|
||||
archiving_configuration: ChannelConfig,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
@@ -28,23 +29,6 @@ enum ConnectionState {
|
||||
Error,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
struct ArchiveSettings {
|
||||
short_term: Option<String>,
|
||||
medium_term: Option<String>,
|
||||
long_term: Option<String>,
|
||||
}
|
||||
|
||||
impl ArchiveSettings {
|
||||
fn dummy() -> Self {
|
||||
Self {
|
||||
short_term: None,
|
||||
medium_term: None,
|
||||
long_term: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ChannelStatusesResponse
|
||||
// BTreeMap<String, ChannelState>
|
||||
pub async fn channel_states(params: HashMap<String, String>, tx: Sender<CaConnSetEvent>) -> axum::Json<ChannelStates> {
|
||||
@@ -73,7 +57,7 @@ pub async fn channel_states(params: HashMap<String, String>, tx: Sender<CaConnSe
|
||||
let chst = ChannelState {
|
||||
ioc_address: None,
|
||||
connection: ConnectionState::Connecting,
|
||||
archive_settings: ArchiveSettings::dummy(),
|
||||
archiving_configuration: st1.config,
|
||||
};
|
||||
states.channels.insert(k, chst);
|
||||
}
|
||||
@@ -81,7 +65,7 @@ pub async fn channel_states(params: HashMap<String, String>, tx: Sender<CaConnSe
|
||||
let chst = ChannelState {
|
||||
ioc_address: None,
|
||||
connection: ConnectionState::Connecting,
|
||||
archive_settings: ArchiveSettings::dummy(),
|
||||
archiving_configuration: st1.config,
|
||||
};
|
||||
states.channels.insert(k, chst);
|
||||
}
|
||||
@@ -92,7 +76,7 @@ pub async fn channel_states(params: HashMap<String, String>, tx: Sender<CaConnSe
|
||||
let chst = ChannelState {
|
||||
ioc_address: None,
|
||||
connection: ConnectionState::Connecting,
|
||||
archive_settings: ArchiveSettings::dummy(),
|
||||
archiving_configuration: st1.config,
|
||||
};
|
||||
states.channels.insert(k, chst);
|
||||
}
|
||||
@@ -103,7 +87,7 @@ pub async fn channel_states(params: HashMap<String, String>, tx: Sender<CaConnSe
|
||||
let chst = ChannelState {
|
||||
ioc_address: Some(SocketAddr::V4(addr)),
|
||||
connection: ConnectionState::Connecting,
|
||||
archive_settings: ArchiveSettings::dummy(),
|
||||
archiving_configuration: st1.config,
|
||||
};
|
||||
states.channels.insert(k, chst);
|
||||
}
|
||||
@@ -114,7 +98,7 @@ pub async fn channel_states(params: HashMap<String, String>, tx: Sender<CaConnSe
|
||||
let chst = ChannelState {
|
||||
ioc_address: Some(SocketAddr::V4(addr)),
|
||||
connection: ConnectionState::Connecting,
|
||||
archive_settings: ArchiveSettings::dummy(),
|
||||
archiving_configuration: st1.config,
|
||||
};
|
||||
states.channels.insert(k, chst);
|
||||
}
|
||||
@@ -125,7 +109,9 @@ pub async fn channel_states(params: HashMap<String, String>, tx: Sender<CaConnSe
|
||||
let chst = ChannelState {
|
||||
ioc_address: Some(SocketAddr::V4(addr)),
|
||||
connection: ConnectionState::Disconnected,
|
||||
archive_settings: ArchiveSettings::dummy(),
|
||||
// TODO config is stored in two places
|
||||
// conf: st6.conf,
|
||||
archiving_configuration: st1.config,
|
||||
};
|
||||
states.channels.insert(k, chst);
|
||||
}
|
||||
@@ -133,7 +119,7 @@ pub async fn channel_states(params: HashMap<String, String>, tx: Sender<CaConnSe
|
||||
let chst = ChannelState {
|
||||
ioc_address: Some(SocketAddr::V4(addr)),
|
||||
connection: ConnectionState::Connecting,
|
||||
archive_settings: ArchiveSettings::dummy(),
|
||||
archiving_configuration: st1.config,
|
||||
};
|
||||
states.channels.insert(k, chst);
|
||||
}
|
||||
@@ -141,7 +127,7 @@ pub async fn channel_states(params: HashMap<String, String>, tx: Sender<CaConnSe
|
||||
let chst = ChannelState {
|
||||
ioc_address: Some(SocketAddr::V4(addr)),
|
||||
connection: ConnectionState::Connected,
|
||||
archive_settings: ArchiveSettings::dummy(),
|
||||
archiving_configuration: st1.config,
|
||||
};
|
||||
states.channels.insert(k, chst);
|
||||
}
|
||||
@@ -149,7 +135,7 @@ pub async fn channel_states(params: HashMap<String, String>, tx: Sender<CaConnSe
|
||||
let chst = ChannelState {
|
||||
ioc_address: Some(SocketAddr::V4(addr)),
|
||||
connection: ConnectionState::Error,
|
||||
archive_settings: ArchiveSettings::dummy(),
|
||||
archiving_configuration: st1.config,
|
||||
};
|
||||
states.channels.insert(k, chst);
|
||||
}
|
||||
@@ -163,7 +149,7 @@ pub async fn channel_states(params: HashMap<String, String>, tx: Sender<CaConnSe
|
||||
let chst = ChannelState {
|
||||
ioc_address: None,
|
||||
connection: ConnectionState::Connecting,
|
||||
archive_settings: ArchiveSettings::dummy(),
|
||||
archiving_configuration: st1.config,
|
||||
};
|
||||
states.channels.insert(k, chst);
|
||||
}
|
||||
@@ -171,7 +157,7 @@ pub async fn channel_states(params: HashMap<String, String>, tx: Sender<CaConnSe
|
||||
let chst = ChannelState {
|
||||
ioc_address: None,
|
||||
connection: ConnectionState::Unreachable,
|
||||
archive_settings: ArchiveSettings::dummy(),
|
||||
archiving_configuration: st1.config,
|
||||
};
|
||||
states.channels.insert(k, chst);
|
||||
}
|
||||
@@ -179,7 +165,7 @@ pub async fn channel_states(params: HashMap<String, String>, tx: Sender<CaConnSe
|
||||
let chst = ChannelState {
|
||||
ioc_address: None,
|
||||
connection: ConnectionState::Unreachable,
|
||||
archive_settings: ArchiveSettings::dummy(),
|
||||
archiving_configuration: st1.config,
|
||||
};
|
||||
states.channels.insert(k, chst);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user