Factor out the connection collection

This commit is contained in:
Dominik Werder
2022-09-23 16:43:17 +02:00
parent b64ad0f81e
commit 7a0b234573
4 changed files with 430 additions and 372 deletions

View File

@@ -8,11 +8,10 @@ use self::store::DataStore;
use crate::ca::conn::ConnCommand;
use crate::errconv::ErrConv;
use crate::linuxhelper::local_hostname;
use crate::store::{CommonInsertItemQueue, QueryItem};
use crate::store::{CommonInsertItemQueue, CommonInsertItemQueueSender, QueryItem};
use async_channel::Sender;
use conn::CaConn;
use err::Error;
use futures_util::future::Fuse;
use futures_util::stream::FuturesUnordered;
use futures_util::{FutureExt, StreamExt};
use log::*;
@@ -20,7 +19,7 @@ use netpod::{Database, ScyllaConfig};
use serde::{Deserialize, Serialize};
use stats::{CaConnStats, CaConnStatsAgg, CaConnStatsAggDiff};
use std::collections::{BTreeMap, VecDeque};
use std::net::{IpAddr, Ipv4Addr, SocketAddrV4};
use std::net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4};
use std::path::PathBuf;
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
@@ -339,6 +338,241 @@ impl CommandQueueSet {
}
}
struct CaConnRess {
sender: Sender<ConnCommand>,
stats: Arc<CaConnStats>,
jh: JoinHandle<Result<(), Error>>,
}
// TODO
// Resources belonging to the same CaConn also belong together here.
// Only add or remove them from the set at once.
// That means, they should go together.
// Does not hold the actual CaConn, because that struct is in a task.
// Always create the CaConn via a common code path which also takes care
// to add it to the correct list.
// There, make spawning part of this function?
pub struct CaConnSet {
ca_conn_ress: TokMx<BTreeMap<SocketAddr, CaConnRess>>,
}
impl CaConnSet {
pub fn new() -> Self {
Self {
ca_conn_ress: Default::default(),
}
}
pub async fn create_ca_conn(
&self,
addr: SocketAddrV4,
local_epics_hostname: String,
array_truncate: usize,
insert_queue_max: usize,
insert_item_queue_sender: CommonInsertItemQueueSender,
data_store: Arc<DataStore>,
ingest_commons: Arc<IngestCommons>,
with_channels: Vec<String>,
) -> Result<(), Error> {
info!("create new CaConn {:?}", addr);
let addr2 = SocketAddr::V4(addr.clone());
let mut conn = CaConn::new(
addr,
local_epics_hostname,
data_store.clone(),
insert_item_queue_sender,
array_truncate,
insert_queue_max,
ingest_commons,
);
for ch in with_channels {
conn.channel_add(ch);
}
let conn = conn;
let conn_tx = conn.conn_command_tx();
let conn_stats = conn.stats();
let conn_fut = async move {
let stats = conn.stats();
let mut conn = conn;
while let Some(item) = conn.next().await {
match item {
Ok(_) => {
stats.conn_item_count_inc();
}
Err(e) => {
error!("CaConn gives error: {e:?}");
break;
}
}
}
Ok::<_, Error>(())
};
let jh = tokio::spawn(conn_fut);
let ca_conn_ress = CaConnRess {
sender: conn_tx,
stats: conn_stats,
jh,
};
self.ca_conn_ress.lock().await.insert(addr2, ca_conn_ress);
Ok(())
}
pub async fn send_command_to_all<F, R>(&self, cmdgen: F) -> Result<Vec<R>, Error>
where
F: Fn() -> (ConnCommand, async_channel::Receiver<R>),
{
//let it = self.ca_conn_ress.iter().map(|x| x);
//Self::send_command_inner(it, move || cmd.clone());
let mut rxs = Vec::new();
for (_addr, ress) in &*self.ca_conn_ress.lock().await {
let (cmd, rx) = cmdgen();
match ress.sender.send(cmd).await {
Ok(()) => {
rxs.push(rx);
}
Err(e) => {
error!("can not send command {e:?}");
}
}
}
let mut res = Vec::new();
for rx in rxs {
let x = rx.recv().await?;
res.push(x);
}
Ok(res)
}
pub async fn send_command_to_addr<F, R>(&self, addr: &SocketAddr, cmdgen: F) -> Result<R, Error>
where
F: Fn() -> (ConnCommand, async_channel::Receiver<R>),
{
if let Some(ress) = self.ca_conn_ress.lock().await.get(addr) {
let (cmd, rx) = cmdgen();
ress.sender.send(cmd).await.err_conv()?;
let ret = rx.recv().await.err_conv()?;
Ok(ret)
} else {
Err(Error::with_msg_no_trace(format!("addr not found")))
}
}
#[allow(unused)]
async fn send_command_inner<'a, IT, F, R>(it: &mut IT, cmdgen: F) -> Vec<async_channel::Receiver<R>>
where
IT: Iterator<Item = (&'a SocketAddrV4, &'a async_channel::Sender<ConnCommand>)>,
F: Fn() -> (ConnCommand, async_channel::Receiver<R>),
{
let mut rxs = Vec::new();
for (_, tx) in it {
let (cmd, rx) = cmdgen();
match tx.send(cmd).await {
Ok(()) => {
rxs.push(rx);
}
Err(e) => {
error!("can not send command {e:?}");
}
}
}
rxs
}
pub async fn send_stop(&self) -> Result<(), Error> {
self.send_command_to_all(|| ConnCommand::shutdown()).await?;
Ok(())
}
pub async fn wait_stopped(&self) -> Result<(), Error> {
let mut g = self.ca_conn_ress.lock().await;
let mm = std::mem::replace(&mut *g, BTreeMap::new());
let mut jhs: VecDeque<_> = VecDeque::new();
for t in mm {
jhs.push_back(t.1.jh.fuse());
}
loop {
let mut jh = if let Some(x) = jhs.pop_front() {
x
} else {
break;
};
futures_util::select! {
a = jh => match a {
Ok(k) => match k {
Ok(_) => {}
Err(e) => {
error!("{e:?}");
}
},
Err(e) => {
error!("{e:?}");
}
},
_b = tokio::time::sleep(Duration::from_millis(1000)).fuse() => {
jhs.push_back(jh);
info!("waiting for {} connections", jhs.len());
}
};
}
err::todoval()
}
pub async fn add_channel_to_addr(
&self,
addr: SocketAddr,
channel_name: String,
ingest_commons: Arc<IngestCommons>,
) -> Result<(), Error> {
let g = self.ca_conn_ress.lock().await;
match g.get(&addr) {
Some(ca_conn) => {
let (cmd, rx) = ConnCommand::channel_add(channel_name);
ca_conn.sender.send(cmd).await.err_conv()?;
let a = rx.recv().await.err_conv()?;
if a {
Ok(())
} else {
Err(Error::with_msg_no_trace(format!("channel add failed")))
}
}
None => {
let addr = if let SocketAddr::V4(x) = addr {
x
} else {
return Err(Error::with_msg_no_trace(format!("only ipv4 supported for IOC")));
};
// TODO use parameters:
self.create_ca_conn(
addr,
ingest_commons.local_epics_hostname.clone(),
512,
200,
ingest_commons.insert_item_queue.sender(),
ingest_commons.data_store.clone(),
ingest_commons.clone(),
vec![channel_name],
)
.await?;
Ok(())
}
}
}
pub async fn has_addr(&self, addr: &SocketAddr) -> bool {
// TODO only used to check on add-channel whether we want to add channel to conn, or create new conn.
// TODO must do that atomic.
self.ca_conn_ress.lock().await.contains_key(addr)
}
pub async fn add_channel_or_create_conn() -> Result<(), Error> {
// TODO fix race:
// Must not drop mutex in-between calls.
// Pass mutex on?
Ok(())
}
}
pub struct IngestCommons {
pub pgconf: Arc<Database>,
pub local_epics_hostname: String,
@@ -347,8 +581,7 @@ pub struct IngestCommons {
pub insert_frac: Arc<AtomicU64>,
pub insert_ivl_min: Arc<AtomicU64>,
pub extra_inserts_conf: Mutex<ExtraInsertsConf>,
pub conn_stats: Arc<TokMx<Vec<Arc<CaConnStats>>>>,
pub command_queue_set: Arc<CommandQueueSet>,
pub ca_conn_set: CaConnSet,
}
pub async fn find_channel_addr(
@@ -365,7 +598,10 @@ pub async fn find_channel_addr(
.await
.unwrap();
// TODO allow clean shutdown on ctrl-c and join the pg_conn in the end:
tokio::spawn(pg_conn);
tokio::spawn(async {
pg_conn.await.unwrap();
info!("drop pg conn after find_channel_addr");
});
let pg_client = Arc::new(pg_client);
let qu_find_addr = pg_client
.prepare(
@@ -396,54 +632,74 @@ pub async fn find_channel_addr(
}
}
pub async fn create_ca_conn(
addr: SocketAddrV4,
local_epics_hostname: String,
array_truncate: usize,
insert_queue_max: usize,
insert_item_queue: Arc<CommonInsertItemQueue>,
data_store: Arc<DataStore>,
insert_ivl_min: Arc<AtomicU64>,
conn_stats: Arc<TokMx<Vec<Arc<CaConnStats>>>>,
command_queue_set: Arc<CommandQueueSet>,
ingest_commons: Arc<IngestCommons>,
) -> Result<JoinHandle<Result<(), Error>>, Error> {
info!("create new CaConn {:?}", addr);
let data_store = data_store.clone();
let conn = CaConn::new(
addr,
local_epics_hostname,
data_store.clone(),
insert_item_queue.sender(),
array_truncate,
insert_queue_max,
insert_ivl_min.clone(),
ingest_commons.clone(),
);
conn_stats.lock().await.push(conn.stats());
let stats2 = conn.stats();
let conn_command_tx = conn.conn_command_tx();
{
//command_queue_set.queues().lock().await.insert(addr, conn_command_tx);
command_queue_set.queues_locked().await.insert(addr, conn_command_tx);
#[allow(unused)]
async fn query_addr_multiple(pg_client: &PgClient) -> Result<(), Error> {
let backend: &String = err::todoval();
// TODO factor the find loop into a separate Stream.
let qu_find_addr = pg_client
.prepare("with q1 as (select t1.facility, t1.channel, t1.addr from ioc_by_channel t1 where t1.facility = $1 and t1.channel in ($2, $3, $4, $5, $6, $7, $8, $9) and t1.addr != '' order by t1.tsmod desc) select distinct on (q1.facility, q1.channel) q1.facility, q1.channel, q1.addr from q1")
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
let mut chns_todo: &[String] = err::todoval();
let mut chstmp = ["__NONE__"; 8];
for (s1, s2) in chns_todo.iter().zip(chstmp.iter_mut()) {
*s2 = s1;
}
let conn_block = async move {
let mut conn = conn;
while let Some(item) = conn.next().await {
match item {
Ok(_) => {
stats2.conn_item_count_inc();
}
chns_todo = &chns_todo[chstmp.len().min(chns_todo.len())..];
let rows = pg_client
.query(
&qu_find_addr,
&[
&backend, &chstmp[0], &chstmp[1], &chstmp[2], &chstmp[3], &chstmp[4], &chstmp[5], &chstmp[6],
&chstmp[7],
],
)
.await
.map_err(|e| Error::with_msg_no_trace(format!("pg lookup error: {e:?}")))?;
for row in rows {
let ch: &str = row.get(1);
let addr: &str = row.get(2);
if addr == "" {
// TODO the address was searched before but could not be found.
} else {
let addr: SocketAddrV4 = match addr.parse() {
Ok(k) => k,
Err(e) => {
error!("CaConn gives error: {e:?}");
break;
error!("can not parse {addr:?} for channel {ch:?} {e:?}");
continue;
}
};
let _ = addr;
}
}
Ok(())
}
async fn metrics_agg_task(
ingest_commons: Arc<IngestCommons>,
local_stats: Arc<CaConnStats>,
store_stats: Arc<CaConnStats>,
) -> Result<(), Error> {
let mut agg_last = CaConnStatsAgg::new();
loop {
tokio::time::sleep(Duration::from_millis(671)).await;
let agg = CaConnStatsAgg::new();
agg.push(&local_stats);
agg.push(&store_stats);
{
let conn_stats_guard = ingest_commons.ca_conn_set.ca_conn_ress.lock().await;
for (_, g) in conn_stats_guard.iter() {
agg.push(&g.stats);
}
}
Ok::<_, Error>(())
};
let jh = tokio::spawn(conn_block);
Ok(jh)
let mut m = METRICS.lock().unwrap();
*m = Some(agg.clone());
if false {
let diff = CaConnStatsAggDiff::diff_from(&agg_last, &agg);
info!("{}", diff.display());
}
agg_last = agg;
}
}
pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> {
@@ -473,13 +729,8 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> {
tokio::spawn(pg_conn);
let pg_client = Arc::new(pg_client);
// TODO use new struct:
// TODO use a new type:
let local_stats = Arc::new(CaConnStats::new());
// TODO factor the find loop into a separate Stream.
let qu_find_addr = pg_client
.prepare("with q1 as (select t1.facility, t1.channel, t1.addr from ioc_by_channel t1 where t1.facility = $1 and t1.channel in ($2, $3, $4, $5, $6, $7, $8, $9) and t1.addr != '' order by t1.tsmod desc) select distinct on (q1.facility, q1.channel) q1.facility, q1.channel, q1.addr from q1")
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
// Fetch all addresses for all channels.
let rows = pg_client
@@ -501,7 +752,7 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> {
let data_store = Arc::new(DataStore::new(&scyconf, pg_client.clone()).await?);
let insert_item_queue = CommonInsertItemQueue::new(opts.insert_item_queue_cap);
let insert_item_queue = Arc::new(insert_item_queue);
// TODO use a new stats struct
// TODO use a new stats type:
let store_stats = Arc::new(CaConnStats::new());
let jh_insert_workers = spawn_scylla_insert_workers(
opts.scyconf.clone(),
@@ -514,20 +765,15 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> {
)
.await?;
let mut conn_jhs = vec![];
let conn_stats: Arc<TokMx<Vec<Arc<CaConnStats>>>> = Arc::new(TokMx::new(Vec::new()));
let command_queue_set = Arc::new(CommandQueueSet::new());
let ingest_commons = IngestCommons {
pgconf: Arc::new(pgconf.clone()),
local_epics_hostname: opts.local_epics_hostname.clone(),
insert_item_queue: insert_item_queue.clone(),
data_store: data_store.clone(),
insert_ivl_min: insert_ivl_min.clone(),
conn_stats: conn_stats.clone(),
command_queue_set: command_queue_set.clone(),
insert_frac,
extra_inserts_conf,
ca_conn_set: CaConnSet::new(),
};
let ingest_commons = Arc::new(ingest_commons);
@@ -538,159 +784,35 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> {
));
}
let metrics_agg_fut = {
let conn_stats = conn_stats.clone();
let local_stats = local_stats.clone();
async move {
let mut agg_last = CaConnStatsAgg::new();
loop {
tokio::time::sleep(Duration::from_millis(671)).await;
let agg = CaConnStatsAgg::new();
agg.push(&local_stats);
agg.push(&store_stats);
for g in conn_stats.lock().await.iter() {
agg.push(&g);
}
let mut m = METRICS.lock().unwrap();
*m = Some(agg.clone());
if false {
let diff = CaConnStatsAggDiff::diff_from(&agg_last, &agg);
info!("{}", diff.display());
}
agg_last = agg;
if false {
break;
}
}
}
};
let metrics_agg_fut = metrics_agg_task(ingest_commons.clone(), local_stats.clone(), store_stats.clone());
let metrics_agg_jh = tokio::spawn(metrics_agg_fut);
let mut chns_todo = &opts.channels[..];
let mut chstmp = ["__NONE__"; 8];
let mut ix = 0;
while chns_todo.len() > 0 && SIGINT.load(Ordering::Acquire) == 0 {
if false {
for (s1, s2) in chns_todo.iter().zip(chstmp.iter_mut()) {
*s2 = s1;
}
chns_todo = &chns_todo[chstmp.len().min(chns_todo.len())..];
let rows = pg_client
.query(
&qu_find_addr,
&[
&opts.backend,
&chstmp[0],
&chstmp[1],
&chstmp[2],
&chstmp[3],
&chstmp[4],
&chstmp[5],
&chstmp[6],
&chstmp[7],
],
)
.await
.map_err(|e| Error::with_msg_no_trace(format!("pg lookup error: {e:?}")))?;
for row in rows {
let ch: &str = row.get(1);
let addr: &str = row.get(2);
if addr == "" {
// TODO the address was searched before but could not be found.
} else {
let addr: SocketAddrV4 = match addr.parse() {
Ok(k) => k,
Err(e) => {
error!("can not parse {addr:?} for channel {ch:?} {e:?}");
continue;
}
};
let _ = addr;
}
}
for ch in chns_todo {
if SIGINT.load(Ordering::Acquire) != 0 {
break;
}
if let Some(ch) = chns_todo.first() {
let ch = ch.clone();
chns_todo = &chns_todo[1..];
if let Some(addr) = phonebook.get(&ch) {
if !channels_by_host.contains_key(&addr) {
channels_by_host.insert(addr, vec![ch.to_string()]);
} else {
channels_by_host.get_mut(&addr).unwrap().push(ch.to_string());
}
let create_new = {
let g = command_queue_set.queues_locked().await;
if let Some(tx) = g.get(&addr) {
let (cmd, rx) = ConnCommand::channel_add(ch.to_string());
tx.send(cmd).await.unwrap();
if !rx.recv().await.unwrap() {
error!("Could not add channel: {}", ch);
}
false
} else {
true
}
};
if create_new {
info!("create new CaConn {:?} {:?}", addr, ch);
let data_store = data_store.clone();
let conn = CaConn::new(
addr.clone(),
opts.local_epics_hostname.clone(),
data_store.clone(),
insert_item_queue.sender(),
opts.array_truncate,
opts.insert_queue_max,
insert_ivl_min.clone(),
ingest_commons.clone(),
);
conn_stats.lock().await.push(conn.stats());
let stats2 = conn.stats();
let conn_command_tx = conn.conn_command_tx();
let tx = conn_command_tx.clone();
{
command_queue_set
.queues_locked()
.await
.insert(addr.clone(), conn_command_tx);
}
let conn_block = async move {
let mut conn = conn;
while let Some(item) = conn.next().await {
match item {
Ok(_) => {
stats2.conn_item_count_inc();
}
Err(e) => {
error!("CaConn gives error: {e:?}");
break;
}
}
}
Ok::<_, Error>(())
};
let jh = tokio::spawn(conn_block);
conn_jhs.push(jh);
{
let (cmd, rx) = ConnCommand::channel_add(ch.to_string());
tx.send(cmd).await.unwrap();
if !rx.recv().await.unwrap() {
error!("Could not add channel: {}", ch);
}
}
}
}
ix += 1;
if ix % 1000 == 0 {
info!("{} of {} {}", ix, opts.channels.len(), ch);
let ch = ch.to_string();
chns_todo = &chns_todo[1..];
if let Some(addr) = phonebook.get(&ch) {
if !channels_by_host.contains_key(&addr) {
channels_by_host.insert(addr, vec![ch.to_string()]);
} else {
channels_by_host.get_mut(&addr).unwrap().push(ch.to_string());
}
ingest_commons
.ca_conn_set
.add_channel_to_addr(SocketAddr::V4(addr.clone()), ch.clone(), ingest_commons.clone())
.await?;
}
ix += 1;
if ix % 1000 == 0 {
info!("{} of {} {}", ix, opts.channels.len(), ch);
}
}
info!("channels_by_host len {}", channels_by_host.len());
let mut conn_jhs: VecDeque<Fuse<JoinHandle<Result<(), Error>>>> =
conn_jhs.into_iter().map(|jh| jh.fuse()).collect();
let mut sent_stop_commands = false;
loop {
if SIGINT.load(Ordering::Acquire) != 0 {
if false {
@@ -699,41 +821,13 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> {
let rc = receiver.receiver_count();
info!("item queue senders {} receivers {}", sc, rc);
}
if !sent_stop_commands {
sent_stop_commands = true;
info!("sending stop command");
let queues = command_queue_set.queues_locked().await;
for q in queues.iter() {
let (cmd, _rx) = ConnCommand::shutdown();
let _ = q.1.send(cmd).await;
}
}
}
let mut jh = if let Some(x) = conn_jhs.pop_front() {
x
} else {
info!("sending stop commands");
ingest_commons.ca_conn_set.send_stop().await?;
break;
};
futures_util::select! {
a = jh => match a {
Ok(k) => match k {
Ok(_) => {}
Err(e) => {
error!("{e:?}");
}
},
Err(e) => {
error!("{e:?}");
}
},
_b = tokio::time::sleep(Duration::from_millis(1000)).fuse() => {
conn_jhs.push_back(jh);
if sent_stop_commands {
info!("waiting for {} connections", conn_jhs.len());
}
}
};
}
tokio::time::sleep(Duration::from_millis(400)).await;
}
ingest_commons.ca_conn_set.wait_stopped().await?;
info!("all connections done.");
drop(ingest_commons);

View File

@@ -22,7 +22,7 @@ use std::collections::{BTreeMap, VecDeque};
use std::net::SocketAddrV4;
use std::ops::ControlFlow;
use std::pin::Pin;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::{Duration, Instant, SystemTime};
@@ -350,7 +350,7 @@ pub struct CaConn {
array_truncate: usize,
stats: Arc<CaConnStats>,
insert_queue_max: usize,
insert_ivl_min: Arc<AtomicU64>,
insert_ivl_min_mus: u64,
ts_channel_alive_check_last: Instant,
conn_command_tx: async_channel::Sender<ConnCommand>,
conn_command_rx: async_channel::Receiver<ConnCommand>,
@@ -370,7 +370,6 @@ impl CaConn {
insert_item_sender: CommonInsertItemQueueSender,
array_truncate: usize,
insert_queue_max: usize,
insert_ivl_min: Arc<AtomicU64>,
ingest_commons: Arc<IngestCommons>,
) -> Self {
let (cq_tx, cq_rx) = async_channel::bounded(32);
@@ -395,7 +394,7 @@ impl CaConn {
array_truncate,
stats: Arc::new(CaConnStats::new()),
insert_queue_max,
insert_ivl_min,
insert_ivl_min_mus: 1000 * 1000 * 6,
ts_channel_alive_check_last: Instant::now(),
conn_command_tx: cq_tx,
conn_command_rx: cq_rx,
@@ -529,8 +528,12 @@ impl CaConn {
}
pub fn channel_add(&mut self, channel: String) {
if self.cid_by_name.contains_key(&channel) {
return;
}
let cid = self.cid_by_name(&channel);
if self.channels.contains_key(&cid) {
error!("logic error");
} else {
self.channels.insert(cid, ChannelState::Init);
// TODO do not count, use separate queue for those channels.
@@ -827,7 +830,7 @@ impl CaConn {
ev: proto::EventAddRes,
tsnow: Instant,
item_queue: &mut VecDeque<QueryItem>,
insert_ivl_min: Arc<AtomicU64>,
insert_ivl_min_mus: u64,
stats: Arc<CaConnStats>,
inserts_counter: &mut u64,
extra_inserts_conf: &ExtraInsertsConf,
@@ -836,8 +839,7 @@ impl CaConn {
st.insert_item_ivl_ema.tick(tsnow);
let em = st.insert_item_ivl_ema.ema();
let ema = em.ema();
let ivl_min = insert_ivl_min.load(Ordering::Acquire);
let ivl_min = (ivl_min as f32) * 1e-6;
let ivl_min = (insert_ivl_min_mus as f32) * 1e-6;
let dt = (ivl_min - ema).max(0.) / em.k();
st.insert_next_earliest = tsnow
.checked_add(Duration::from_micros((dt * 1e6) as u64))
@@ -852,14 +854,14 @@ impl CaConn {
} else {
None
};
for &(m, l) in extra_inserts_conf.copies.iter() {
for (i, &(m, l)) in extra_inserts_conf.copies.iter().enumerate().rev() {
if *inserts_counter % m == l {
Self::event_add_insert(
st,
series.clone(),
scalar_type.clone(),
shape.clone(),
ts - 2,
ts - 1 - i as u64,
ev.clone(),
item_queue,
ts_msp_last,
@@ -953,7 +955,7 @@ impl CaConn {
ev,
tsnow,
item_queue,
self.insert_ivl_min.clone(),
self.insert_ivl_min_mus,
self.stats.clone(),
inserts_counter,
extra_inserts_conf,

View File

@@ -1,3 +1,4 @@
use async_channel::{RecvError, SendError};
use err::Error;
use scylla::transport::errors::QueryError;
use scylla::transport::query_result::{FirstRowError, RowsExpectedError};
@@ -6,6 +7,24 @@ pub trait ErrConv<T> {
fn err_conv(self) -> Result<T, Error>;
}
impl<T, H> ErrConv<T> for Result<T, SendError<H>> {
fn err_conv(self) -> Result<T, Error> {
match self {
Ok(k) => Ok(k),
Err(e) => Err(Error::with_msg_no_trace(format!("{e:?}"))),
}
}
}
impl<T> ErrConv<T> for Result<T, RecvError> {
fn err_conv(self) -> Result<T, Error> {
match self {
Ok(k) => Ok(k),
Err(e) => Err(Error::with_msg_no_trace(format!("{e:?}"))),
}
}
}
impl<T> ErrConv<T> for Result<T, QueryError> {
fn err_conv(self) -> Result<T, Error> {
match self {

View File

@@ -1,10 +1,11 @@
use crate::ca::conn::ConnCommand;
use crate::ca::{ExtraInsertsConf, IngestCommons};
use axum::extract::Query;
use err::Error;
use http::request::Parts;
use log::*;
use std::collections::HashMap;
use std::net::SocketAddrV4;
use std::net::{SocketAddr, SocketAddrV4};
use std::sync::atomic::Ordering;
use std::sync::Arc;
@@ -12,6 +13,7 @@ async fn get_empty() -> String {
String::new()
}
#[allow(unused)]
async fn send_command<'a, IT, F, R>(it: &mut IT, cmdgen: F) -> Vec<async_channel::Receiver<R>>
where
IT: Iterator<Item = (&'a SocketAddrV4, &'a async_channel::Sender<ConnCommand>)>,
@@ -37,86 +39,44 @@ async fn find_channel(
ingest_commons: Arc<IngestCommons>,
) -> axum::Json<Vec<(String, Vec<String>)>> {
let pattern = params.get("pattern").map_or(String::new(), |x| x.clone()).to_string();
let g = ingest_commons.command_queue_set.queues_locked().await;
let mut it = g.iter();
let rxs = send_command(&mut it, || ConnCommand::find_channel(pattern.clone())).await;
let mut res = Vec::new();
for rx in rxs {
let item = rx.recv().await.unwrap();
if item.1.len() > 0 {
let item = (item.0.to_string(), item.1);
res.push(item);
}
}
// TODO allow usage of `?` in handler:
let res = ingest_commons
.ca_conn_set
.send_command_to_all(|| ConnCommand::find_channel(pattern.clone()))
.await
.unwrap();
let res = res.into_iter().map(|x| (x.0.to_string(), x.1)).collect();
axum::Json(res)
}
async fn channel_add(params: HashMap<String, String>, ingest_commons: Arc<IngestCommons>) -> String {
async fn channel_add_inner(params: HashMap<String, String>, ingest_commons: Arc<IngestCommons>) -> Result<(), Error> {
if let (Some(backend), Some(name)) = (params.get("backend"), params.get("name")) {
// TODO look up the address.
match crate::ca::find_channel_addr(backend.into(), name.into(), &ingest_commons.pgconf).await {
Ok(Some(addr)) => {
if ingest_commons
.command_queue_set
.queues_locked()
.await
.contains_key(&addr)
{
} else {
match crate::ca::create_ca_conn(
addr,
ingest_commons.local_epics_hostname.clone(),
256,
32,
ingest_commons.insert_item_queue.clone(),
ingest_commons.data_store.clone(),
ingest_commons.insert_ivl_min.clone(),
ingest_commons.conn_stats.clone(),
ingest_commons.command_queue_set.clone(),
ingest_commons.clone(),
)
.await
{
Ok(_) => {
// TODO keep the join handle.
}
Err(_) => {
error!("can not create CaConn");
}
}
}
if let Some(tx) = ingest_commons.command_queue_set.queues_locked().await.get(&addr) {
let (cmd, rx) = ConnCommand::channel_add(name.into());
if let Err(_) = tx.send(cmd).await {
error!("can not send command");
"false".into()
} else {
match rx.recv().await {
Ok(x) => {
if x {
"true".into()
} else {
"false".into()
}
}
Err(_) => "false".into(),
}
}
} else {
error!("Even after create, can not locate the connection.");
"false".into()
}
ingest_commons
.ca_conn_set
.add_channel_to_addr(SocketAddr::V4(addr), name.into(), ingest_commons.clone())
.await?;
Ok(())
}
_ => {
error!("can not find addr for channel");
"false".into()
Err(Error::with_msg_no_trace(format!("can not find addr for channel")))
}
}
} else {
"false".into()
Err(Error::with_msg_no_trace(format!("wrong parameters given")))
}
}
async fn channel_add(params: HashMap<String, String>, ingest_commons: Arc<IngestCommons>) -> axum::Json<bool> {
let ret = match channel_add_inner(params, ingest_commons).await {
Ok(_) => true,
Err(_) => false,
};
axum::Json(ret)
}
async fn channel_remove(
params: HashMap<String, String>,
ingest_commons: Arc<IngestCommons>,
@@ -132,7 +92,7 @@ async fn channel_remove(
} else {
return Json(Value::Bool(false));
};
let backend = if let Some(x) = params.get("backend") {
let _backend = if let Some(x) = params.get("backend") {
x
} else {
return Json(Value::Bool(false));
@@ -142,87 +102,70 @@ async fn channel_remove(
} else {
return Json(Value::Bool(false));
};
if let Some(tx) = ingest_commons.command_queue_set.queues_locked().await.get(&addr) {
// TODO any need to check the backend here?
let _ = backend;
let (cmd, rx) = ConnCommand::channel_remove(name.into());
if let Err(_) = tx.send(cmd).await {
error!("can not send command");
match ingest_commons
.ca_conn_set
.send_command_to_addr(&SocketAddr::V4(addr), || ConnCommand::channel_remove(name.into()))
.await
{
Ok(k) => Json(Value::Bool(k)),
Err(e) => {
error!("{e:?}");
Json(Value::Bool(false))
} else {
match rx.recv().await {
Ok(x) => Json(Value::Bool(x)),
Err(_) => Json(Value::Bool(false)),
}
}
} else {
Json(Value::Bool(false))
}
}
async fn channel_state(params: HashMap<String, String>, ingest_commons: Arc<IngestCommons>) -> String {
let name = params.get("name").map_or(String::new(), |x| x.clone()).to_string();
let g = ingest_commons.command_queue_set.queues_locked().await;
let mut rxs = Vec::new();
for (_, tx) in g.iter() {
let (cmd, rx) = ConnCommand::channel_state(name.clone());
match tx.send(cmd).await {
Ok(()) => {
rxs.push(rx);
}
Err(e) => {
error!("can not send command {e:?}");
}
match ingest_commons
.ca_conn_set
.send_command_to_all(|| ConnCommand::channel_state(name.clone()))
.await
{
Ok(k) => {
let a: Vec<_> = k.into_iter().map(|(a, b)| (a.to_string(), b)).collect();
serde_json::to_string(&a).unwrap()
}
Err(e) => {
error!("{e:?}");
return format!("null");
}
}
let mut res = Vec::new();
for rx in rxs {
let item = rx.recv().await.unwrap();
if let Some(st) = item.1 {
let item = (item.0.to_string(), st);
res.push(item);
}
}
serde_json::to_string(&res).unwrap()
}
async fn channel_states(
_params: HashMap<String, String>,
ingest_commons: Arc<IngestCommons>,
) -> axum::Json<Vec<crate::ca::conn::ChannelStateInfo>> {
let g = ingest_commons.command_queue_set.queues_locked().await;
let mut rxs = Vec::new();
for (_, tx) in g.iter() {
let (cmd, rx) = ConnCommand::channel_states_all();
match tx.send(cmd).await {
Ok(()) => {
rxs.push(rx);
}
Err(e) => {
error!("can not send command {e:?}");
}
}
}
let vals = ingest_commons
.ca_conn_set
.send_command_to_all(|| ConnCommand::channel_states_all())
.await
.unwrap();
let mut res = Vec::new();
for rx in rxs {
let item = rx.recv().await.unwrap();
for h in item.1 {
res.push(h);
for h in vals {
for j in h.1 {
res.push(j);
}
}
res.sort_unstable_by_key(|v| u32::MAX - v.interest_score as u32);
//let res: Vec<_> = res.into_iter().rev().take(10).collect();
let res = if true {
res.into_iter().rev().take(10).collect()
} else {
res
};
axum::Json(res)
}
async fn extra_inserts_conf_set(v: ExtraInsertsConf, ingest_commons: Arc<IngestCommons>) -> axum::Json<bool> {
// TODO ingest_commons is the authorative value. Should have common function outside of this metrics which
// can update everything to a given value.
*ingest_commons.extra_inserts_conf.lock().unwrap() = v.clone();
let g = ingest_commons.command_queue_set.queues_locked().await;
let mut it = g.iter();
let rxs = send_command(&mut it, || ConnCommand::extra_inserts_conf_set(v.clone())).await;
for rx in rxs {
let _item = rx.recv().await.unwrap();
}
ingest_commons
.ca_conn_set
.send_command_to_all(|| ConnCommand::extra_inserts_conf_set(v.clone()))
.await
.unwrap();
axum::Json(true)
}