Refactor
This commit is contained in:
+68
-330
@@ -1,4 +1,5 @@
|
||||
pub mod conn;
|
||||
pub mod connset;
|
||||
pub mod findioc;
|
||||
pub mod proto;
|
||||
pub mod search;
|
||||
@@ -6,28 +7,27 @@ pub mod store;
|
||||
|
||||
use self::store::DataStore;
|
||||
use crate::ca::conn::ConnCommand;
|
||||
use crate::ca::connset::CaConnSet;
|
||||
use crate::errconv::ErrConv;
|
||||
use crate::linuxhelper::local_hostname;
|
||||
use crate::store::{CommonInsertItemQueue, CommonInsertItemQueueSender, IntoSimplerError, QueryItem};
|
||||
use async_channel::Sender;
|
||||
use conn::CaConn;
|
||||
use crate::metrics::metrics_agg_task;
|
||||
use crate::rt::JoinHandle;
|
||||
use crate::store::{CommonInsertItemQueue, IntoSimplerError, QueryItem};
|
||||
use err::Error;
|
||||
use futures_util::stream::FuturesUnordered;
|
||||
use futures_util::{FutureExt, StreamExt};
|
||||
use log::*;
|
||||
use netpod::{Database, ScyllaConfig};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use stats::{CaConnStats, CaConnStatsAgg, CaConnStatsAggDiff};
|
||||
use std::collections::{BTreeMap, VecDeque};
|
||||
use stats::{CaConnStats, CaConnStatsAgg};
|
||||
use std::collections::BTreeMap;
|
||||
use std::net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4};
|
||||
use std::path::PathBuf;
|
||||
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::time::Duration;
|
||||
use std::time::{Duration, Instant};
|
||||
use tokio::fs::OpenOptions;
|
||||
use tokio::io::AsyncReadExt;
|
||||
use tokio::sync::Mutex as TokMx;
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio_postgres::Client as PgClient;
|
||||
|
||||
pub static SIGINT: AtomicU32 = AtomicU32::new(0);
|
||||
@@ -60,6 +60,7 @@ struct ChannelConfig {
|
||||
insert_item_queue_cap: Option<usize>,
|
||||
api_bind: Option<String>,
|
||||
local_epics_hostname: Option<String>,
|
||||
store_workers_rate: Option<u64>,
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -136,6 +137,7 @@ pub async fn parse_config(config: PathBuf) -> Result<CaConnectOpts, Error> {
|
||||
insert_item_queue_cap: conf.insert_item_queue_cap.unwrap_or(200000),
|
||||
api_bind: conf.api_bind.unwrap_or_else(|| "0.0.0.0:3011".into()),
|
||||
local_epics_hostname: conf.local_epics_hostname.unwrap_or_else(local_hostname),
|
||||
store_workers_rate: conf.store_workers_rate.unwrap_or(10000),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -156,6 +158,7 @@ pub struct CaConnectOpts {
|
||||
pub insert_item_queue_cap: usize,
|
||||
pub api_bind: String,
|
||||
pub local_epics_hostname: String,
|
||||
pub store_workers_rate: u64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
@@ -206,6 +209,7 @@ async fn spawn_scylla_insert_workers(
|
||||
insert_worker_count: usize,
|
||||
insert_item_queue: Arc<CommonInsertItemQueue>,
|
||||
insert_frac: Arc<AtomicU64>,
|
||||
ingest_commons: Arc<IngestCommons>,
|
||||
pg_client: Arc<PgClient>,
|
||||
store_stats: Arc<stats::CaConnStats>,
|
||||
) -> Result<Vec<JoinHandle<()>>, Error> {
|
||||
@@ -220,17 +224,34 @@ async fn spawn_scylla_insert_workers(
|
||||
let stats = store_stats.clone();
|
||||
let recv = insert_item_queue.receiver();
|
||||
let insert_frac = insert_frac.clone();
|
||||
let ingest_commons = ingest_commons.clone();
|
||||
let fut = async move {
|
||||
let backoff_0 = Duration::from_millis(10);
|
||||
let mut backoff = backoff_0.clone();
|
||||
let mut i1 = 0;
|
||||
while let Ok(item) = recv.recv().await {
|
||||
let mut ts_recv_last = Instant::now();
|
||||
loop {
|
||||
let tsnow = Instant::now();
|
||||
let dt = tsnow.duration_since(ts_recv_last);
|
||||
let dt_min = {
|
||||
let rate = ingest_commons.store_workers_rate.load(Ordering::Acquire);
|
||||
Duration::from_nanos(insert_worker_count as u64 * 1000000000 / rate)
|
||||
};
|
||||
if dt < dt_min {
|
||||
tokio::time::sleep_until(ts_recv_last.checked_add(dt_min).unwrap().into()).await;
|
||||
}
|
||||
let item = if let Ok(item) = recv.recv().await {
|
||||
item
|
||||
} else {
|
||||
break;
|
||||
};
|
||||
ts_recv_last = Instant::now();
|
||||
stats.store_worker_item_recv_inc();
|
||||
match item {
|
||||
QueryItem::ConnectionStatus(item) => {
|
||||
match crate::store::insert_connection_status(item, &data_store, &stats).await {
|
||||
Ok(_) => {
|
||||
stats.store_worker_insert_done_inc();
|
||||
stats.connection_status_insert_done_inc();
|
||||
backoff = backoff_0;
|
||||
}
|
||||
Err(e) => {
|
||||
@@ -242,7 +263,7 @@ async fn spawn_scylla_insert_workers(
|
||||
QueryItem::ChannelStatus(item) => {
|
||||
match crate::store::insert_channel_status(item, &data_store, &stats).await {
|
||||
Ok(_) => {
|
||||
stats.store_worker_insert_done_inc();
|
||||
stats.channel_status_insert_done_inc();
|
||||
backoff = backoff_0;
|
||||
}
|
||||
Err(e) => {
|
||||
@@ -280,7 +301,7 @@ async fn spawn_scylla_insert_workers(
|
||||
let qres = data_store.scy.execute(&data_store.qu_insert_muted, values).await;
|
||||
match qres {
|
||||
Ok(_) => {
|
||||
stats.store_worker_insert_done_inc();
|
||||
stats.mute_insert_done_inc();
|
||||
backoff = backoff_0;
|
||||
}
|
||||
Err(e) => {
|
||||
@@ -304,7 +325,7 @@ async fn spawn_scylla_insert_workers(
|
||||
.await;
|
||||
match qres {
|
||||
Ok(_) => {
|
||||
stats.store_worker_insert_done_inc();
|
||||
stats.ivl_insert_done_inc();
|
||||
backoff = backoff_0;
|
||||
}
|
||||
Err(e) => {
|
||||
@@ -316,6 +337,7 @@ async fn spawn_scylla_insert_workers(
|
||||
}
|
||||
QueryItem::ChannelInfo(item) => {
|
||||
let params = (
|
||||
(item.series.id() & 0xff) as i32,
|
||||
item.ts_msp as i32,
|
||||
item.series.id() as i64,
|
||||
item.ivl,
|
||||
@@ -325,7 +347,7 @@ async fn spawn_scylla_insert_workers(
|
||||
let qres = data_store.scy.execute(&data_store.qu_insert_channel_ping, params).await;
|
||||
match qres {
|
||||
Ok(_) => {
|
||||
stats.store_worker_insert_done_inc();
|
||||
stats.channel_info_insert_done_inc();
|
||||
backoff = backoff_0;
|
||||
}
|
||||
Err(e) => {
|
||||
@@ -345,273 +367,6 @@ async fn spawn_scylla_insert_workers(
|
||||
Ok(jhs)
|
||||
}
|
||||
|
||||
pub struct CommandQueueSet {
|
||||
queues: tokio::sync::Mutex<BTreeMap<SocketAddrV4, Sender<ConnCommand>>>,
|
||||
}
|
||||
|
||||
impl CommandQueueSet {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
queues: tokio::sync::Mutex::new(BTreeMap::<SocketAddrV4, Sender<ConnCommand>>::new()),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn queues(&self) -> &tokio::sync::Mutex<BTreeMap<SocketAddrV4, Sender<ConnCommand>>> {
|
||||
&self.queues
|
||||
}
|
||||
|
||||
pub async fn queues_locked(&self) -> tokio::sync::MutexGuard<BTreeMap<SocketAddrV4, Sender<ConnCommand>>> {
|
||||
let mut g = self.queues.lock().await;
|
||||
let mut rm = Vec::new();
|
||||
for (k, v) in g.iter() {
|
||||
if v.is_closed() {
|
||||
rm.push(*k);
|
||||
}
|
||||
}
|
||||
for x in rm {
|
||||
g.remove(&x);
|
||||
}
|
||||
g
|
||||
}
|
||||
}
|
||||
|
||||
struct CaConnRess {
|
||||
sender: Sender<ConnCommand>,
|
||||
stats: Arc<CaConnStats>,
|
||||
jh: JoinHandle<Result<(), Error>>,
|
||||
}
|
||||
|
||||
// TODO
|
||||
// Resources belonging to the same CaConn also belong together here.
|
||||
// Only add or remove them from the set at once.
|
||||
// That means, they should go together.
|
||||
// Does not hold the actual CaConn, because that struct is in a task.
|
||||
// Always create the CaConn via a common code path which also takes care
|
||||
// to add it to the correct list.
|
||||
// There, make spawning part of this function?
|
||||
pub struct CaConnSet {
|
||||
ca_conn_ress: TokMx<BTreeMap<SocketAddr, CaConnRess>>,
|
||||
}
|
||||
|
||||
impl CaConnSet {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
ca_conn_ress: Default::default(),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn create_ca_conn(
|
||||
&self,
|
||||
addr: SocketAddrV4,
|
||||
local_epics_hostname: String,
|
||||
array_truncate: usize,
|
||||
insert_queue_max: usize,
|
||||
insert_item_queue_sender: CommonInsertItemQueueSender,
|
||||
data_store: Arc<DataStore>,
|
||||
ingest_commons: Arc<IngestCommons>,
|
||||
with_channels: Vec<String>,
|
||||
) -> Result<(), Error> {
|
||||
info!("create new CaConn {:?}", addr);
|
||||
let addr2 = SocketAddr::V4(addr.clone());
|
||||
let mut conn = CaConn::new(
|
||||
addr,
|
||||
local_epics_hostname,
|
||||
data_store.clone(),
|
||||
insert_item_queue_sender,
|
||||
array_truncate,
|
||||
insert_queue_max,
|
||||
ingest_commons,
|
||||
);
|
||||
for ch in with_channels {
|
||||
conn.channel_add(ch);
|
||||
}
|
||||
let conn = conn;
|
||||
let conn_tx = conn.conn_command_tx();
|
||||
let conn_stats = conn.stats();
|
||||
let conn_fut = async move {
|
||||
let stats = conn.stats();
|
||||
let mut conn = conn;
|
||||
while let Some(item) = conn.next().await {
|
||||
match item {
|
||||
Ok(_) => {
|
||||
stats.conn_item_count_inc();
|
||||
}
|
||||
Err(e) => {
|
||||
error!("CaConn gives error: {e:?}");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok::<_, Error>(())
|
||||
};
|
||||
let jh = tokio::spawn(conn_fut);
|
||||
let ca_conn_ress = CaConnRess {
|
||||
sender: conn_tx,
|
||||
stats: conn_stats,
|
||||
jh,
|
||||
};
|
||||
self.ca_conn_ress.lock().await.insert(addr2, ca_conn_ress);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn send_command_to_all<F, R>(&self, cmdgen: F) -> Result<Vec<R>, Error>
|
||||
where
|
||||
F: Fn() -> (ConnCommand, async_channel::Receiver<R>),
|
||||
{
|
||||
//let it = self.ca_conn_ress.iter().map(|x| x);
|
||||
//Self::send_command_inner(it, move || cmd.clone());
|
||||
let mut rxs = Vec::new();
|
||||
for (_addr, ress) in &*self.ca_conn_ress.lock().await {
|
||||
let (cmd, rx) = cmdgen();
|
||||
match ress.sender.send(cmd).await {
|
||||
Ok(()) => {
|
||||
rxs.push(rx);
|
||||
}
|
||||
Err(e) => {
|
||||
error!("can not send command {e:?}");
|
||||
}
|
||||
}
|
||||
}
|
||||
let mut res = Vec::new();
|
||||
for rx in rxs {
|
||||
let x = rx.recv().await?;
|
||||
res.push(x);
|
||||
}
|
||||
Ok(res)
|
||||
}
|
||||
|
||||
pub async fn send_command_to_addr<F, R>(&self, addr: &SocketAddr, cmdgen: F) -> Result<R, Error>
|
||||
where
|
||||
F: Fn() -> (ConnCommand, async_channel::Receiver<R>),
|
||||
{
|
||||
if let Some(ress) = self.ca_conn_ress.lock().await.get(addr) {
|
||||
let (cmd, rx) = cmdgen();
|
||||
ress.sender.send(cmd).await.err_conv()?;
|
||||
let ret = rx.recv().await.err_conv()?;
|
||||
Ok(ret)
|
||||
} else {
|
||||
Err(Error::with_msg_no_trace(format!("addr not found")))
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(unused)]
|
||||
async fn send_command_inner<'a, IT, F, R>(it: &mut IT, cmdgen: F) -> Vec<async_channel::Receiver<R>>
|
||||
where
|
||||
IT: Iterator<Item = (&'a SocketAddrV4, &'a async_channel::Sender<ConnCommand>)>,
|
||||
F: Fn() -> (ConnCommand, async_channel::Receiver<R>),
|
||||
{
|
||||
let mut rxs = Vec::new();
|
||||
for (_, tx) in it {
|
||||
let (cmd, rx) = cmdgen();
|
||||
match tx.send(cmd).await {
|
||||
Ok(()) => {
|
||||
rxs.push(rx);
|
||||
}
|
||||
Err(e) => {
|
||||
error!("can not send command {e:?}");
|
||||
}
|
||||
}
|
||||
}
|
||||
rxs
|
||||
}
|
||||
|
||||
pub async fn send_stop(&self) -> Result<(), Error> {
|
||||
self.send_command_to_all(|| ConnCommand::shutdown()).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn wait_stopped(&self) -> Result<(), Error> {
|
||||
let mut g = self.ca_conn_ress.lock().await;
|
||||
let mm = std::mem::replace(&mut *g, BTreeMap::new());
|
||||
let mut jhs: VecDeque<_> = VecDeque::new();
|
||||
for t in mm {
|
||||
jhs.push_back(t.1.jh.fuse());
|
||||
}
|
||||
loop {
|
||||
let mut jh = if let Some(x) = jhs.pop_front() {
|
||||
x
|
||||
} else {
|
||||
break;
|
||||
};
|
||||
futures_util::select! {
|
||||
a = jh => match a {
|
||||
Ok(k) => match k {
|
||||
Ok(_) => {}
|
||||
Err(e) => {
|
||||
error!("{e:?}");
|
||||
}
|
||||
},
|
||||
Err(e) => {
|
||||
error!("{e:?}");
|
||||
}
|
||||
},
|
||||
_b = tokio::time::sleep(Duration::from_millis(1000)).fuse() => {
|
||||
jhs.push_back(jh);
|
||||
info!("waiting for {} connections", jhs.len());
|
||||
}
|
||||
};
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn add_channel_to_addr(
|
||||
&self,
|
||||
addr: SocketAddr,
|
||||
channel_name: String,
|
||||
ingest_commons: Arc<IngestCommons>,
|
||||
) -> Result<(), Error> {
|
||||
let g = self.ca_conn_ress.lock().await;
|
||||
match g.get(&addr) {
|
||||
Some(ca_conn) => {
|
||||
//info!("try to add to existing... {addr} {channel_name}");
|
||||
let (cmd, rx) = ConnCommand::channel_add(channel_name);
|
||||
ca_conn.sender.send(cmd).await.err_conv()?;
|
||||
let a = rx.recv().await.err_conv()?;
|
||||
if a {
|
||||
Ok(())
|
||||
} else {
|
||||
Err(Error::with_msg_no_trace(format!("channel add failed")))
|
||||
}
|
||||
}
|
||||
None => {
|
||||
//info!("create new {addr} {channel_name}");
|
||||
drop(g);
|
||||
let addr = if let SocketAddr::V4(x) = addr {
|
||||
x
|
||||
} else {
|
||||
return Err(Error::with_msg_no_trace(format!("only ipv4 supported for IOC")));
|
||||
};
|
||||
// TODO use parameters:
|
||||
self.create_ca_conn(
|
||||
addr,
|
||||
ingest_commons.local_epics_hostname.clone(),
|
||||
512,
|
||||
200,
|
||||
ingest_commons.insert_item_queue.sender().await,
|
||||
ingest_commons.data_store.clone(),
|
||||
ingest_commons.clone(),
|
||||
vec![channel_name],
|
||||
)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn has_addr(&self, addr: &SocketAddr) -> bool {
|
||||
// TODO only used to check on add-channel whether we want to add channel to conn, or create new conn.
|
||||
// TODO must do that atomic.
|
||||
self.ca_conn_ress.lock().await.contains_key(addr)
|
||||
}
|
||||
|
||||
pub async fn addr_nth_mod(&self, n: usize) -> Option<SocketAddr> {
|
||||
let g = self.ca_conn_ress.lock().await;
|
||||
let u = g.len();
|
||||
let n = n % u;
|
||||
g.keys().take(n).last().map(Clone::clone)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct IngestCommons {
|
||||
pub pgconf: Arc<Database>,
|
||||
pub local_epics_hostname: String,
|
||||
@@ -620,6 +375,7 @@ pub struct IngestCommons {
|
||||
pub insert_frac: Arc<AtomicU64>,
|
||||
pub insert_ivl_min: Arc<AtomicU64>,
|
||||
pub extra_inserts_conf: Mutex<ExtraInsertsConf>,
|
||||
pub store_workers_rate: AtomicU64,
|
||||
pub ca_conn_set: CaConnSet,
|
||||
}
|
||||
|
||||
@@ -714,33 +470,6 @@ async fn query_addr_multiple(pg_client: &PgClient) -> Result<(), Error> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn metrics_agg_task(
|
||||
ingest_commons: Arc<IngestCommons>,
|
||||
local_stats: Arc<CaConnStats>,
|
||||
store_stats: Arc<CaConnStats>,
|
||||
) -> Result<(), Error> {
|
||||
let mut agg_last = CaConnStatsAgg::new();
|
||||
loop {
|
||||
tokio::time::sleep(Duration::from_millis(671)).await;
|
||||
let agg = CaConnStatsAgg::new();
|
||||
agg.push(&local_stats);
|
||||
agg.push(&store_stats);
|
||||
{
|
||||
let conn_stats_guard = ingest_commons.ca_conn_set.ca_conn_ress.lock().await;
|
||||
for (_, g) in conn_stats_guard.iter() {
|
||||
agg.push(&g.stats);
|
||||
}
|
||||
}
|
||||
let mut m = METRICS.lock().unwrap();
|
||||
*m = Some(agg.clone());
|
||||
if false {
|
||||
let diff = CaConnStatsAggDiff::diff_from(&agg_last, &agg);
|
||||
info!("{}", diff.display());
|
||||
}
|
||||
agg_last = agg;
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> {
|
||||
crate::linuxhelper::set_signal_handler()?;
|
||||
let insert_frac = Arc::new(AtomicU64::new(1000));
|
||||
@@ -791,6 +520,20 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> {
|
||||
let data_store = Arc::new(DataStore::new(&scyconf, pg_client.clone()).await?);
|
||||
let insert_item_queue = CommonInsertItemQueue::new(opts.insert_item_queue_cap);
|
||||
let insert_item_queue = Arc::new(insert_item_queue);
|
||||
|
||||
let ingest_commons = IngestCommons {
|
||||
pgconf: Arc::new(pgconf.clone()),
|
||||
local_epics_hostname: opts.local_epics_hostname.clone(),
|
||||
insert_item_queue: insert_item_queue.clone(),
|
||||
data_store: data_store.clone(),
|
||||
insert_ivl_min: insert_ivl_min.clone(),
|
||||
insert_frac: insert_frac.clone(),
|
||||
extra_inserts_conf,
|
||||
store_workers_rate: AtomicU64::new(opts.store_workers_rate),
|
||||
ca_conn_set: CaConnSet::new(),
|
||||
};
|
||||
let ingest_commons = Arc::new(ingest_commons);
|
||||
|
||||
// TODO use a new stats type:
|
||||
let store_stats = Arc::new(CaConnStats::new());
|
||||
let jh_insert_workers = spawn_scylla_insert_workers(
|
||||
@@ -799,23 +542,12 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> {
|
||||
opts.insert_worker_count,
|
||||
insert_item_queue.clone(),
|
||||
insert_frac.clone(),
|
||||
ingest_commons.clone(),
|
||||
pg_client.clone(),
|
||||
store_stats.clone(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
let ingest_commons = IngestCommons {
|
||||
pgconf: Arc::new(pgconf.clone()),
|
||||
local_epics_hostname: opts.local_epics_hostname.clone(),
|
||||
insert_item_queue: insert_item_queue.clone(),
|
||||
data_store: data_store.clone(),
|
||||
insert_ivl_min: insert_ivl_min.clone(),
|
||||
insert_frac,
|
||||
extra_inserts_conf,
|
||||
ca_conn_set: CaConnSet::new(),
|
||||
};
|
||||
let ingest_commons = Arc::new(ingest_commons);
|
||||
|
||||
if true {
|
||||
tokio::spawn(crate::metrics::start_metrics_service(
|
||||
opts.api_bind.clone(),
|
||||
@@ -858,18 +590,24 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> {
|
||||
if SIGINT.load(Ordering::Acquire) != 0 {
|
||||
break;
|
||||
}
|
||||
let addr = ingest_commons.ca_conn_set.addr_nth_mod(iper).await;
|
||||
if let Some(addr) = addr {
|
||||
fn cmdgen() -> (ConnCommand, async_channel::Receiver<bool>) {
|
||||
ConnCommand::check_channels_alive()
|
||||
// TODO remove magic number, make adaptive:
|
||||
if ingest_commons.insert_item_queue.receiver().len() < 10000 {
|
||||
let addr = ingest_commons.ca_conn_set.addr_nth_mod(iper).await;
|
||||
if let Some(addr) = addr {
|
||||
//info!("channel info for addr {addr}");
|
||||
fn cmdgen() -> (ConnCommand, async_channel::Receiver<bool>) {
|
||||
ConnCommand::check_channels_alive()
|
||||
}
|
||||
// TODO race between getting nth address and command send, so ignore error so far.
|
||||
let _res = ingest_commons.ca_conn_set.send_command_to_addr(&addr, cmdgen).await;
|
||||
let cmdgen = || ConnCommand::save_conn_info();
|
||||
// TODO race between getting nth address and command send, so ignore error so far.
|
||||
let _res = ingest_commons.ca_conn_set.send_command_to_addr(&addr, cmdgen).await;
|
||||
} else {
|
||||
//info!("nothing to save iper {iper}");
|
||||
}
|
||||
// TODO race between getting nth address and command send, so ignore error so far.
|
||||
let _res = ingest_commons.ca_conn_set.send_command_to_addr(&addr, cmdgen).await;
|
||||
let cmdgen = || ConnCommand::save_conn_info();
|
||||
// TODO race between getting nth address and command send, so ignore error so far.
|
||||
let _res = ingest_commons.ca_conn_set.send_command_to_addr(&addr, cmdgen).await;
|
||||
iper += 1;
|
||||
}
|
||||
iper += 1;
|
||||
tokio::time::sleep(Duration::from_millis(10)).await;
|
||||
}
|
||||
|
||||
|
||||
@@ -249,7 +249,7 @@ impl SubidStore {
|
||||
|
||||
fn info_store_msp_from_time(ts: SystemTime) -> u32 {
|
||||
let dt = ts.duration_since(SystemTime::UNIX_EPOCH).unwrap_or(Duration::ZERO);
|
||||
(dt.as_secs() / MIN * MIN / SEC) as u32
|
||||
(dt.as_secs() / 60 * 60) as u32
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -635,7 +635,7 @@ impl CaConn {
|
||||
self.insert_item_queue.push_back(item);
|
||||
} else {
|
||||
if warn_max < 10 {
|
||||
warn!("no series for cid {:?}", st.cid);
|
||||
debug!("no series for cid {:?}", st.cid);
|
||||
warn_max += 1;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,293 @@
|
||||
use super::conn::ConnCommand;
|
||||
use super::store::DataStore;
|
||||
use super::IngestCommons;
|
||||
use crate::ca::conn::CaConn;
|
||||
use crate::errconv::ErrConv;
|
||||
use crate::rt::{JoinHandle, TokMx};
|
||||
use crate::store::CommonInsertItemQueueSender;
|
||||
use async_channel::Sender;
|
||||
use err::Error;
|
||||
use futures_util::{FutureExt, StreamExt};
|
||||
use netpod::log::*;
|
||||
use stats::CaConnStats;
|
||||
use std::collections::{BTreeMap, VecDeque};
|
||||
use std::net::{SocketAddr, SocketAddrV4};
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
pub struct CommandQueueSet {
|
||||
queues: TokMx<BTreeMap<SocketAddrV4, Sender<ConnCommand>>>,
|
||||
}
|
||||
|
||||
impl CommandQueueSet {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
queues: TokMx::new(BTreeMap::<SocketAddrV4, Sender<ConnCommand>>::new()),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn queues(&self) -> &TokMx<BTreeMap<SocketAddrV4, Sender<ConnCommand>>> {
|
||||
&self.queues
|
||||
}
|
||||
|
||||
pub async fn queues_locked(&self) -> tokio::sync::MutexGuard<BTreeMap<SocketAddrV4, Sender<ConnCommand>>> {
|
||||
let mut g = self.queues.lock().await;
|
||||
let mut rm = Vec::new();
|
||||
for (k, v) in g.iter() {
|
||||
if v.is_closed() {
|
||||
rm.push(*k);
|
||||
}
|
||||
}
|
||||
for x in rm {
|
||||
g.remove(&x);
|
||||
}
|
||||
g
|
||||
}
|
||||
}
|
||||
|
||||
pub struct CaConnRess {
|
||||
sender: Sender<ConnCommand>,
|
||||
stats: Arc<CaConnStats>,
|
||||
jh: JoinHandle<Result<(), Error>>,
|
||||
}
|
||||
|
||||
impl CaConnRess {
|
||||
pub fn stats(&self) -> &Arc<CaConnStats> {
|
||||
&self.stats
|
||||
}
|
||||
}
|
||||
|
||||
// TODO
|
||||
// Resources belonging to the same CaConn also belong together here.
|
||||
// Only add or remove them from the set at once.
|
||||
// That means, they should go together.
|
||||
// Does not hold the actual CaConn, because that struct is in a task.
|
||||
// Always create the CaConn via a common code path which also takes care
|
||||
// to add it to the correct list.
|
||||
// There, make spawning part of this function?
|
||||
pub struct CaConnSet {
|
||||
ca_conn_ress: TokMx<BTreeMap<SocketAddr, CaConnRess>>,
|
||||
}
|
||||
|
||||
impl CaConnSet {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
ca_conn_ress: Default::default(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn ca_conn_ress(&self) -> &TokMx<BTreeMap<SocketAddr, CaConnRess>> {
|
||||
&self.ca_conn_ress
|
||||
}
|
||||
|
||||
pub async fn create_ca_conn(
|
||||
&self,
|
||||
addr: SocketAddrV4,
|
||||
local_epics_hostname: String,
|
||||
array_truncate: usize,
|
||||
insert_queue_max: usize,
|
||||
insert_item_queue_sender: CommonInsertItemQueueSender,
|
||||
data_store: Arc<DataStore>,
|
||||
ingest_commons: Arc<IngestCommons>,
|
||||
with_channels: Vec<String>,
|
||||
) -> Result<(), Error> {
|
||||
info!("create new CaConn {:?}", addr);
|
||||
let addr2 = SocketAddr::V4(addr.clone());
|
||||
let mut conn = CaConn::new(
|
||||
addr,
|
||||
local_epics_hostname,
|
||||
data_store.clone(),
|
||||
insert_item_queue_sender,
|
||||
array_truncate,
|
||||
insert_queue_max,
|
||||
ingest_commons,
|
||||
);
|
||||
for ch in with_channels {
|
||||
conn.channel_add(ch);
|
||||
}
|
||||
let conn = conn;
|
||||
let conn_tx = conn.conn_command_tx();
|
||||
let conn_stats = conn.stats();
|
||||
let conn_fut = async move {
|
||||
let stats = conn.stats();
|
||||
let mut conn = conn;
|
||||
while let Some(item) = conn.next().await {
|
||||
match item {
|
||||
Ok(_) => {
|
||||
stats.conn_item_count_inc();
|
||||
}
|
||||
Err(e) => {
|
||||
error!("CaConn gives error: {e:?}");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok::<_, Error>(())
|
||||
};
|
||||
let jh = tokio::spawn(conn_fut);
|
||||
let ca_conn_ress = CaConnRess {
|
||||
sender: conn_tx,
|
||||
stats: conn_stats,
|
||||
jh,
|
||||
};
|
||||
self.ca_conn_ress.lock().await.insert(addr2, ca_conn_ress);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn send_command_to_all<F, R>(&self, cmdgen: F) -> Result<Vec<R>, Error>
|
||||
where
|
||||
F: Fn() -> (ConnCommand, async_channel::Receiver<R>),
|
||||
{
|
||||
//let it = self.ca_conn_ress.iter().map(|x| x);
|
||||
//Self::send_command_inner(it, move || cmd.clone());
|
||||
let mut rxs = Vec::new();
|
||||
for (_addr, ress) in &*self.ca_conn_ress.lock().await {
|
||||
let (cmd, rx) = cmdgen();
|
||||
match ress.sender.send(cmd).await {
|
||||
Ok(()) => {
|
||||
rxs.push(rx);
|
||||
}
|
||||
Err(e) => {
|
||||
error!("can not send command {e:?}");
|
||||
}
|
||||
}
|
||||
}
|
||||
let mut res = Vec::new();
|
||||
for rx in rxs {
|
||||
let x = rx.recv().await?;
|
||||
res.push(x);
|
||||
}
|
||||
Ok(res)
|
||||
}
|
||||
|
||||
pub async fn send_command_to_addr<F, R>(&self, addr: &SocketAddr, cmdgen: F) -> Result<R, Error>
|
||||
where
|
||||
F: Fn() -> (ConnCommand, async_channel::Receiver<R>),
|
||||
{
|
||||
if let Some(ress) = self.ca_conn_ress.lock().await.get(addr) {
|
||||
let (cmd, rx) = cmdgen();
|
||||
ress.sender.send(cmd).await.err_conv()?;
|
||||
let ret = rx.recv().await.err_conv()?;
|
||||
Ok(ret)
|
||||
} else {
|
||||
Err(Error::with_msg_no_trace(format!("addr not found")))
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(unused)]
|
||||
async fn send_command_inner<'a, IT, F, R>(it: &mut IT, cmdgen: F) -> Vec<async_channel::Receiver<R>>
|
||||
where
|
||||
IT: Iterator<Item = (&'a SocketAddrV4, &'a async_channel::Sender<ConnCommand>)>,
|
||||
F: Fn() -> (ConnCommand, async_channel::Receiver<R>),
|
||||
{
|
||||
let mut rxs = Vec::new();
|
||||
for (_, tx) in it {
|
||||
let (cmd, rx) = cmdgen();
|
||||
match tx.send(cmd).await {
|
||||
Ok(()) => {
|
||||
rxs.push(rx);
|
||||
}
|
||||
Err(e) => {
|
||||
error!("can not send command {e:?}");
|
||||
}
|
||||
}
|
||||
}
|
||||
rxs
|
||||
}
|
||||
|
||||
pub async fn send_stop(&self) -> Result<(), Error> {
|
||||
self.send_command_to_all(|| ConnCommand::shutdown()).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn wait_stopped(&self) -> Result<(), Error> {
|
||||
let mut g = self.ca_conn_ress.lock().await;
|
||||
let mm = std::mem::replace(&mut *g, BTreeMap::new());
|
||||
let mut jhs: VecDeque<_> = VecDeque::new();
|
||||
for t in mm {
|
||||
jhs.push_back(t.1.jh.fuse());
|
||||
}
|
||||
loop {
|
||||
let mut jh = if let Some(x) = jhs.pop_front() {
|
||||
x
|
||||
} else {
|
||||
break;
|
||||
};
|
||||
futures_util::select! {
|
||||
a = jh => match a {
|
||||
Ok(k) => match k {
|
||||
Ok(_) => {}
|
||||
Err(e) => {
|
||||
error!("{e:?}");
|
||||
}
|
||||
},
|
||||
Err(e) => {
|
||||
error!("{e:?}");
|
||||
}
|
||||
},
|
||||
_b = crate::rt::sleep(Duration::from_millis(1000)).fuse() => {
|
||||
jhs.push_back(jh);
|
||||
info!("waiting for {} connections", jhs.len());
|
||||
}
|
||||
};
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn add_channel_to_addr(
|
||||
&self,
|
||||
addr: SocketAddr,
|
||||
channel_name: String,
|
||||
ingest_commons: Arc<IngestCommons>,
|
||||
) -> Result<(), Error> {
|
||||
let g = self.ca_conn_ress.lock().await;
|
||||
match g.get(&addr) {
|
||||
Some(ca_conn) => {
|
||||
//info!("try to add to existing... {addr} {channel_name}");
|
||||
let (cmd, rx) = ConnCommand::channel_add(channel_name);
|
||||
ca_conn.sender.send(cmd).await.err_conv()?;
|
||||
let a = rx.recv().await.err_conv()?;
|
||||
if a {
|
||||
Ok(())
|
||||
} else {
|
||||
Err(Error::with_msg_no_trace(format!("channel add failed")))
|
||||
}
|
||||
}
|
||||
None => {
|
||||
//info!("create new {addr} {channel_name}");
|
||||
drop(g);
|
||||
let addr = if let SocketAddr::V4(x) = addr {
|
||||
x
|
||||
} else {
|
||||
return Err(Error::with_msg_no_trace(format!("only ipv4 supported for IOC")));
|
||||
};
|
||||
// TODO use parameters:
|
||||
self.create_ca_conn(
|
||||
addr,
|
||||
ingest_commons.local_epics_hostname.clone(),
|
||||
512,
|
||||
200,
|
||||
ingest_commons.insert_item_queue.sender().await,
|
||||
ingest_commons.data_store.clone(),
|
||||
ingest_commons.clone(),
|
||||
vec![channel_name],
|
||||
)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn has_addr(&self, addr: &SocketAddr) -> bool {
|
||||
// TODO only used to check on add-channel whether we want to add channel to conn, or create new conn.
|
||||
// TODO must do that atomic.
|
||||
self.ca_conn_ress.lock().await.contains_key(addr)
|
||||
}
|
||||
|
||||
pub async fn addr_nth_mod(&self, n: usize) -> Option<SocketAddr> {
|
||||
let g = self.ca_conn_ress.lock().await;
|
||||
let u = g.len();
|
||||
let n = n % u;
|
||||
g.keys().take(n).last().map(Clone::clone)
|
||||
}
|
||||
}
|
||||
@@ -171,7 +171,7 @@ impl DataStore {
|
||||
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
|
||||
let qu_insert_channel_status_by_ts_msp = Arc::new(q);
|
||||
let q = scy
|
||||
.prepare("insert into channel_ping (ts_msp, series, ivl, interest, evsize) values (?, ?, ?, ?, ?)")
|
||||
.prepare("insert into channel_ping (part, ts_msp, series, ivl, interest, evsize) values (?, ?, ?, ?, ?, ?)")
|
||||
.await
|
||||
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
|
||||
let qu_insert_channel_ping = Arc::new(q);
|
||||
|
||||
+34
-22
@@ -1,39 +1,20 @@
|
||||
use crate::ca::conn::ConnCommand;
|
||||
use crate::ca::{ExtraInsertsConf, IngestCommons};
|
||||
use crate::ca::{ExtraInsertsConf, IngestCommons, METRICS};
|
||||
use axum::extract::Query;
|
||||
use err::Error;
|
||||
use http::request::Parts;
|
||||
use log::*;
|
||||
use stats::{CaConnStats, CaConnStatsAgg, CaConnStatsAggDiff};
|
||||
use std::collections::HashMap;
|
||||
use std::net::{SocketAddr, SocketAddrV4};
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
async fn get_empty() -> String {
|
||||
String::new()
|
||||
}
|
||||
|
||||
#[allow(unused)]
|
||||
async fn send_command<'a, IT, F, R>(it: &mut IT, cmdgen: F) -> Vec<async_channel::Receiver<R>>
|
||||
where
|
||||
IT: Iterator<Item = (&'a SocketAddrV4, &'a async_channel::Sender<ConnCommand>)>,
|
||||
F: Fn() -> (ConnCommand, async_channel::Receiver<R>),
|
||||
{
|
||||
let mut rxs = Vec::new();
|
||||
for (_, tx) in it {
|
||||
let (cmd, rx) = cmdgen();
|
||||
match tx.send(cmd).await {
|
||||
Ok(()) => {
|
||||
rxs.push(rx);
|
||||
}
|
||||
Err(e) => {
|
||||
error!("can not send command {e:?}");
|
||||
}
|
||||
}
|
||||
}
|
||||
rxs
|
||||
}
|
||||
|
||||
async fn find_channel(
|
||||
params: HashMap<String, String>,
|
||||
ingest_commons: Arc<IngestCommons>,
|
||||
@@ -276,3 +257,34 @@ pub async fn start_metrics_service(bind_to: String, ingest_commons: Arc<IngestCo
|
||||
.await
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
pub async fn metrics_agg_task(
|
||||
ingest_commons: Arc<IngestCommons>,
|
||||
local_stats: Arc<CaConnStats>,
|
||||
store_stats: Arc<CaConnStats>,
|
||||
) -> Result<(), Error> {
|
||||
let mut agg_last = CaConnStatsAgg::new();
|
||||
loop {
|
||||
tokio::time::sleep(Duration::from_millis(671)).await;
|
||||
let agg = CaConnStatsAgg::new();
|
||||
agg.push(&local_stats);
|
||||
agg.push(&store_stats);
|
||||
{
|
||||
let conn_stats_guard = ingest_commons.ca_conn_set.ca_conn_ress().lock().await;
|
||||
for (_, g) in conn_stats_guard.iter() {
|
||||
agg.push(g.stats());
|
||||
}
|
||||
}
|
||||
{
|
||||
let val = ingest_commons.insert_item_queue.receiver().len() as u64;
|
||||
agg.store_worker_recv_queue_len.store(val, Ordering::Release);
|
||||
}
|
||||
let mut m = METRICS.lock().unwrap();
|
||||
*m = Some(agg.clone());
|
||||
if false {
|
||||
let diff = CaConnStatsAggDiff::diff_from(&agg_last, &agg);
|
||||
info!("{}", diff.display());
|
||||
}
|
||||
agg_last = agg;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,6 +5,7 @@ pub mod errconv;
|
||||
pub mod linuxhelper;
|
||||
pub mod metrics;
|
||||
pub mod netbuf;
|
||||
pub mod rt;
|
||||
pub mod series;
|
||||
pub mod store;
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -0,0 +1,3 @@
|
||||
pub use tokio::sync::Mutex as TokMx;
|
||||
pub use tokio::task::JoinHandle;
|
||||
pub use tokio::time::sleep;
|
||||
@@ -441,6 +441,9 @@ pub async fn zmtp_client(opts: ZmtpClientOpts) -> Result<(), Error> {
|
||||
.await
|
||||
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
|
||||
let scy = Arc::new(scy);
|
||||
|
||||
error!("TODO redo the pulse mapping");
|
||||
err::todo();
|
||||
let qu1 = scy
|
||||
.prepare("insert into pulse_pkey (a, pulse_a) values (?, ?)")
|
||||
.await
|
||||
|
||||
@@ -152,6 +152,7 @@ stats_proc::stats_struct!((
|
||||
inserts_queue_push,
|
||||
inserts_queue_drop,
|
||||
channel_fast_item_drop,
|
||||
store_worker_recv_queue_len,
|
||||
store_worker_item_recv,
|
||||
// TODO rename to make clear that this drop is voluntary because of user config choice:
|
||||
store_worker_item_drop,
|
||||
@@ -160,6 +161,11 @@ stats_proc::stats_struct!((
|
||||
store_worker_insert_timeout,
|
||||
store_worker_insert_unavailable,
|
||||
store_worker_insert_error,
|
||||
connection_status_insert_done,
|
||||
channel_status_insert_done,
|
||||
channel_info_insert_done,
|
||||
ivl_insert_done,
|
||||
mute_insert_done,
|
||||
caconn_poll_count,
|
||||
caconn_loop1_count,
|
||||
caconn_loop2_count,
|
||||
|
||||
Reference in New Issue
Block a user