Lookup series ids via batched query
This commit is contained in:
+35
-63
@@ -298,7 +298,7 @@ pub struct Daemon {
|
||||
impl Daemon {
|
||||
pub async fn new(opts: DaemonOpts) -> Result<Self, Error> {
|
||||
let pg_client = Arc::new(make_pg_client(&opts.pgconf).await?);
|
||||
let datastore = DataStore::new(&opts.scyconf, pg_client.clone()).await?;
|
||||
let datastore = DataStore::new(&opts.scyconf).await?;
|
||||
let datastore = Arc::new(datastore);
|
||||
let (tx, rx) = async_channel::bounded(32);
|
||||
let pgcs = {
|
||||
@@ -310,6 +310,9 @@ impl Daemon {
|
||||
a
|
||||
};
|
||||
let (search_tx, ioc_finder_jh) = Self::start_finder(tx.clone(), opts.backend().into(), pgcs);
|
||||
|
||||
let channel_info_query_tx = netfetch::batchquery::series_by_channel::start_task(&opts.pgconf).await?;
|
||||
|
||||
let common_insert_item_queue = Arc::new(CommonInsertItemQueue::new(opts.insert_item_queue_cap));
|
||||
let common_insert_item_queue_2 = Arc::new(CommonInsertItemQueue::new(opts.insert_item_queue_cap));
|
||||
let insert_queue_counter = Arc::new(AtomicUsize::new(0));
|
||||
@@ -388,7 +391,7 @@ impl Daemon {
|
||||
extra_inserts_conf: tokio::sync::Mutex::new(ExtraInsertsConf::new()),
|
||||
store_workers_rate: AtomicU64::new(20000),
|
||||
insert_frac: AtomicU64::new(1000),
|
||||
ca_conn_set: CaConnSet::new(),
|
||||
ca_conn_set: CaConnSet::new(channel_info_query_tx),
|
||||
};
|
||||
let ingest_commons = Arc::new(ingest_commons);
|
||||
|
||||
@@ -491,57 +494,20 @@ impl Daemon {
|
||||
}
|
||||
let (qtx, qrx) = async_channel::bounded(CURRENT_SEARCH_PENDING_MAX);
|
||||
let fut = async move {
|
||||
let (batch_tx, batch_rx) = async_channel::bounded(SEARCH_DB_PIPELINE_LEN);
|
||||
let fut2 = async move {
|
||||
let mut batch_ix = 0 as usize;
|
||||
let mut all = Vec::new();
|
||||
let mut do_emit = false;
|
||||
loop {
|
||||
if do_emit {
|
||||
do_emit = false;
|
||||
let batch = std::mem::replace(&mut all, Vec::new());
|
||||
let n = batch.len();
|
||||
trace_batch!("--- BATCH TRY SEND");
|
||||
match batch_tx.send((batch_ix, batch)).await {
|
||||
Ok(_) => {
|
||||
trace_batch!("--- BATCH SEND DONE");
|
||||
batch_ix += 1;
|
||||
SEARCH_REQ_BATCH_SEND_COUNT.fetch_add(n, atomic::Ordering::AcqRel);
|
||||
}
|
||||
Err(e) => {
|
||||
error!("can not send batch");
|
||||
all = e.0 .1;
|
||||
}
|
||||
}
|
||||
}
|
||||
match tokio::time::timeout(Duration::from_millis(200), qrx.recv()).await {
|
||||
Ok(k) => match k {
|
||||
Ok(item) => {
|
||||
SEARCH_REQ_RECV_COUNT.fetch_add(1, atomic::Ordering::AcqRel);
|
||||
all.push(item);
|
||||
if all.len() >= SEARCH_BATCH_MAX {
|
||||
do_emit = true;
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
error!("error in batcher, no more input {e}");
|
||||
break;
|
||||
}
|
||||
},
|
||||
Err(e) => {
|
||||
let _e: tokio::time::error::Elapsed = e;
|
||||
if all.len() > 0 {
|
||||
do_emit = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
warn!("-------- batcher is done --------------");
|
||||
};
|
||||
tokio::spawn(fut2);
|
||||
let (batch_rx, _jh) = netfetch::batcher::batch(
|
||||
SEARCH_BATCH_MAX,
|
||||
Duration::from_millis(200),
|
||||
SEARCH_DB_PIPELINE_LEN,
|
||||
qrx,
|
||||
);
|
||||
let (pgc_tx, pgc_rx) = async_channel::bounded(128);
|
||||
for pgc in pgcs {
|
||||
let sql = "with q1 as (select * from unnest($2::text[]) as unn (ch)) select distinct on (tt.facility, tt.channel) tt.channel, tt.addr from ioc_by_channel_log tt join q1 on tt.channel = q1.ch and tt.facility = $1 and tt.addr is not null order by tt.facility, tt.channel, tsmod desc";
|
||||
let sql = concat!(
|
||||
"with q1 as (select * from unnest($2::text[]) as unn (ch))",
|
||||
" select distinct on (tt.facility, tt.channel) tt.channel, tt.addr",
|
||||
" from ioc_by_channel_log tt join q1 on tt.channel = q1.ch and tt.facility = $1 and tt.addr is not null",
|
||||
" order by tt.facility, tt.channel, tsmod desc",
|
||||
);
|
||||
let qu_select_multi = pgc.prepare(sql).await.unwrap();
|
||||
let qu_select_multi = Arc::new(qu_select_multi);
|
||||
match pgc_tx.send((pgc, qu_select_multi)).await {
|
||||
@@ -553,7 +519,7 @@ impl Daemon {
|
||||
}
|
||||
let backend = Arc::new(backend.clone());
|
||||
let stream = batch_rx
|
||||
.map(|(batch_ix, batch): (usize, Vec<String>)| {
|
||||
.map(|batch: Vec<String>| {
|
||||
let pgc_tx = pgc_tx.clone();
|
||||
let pgc_rx = pgc_rx.clone();
|
||||
let backend = backend.clone();
|
||||
@@ -561,12 +527,11 @@ impl Daemon {
|
||||
async move {
|
||||
let ts1 = Instant::now();
|
||||
let (pgc, qu_select_multi) = pgc_rx.recv().await.unwrap();
|
||||
debug_batch!("run query batch {} len {}", batch_ix, batch.len());
|
||||
debug_batch!("run query batch len {}", batch.len());
|
||||
let qres = pgc.query(qu_select_multi.as_ref(), &[backend.as_ref(), &batch]).await;
|
||||
let dt = ts1.elapsed();
|
||||
debug_batch!(
|
||||
"done query batch {} len {}: {} {:.3}ms",
|
||||
batch_ix,
|
||||
"done query batch len {}: {} {:.3}ms",
|
||||
batch.len(),
|
||||
qres.is_ok(),
|
||||
dt.as_secs_f32() * 1e3
|
||||
@@ -582,16 +547,16 @@ impl Daemon {
|
||||
out.push('\'');
|
||||
}
|
||||
out.push(']');
|
||||
eprintln!("VERY LONG QUERY batch_ix {batch_ix}\n{out}");
|
||||
eprintln!("VERY SLOW QUERY\n{out}");
|
||||
}
|
||||
pgc_tx.send((pgc, qu_select_multi)).await.unwrap();
|
||||
(batch_ix, batch, qres)
|
||||
(batch, qres)
|
||||
}
|
||||
})
|
||||
.buffer_unordered(SEARCH_DB_PIPELINE_LEN);
|
||||
let mut resdiff = 0;
|
||||
let mut stream = Box::pin(stream);
|
||||
while let Some((batch_ix, batch, pgres)) = stream.next().await {
|
||||
while let Some((batch, pgres)) = stream.next().await {
|
||||
match pgres {
|
||||
Ok(rows) => {
|
||||
if rows.len() > batch.len() {
|
||||
@@ -624,12 +589,9 @@ impl Daemon {
|
||||
error!("STILL NOT MATCHING LEN");
|
||||
}
|
||||
SEARCH_RES_3_COUNT.fetch_add(items.len(), atomic::Ordering::AcqRel);
|
||||
debug_batch!("TRY SEND batch_ix {batch_ix}");
|
||||
let x = tx.send(DaemonEvent::SearchDone(Ok(items))).await;
|
||||
match x {
|
||||
Ok(_) => {
|
||||
debug_batch!("DONE SEND batch_ix {batch_ix}");
|
||||
}
|
||||
Ok(_) => {}
|
||||
Err(e) => {
|
||||
error!("finder sees: {e}");
|
||||
break;
|
||||
@@ -1088,7 +1050,8 @@ impl Daemon {
|
||||
ActiveChannelState::SearchPending { since: _, did_send: _ } => {}
|
||||
ActiveChannelState::WithAddress { addr, state: _ } => {
|
||||
if addr == &conn_addr {
|
||||
info!("ca conn down, reset {k:?}");
|
||||
// TODO reset channel, emit log event for the connection addr only
|
||||
//info!("ca conn down, reset {k:?}");
|
||||
*v = ChannelState {
|
||||
value: ChannelStateValue::Active(ActiveChannelState::UnknownAddress {
|
||||
since: SystemTime::now(),
|
||||
@@ -1222,6 +1185,15 @@ pub async fn run(opts: CaIngestOpts, channels: Vec<String>) -> Result<(), Error>
|
||||
info!("start up {opts:?}");
|
||||
netfetch::linuxhelper::set_signal_handler(libc::SIGINT, handler_sigint)?;
|
||||
netfetch::linuxhelper::set_signal_handler(libc::SIGTERM, handler_sigterm)?;
|
||||
|
||||
let dcom = Arc::new(netfetch::metrics::DaemonComm::dummy());
|
||||
netfetch::metrics::start_metrics_service(opts.api_bind(), dcom);
|
||||
|
||||
// TODO use a new stats type:
|
||||
let store_stats = Arc::new(CaConnStats::new());
|
||||
let metrics_agg_fut = metrics_agg_task(ingest_commons.clone(), local_stats.clone(), store_stats.clone());
|
||||
let metrics_agg_jh = tokio::spawn(metrics_agg_fut);
|
||||
|
||||
let opts2 = DaemonOpts {
|
||||
backend: opts.backend().into(),
|
||||
local_epics_hostname: opts.local_epics_hostname().into(),
|
||||
|
||||
@@ -0,0 +1,54 @@
|
||||
use async_channel::Receiver;
|
||||
use netpod::log::*;
|
||||
use std::time::Duration;
|
||||
|
||||
pub fn batch<T>(
|
||||
batch_limit: usize,
|
||||
timeout: Duration,
|
||||
outcap: usize,
|
||||
rx: Receiver<T>,
|
||||
) -> (Receiver<Vec<T>>, tokio::task::JoinHandle<()>)
|
||||
where
|
||||
T: Send + 'static,
|
||||
{
|
||||
let (batch_tx, batch_rx) = async_channel::bounded(outcap);
|
||||
let fut2 = async move {
|
||||
let mut all = Vec::new();
|
||||
let mut do_emit = false;
|
||||
loop {
|
||||
if do_emit {
|
||||
do_emit = false;
|
||||
let batch = std::mem::replace(&mut all, Vec::new());
|
||||
match batch_tx.send(batch).await {
|
||||
Ok(_) => {}
|
||||
Err(e) => {
|
||||
error!("can not send batch");
|
||||
all = e.0;
|
||||
}
|
||||
}
|
||||
}
|
||||
match tokio::time::timeout(timeout, rx.recv()).await {
|
||||
Ok(k) => match k {
|
||||
Ok(item) => {
|
||||
all.push(item);
|
||||
if all.len() >= batch_limit {
|
||||
do_emit = true;
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
error!("error in batcher, no more input {e}");
|
||||
break;
|
||||
}
|
||||
},
|
||||
Err(e) => {
|
||||
let _e: tokio::time::error::Elapsed = e;
|
||||
if all.len() > 0 {
|
||||
do_emit = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
warn!("-------- batcher is done --------------");
|
||||
};
|
||||
(batch_rx, tokio::spawn(fut2))
|
||||
}
|
||||
@@ -0,0 +1 @@
|
||||
pub mod series_by_channel;
|
||||
@@ -0,0 +1,190 @@
|
||||
use crate::batcher;
|
||||
use crate::dbpg::make_pg_client;
|
||||
use crate::series::Existence;
|
||||
use crate::series::SeriesId;
|
||||
use async_channel::Receiver;
|
||||
use async_channel::Sender;
|
||||
use err::Error;
|
||||
use futures_util::StreamExt;
|
||||
use netpod::log::*;
|
||||
use netpod::Database;
|
||||
use std::time::Duration;
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio_postgres::Client as PgClient;
|
||||
use tokio_postgres::Statement as PgStatement;
|
||||
|
||||
pub struct ChannelInfoQuery {
|
||||
pub backend: String,
|
||||
pub channel: String,
|
||||
pub scalar_type: i32,
|
||||
pub shape_dims: Vec<i32>,
|
||||
pub tx: Sender<Result<Existence<SeriesId>, Error>>,
|
||||
}
|
||||
|
||||
impl ChannelInfoQuery {
|
||||
pub fn dummy(&self) -> Self {
|
||||
Self {
|
||||
backend: String::new(),
|
||||
channel: String::new(),
|
||||
scalar_type: 4242,
|
||||
shape_dims: Vec::new(),
|
||||
tx: self.tx.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct ChannelInfoResult {
|
||||
series: Vec<Existence<SeriesId>>,
|
||||
tx: Vec<Sender<Result<Existence<SeriesId>, Error>>>,
|
||||
}
|
||||
|
||||
struct PgRes {
|
||||
pgc: PgClient,
|
||||
st: PgStatement,
|
||||
}
|
||||
|
||||
async fn prepare_pgcs(sql: &str, pgcn: usize, db: &Database) -> Result<(Sender<PgRes>, Receiver<PgRes>), Error> {
|
||||
let (pgc_tx, pgc_rx) = async_channel::bounded(pgcn);
|
||||
for _ in 0..pgcn {
|
||||
let pgc = make_pg_client(&db).await?;
|
||||
let st = pgc.prepare(sql).await.map_err(|e| Error::from(e.to_string()))?;
|
||||
let k = PgRes { pgc, st };
|
||||
match pgc_tx.send(k).await {
|
||||
Ok(_) => {}
|
||||
Err(e) => {
|
||||
error!("can not enqueue pgc {e}");
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok((pgc_tx, pgc_rx))
|
||||
}
|
||||
|
||||
async fn fetch_data(pgres: PgRes) -> Result<ChannelInfoResult, Error> {
|
||||
err::todoval()
|
||||
}
|
||||
|
||||
async fn run_queries(
|
||||
npg: usize,
|
||||
batch_rx: Receiver<Vec<ChannelInfoQuery>>,
|
||||
pgc_rx: Receiver<PgRes>,
|
||||
pgc_tx: Sender<PgRes>,
|
||||
) -> Result<(), Error> {
|
||||
let mut stream = batch_rx
|
||||
.map(|batch| {
|
||||
let pgc_rx = pgc_rx.clone();
|
||||
let pgc_tx = pgc_tx.clone();
|
||||
async move {
|
||||
if let Ok(pgres) = pgc_rx.recv().await {
|
||||
let mut backend = Vec::new();
|
||||
let mut channel = Vec::new();
|
||||
let mut scalar_type = Vec::new();
|
||||
let mut shape_dims: Vec<String> = Vec::new();
|
||||
let mut rid = Vec::new();
|
||||
let mut tx = Vec::new();
|
||||
for (i, e) in batch.into_iter().enumerate() {
|
||||
backend.push(e.backend);
|
||||
channel.push(e.channel);
|
||||
scalar_type.push(e.scalar_type);
|
||||
let mut dims = String::with_capacity(16);
|
||||
dims.push('{');
|
||||
for (i, v) in e.shape_dims.into_iter().enumerate() {
|
||||
if i > 0 {
|
||||
dims.push(',');
|
||||
}
|
||||
use std::fmt::Write;
|
||||
write!(dims, "{}", v).unwrap();
|
||||
}
|
||||
dims.push('}');
|
||||
shape_dims.push(dims);
|
||||
rid.push(i as i32);
|
||||
tx.push((i as u32, e.tx));
|
||||
}
|
||||
match pgres
|
||||
.pgc
|
||||
.query(&pgres.st, &[&backend, &channel, &scalar_type, &shape_dims, &rid])
|
||||
.await
|
||||
.map_err(|e| {
|
||||
error!("{e}");
|
||||
Error::from(e.to_string())
|
||||
}) {
|
||||
Ok(rows) => {
|
||||
if pgc_tx.send(pgres).await.is_err() {
|
||||
Err(Error::with_msg_no_trace("can not hand pgres back"))
|
||||
} else {
|
||||
let mut series_ids = Vec::new();
|
||||
let mut txs = Vec::new();
|
||||
let mut it1 = rows.into_iter();
|
||||
let mut e1 = it1.next();
|
||||
for (qrid, tx) in tx {
|
||||
if let Some(row) = &e1 {
|
||||
let rid: i32 = row.get(1);
|
||||
if rid as u32 == qrid {
|
||||
let series: i64 = row.get(0);
|
||||
let series = SeriesId::new(series as _);
|
||||
series_ids.push(Existence::Existing(series));
|
||||
txs.push(tx);
|
||||
}
|
||||
e1 = it1.next();
|
||||
}
|
||||
}
|
||||
let result = ChannelInfoResult {
|
||||
series: series_ids,
|
||||
tx: txs,
|
||||
};
|
||||
Ok(result)
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
error!("error in pg query {e}");
|
||||
tokio::time::sleep(Duration::from_millis(2000)).await;
|
||||
Err(e)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
error!("can not get pgc");
|
||||
Err(Error::with_msg_no_trace("no more pgres"))
|
||||
}
|
||||
}
|
||||
})
|
||||
.buffer_unordered(npg);
|
||||
while let Some(item) = stream.next().await {
|
||||
match item {
|
||||
Ok(res) => {
|
||||
for (sid, tx) in res.series.into_iter().zip(res.tx) {
|
||||
match tx.send(Ok(sid)).await {
|
||||
Ok(_) => {}
|
||||
Err(_) => {
|
||||
// TODO count cases, but no log. Client may no longer be interested in this result.
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
error!("{e}");
|
||||
tokio::time::sleep(Duration::from_millis(1000)).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn start_task(db: &Database) -> Result<Sender<ChannelInfoQuery>, Error> {
|
||||
let sql = concat!(
|
||||
"with q1 as (select * from unnest($1::text[], $2::text[], $3::int[], $4::text[], $5::int[])",
|
||||
" as inp (backend, channel, scalar_type, shape_dims, rid))",
|
||||
" select t.series, q1.rid from series_by_channel t",
|
||||
" join q1 on t.facility = q1.backend and t.channel = q1.channel",
|
||||
" and t.scalar_type = q1.scalar_type and t.shape_dims = q1.shape_dims::int[]",
|
||||
" and t.agg_kind = 0",
|
||||
" order by q1.rid",
|
||||
);
|
||||
let inp_cap = 128;
|
||||
let batch_out_cap = 4;
|
||||
let pgcn = 4;
|
||||
let timeout = Duration::from_millis(200);
|
||||
let (pgc_tx, pgc_rx) = prepare_pgcs(sql, pgcn, db).await?;
|
||||
let (query_tx, query_rx) = async_channel::bounded(inp_cap);
|
||||
let (batch_rx, _batch_jh) = batcher::batch(inp_cap, timeout, batch_out_cap, query_rx);
|
||||
let _queries_jh: JoinHandle<_> = tokio::task::spawn(run_queries(pgcn, batch_rx, pgc_rx, pgc_tx));
|
||||
Ok(query_tx)
|
||||
}
|
||||
+1
-204
@@ -7,22 +7,16 @@ pub mod store;
|
||||
|
||||
use self::store::DataStore;
|
||||
use crate::ca::connset::CaConnSet;
|
||||
use crate::conf::CaIngestOpts;
|
||||
use crate::errconv::ErrConv;
|
||||
use crate::insertworker::spawn_scylla_insert_workers;
|
||||
use crate::metrics::metrics_agg_task;
|
||||
use crate::metrics::ExtraInsertsConf;
|
||||
use crate::rt::TokMx;
|
||||
use crate::store::CommonInsertItemQueue;
|
||||
use err::Error;
|
||||
use futures_util::stream::FuturesUnordered;
|
||||
use futures_util::Future;
|
||||
use futures_util::{FutureExt, StreamExt};
|
||||
use futures_util::FutureExt;
|
||||
use log::*;
|
||||
use netpod::Database;
|
||||
use stats::CaConnStats;
|
||||
use stats::CaConnStatsAgg;
|
||||
use std::collections::BTreeMap;
|
||||
use std::net::SocketAddrV4;
|
||||
use std::pin::Pin;
|
||||
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
|
||||
@@ -233,200 +227,3 @@ fn handler_sigaction(_a: libc::c_int, _b: *const libc::siginfo_t, _c: *const lib
|
||||
crate::ca::SIGINT.store(1, Ordering::Release);
|
||||
let _ = crate::linuxhelper::unset_signal_handler(libc::SIGINT);
|
||||
}
|
||||
|
||||
pub async fn ca_connect(opts: CaIngestOpts, channels: &Vec<String>) -> Result<(), Error> {
|
||||
crate::linuxhelper::set_signal_handler(libc::SIGINT, handler_sigaction)?;
|
||||
let extra_inserts_conf = TokMx::new(ExtraInsertsConf { copies: Vec::new() });
|
||||
let insert_ivl_min = Arc::new(AtomicU64::new(8800));
|
||||
let scyconf = opts.scylla().clone();
|
||||
let pgconf = opts.postgresql().clone();
|
||||
let d = &pgconf;
|
||||
let (pg_client, pg_conn) = tokio_postgres::connect(
|
||||
&format!("postgresql://{}:{}@{}:{}/{}", d.user, d.pass, d.host, d.port, d.name),
|
||||
tokio_postgres::tls::NoTls,
|
||||
)
|
||||
.await
|
||||
.err_conv()?;
|
||||
// TODO allow clean shutdown on ctrl-c and join the pg_conn in the end:
|
||||
tokio::spawn(pg_conn);
|
||||
let pg_client = Arc::new(pg_client);
|
||||
|
||||
// TODO use a new type:
|
||||
let local_stats = Arc::new(CaConnStats::new());
|
||||
|
||||
info!("fetch phonebook begin");
|
||||
// Fetch all addresses for all channels.
|
||||
let rows = pg_client
|
||||
.query(
|
||||
"select distinct on (facility, channel) channel, addr from ioc_by_channel_log where channel is not null and addr is not null order by facility, channel, tsmod desc",
|
||||
&[],
|
||||
)
|
||||
.await
|
||||
.err_conv()?;
|
||||
let mut phonebook = BTreeMap::new();
|
||||
for row in rows {
|
||||
let channel: String = row.get(0);
|
||||
let addr: String = row.get(1);
|
||||
let addr: SocketAddrV4 = addr
|
||||
.parse()
|
||||
.map_err(|_| Error::with_msg_no_trace(format!("can not parse address {addr}")))?;
|
||||
phonebook.insert(channel, addr);
|
||||
}
|
||||
info!("fetch phonebook done");
|
||||
|
||||
let mut channels_by_host = BTreeMap::new();
|
||||
|
||||
let data_store = Arc::new(DataStore::new(&scyconf, pg_client.clone()).await?);
|
||||
let insert_item_queue = CommonInsertItemQueue::new(opts.insert_item_queue_cap());
|
||||
let insert_item_queue = Arc::new(insert_item_queue);
|
||||
|
||||
let ingest_commons = IngestCommons {
|
||||
pgconf: Arc::new(pgconf.clone()),
|
||||
backend: opts.backend().into(),
|
||||
local_epics_hostname: opts.local_epics_hostname().clone(),
|
||||
insert_item_queue: insert_item_queue.clone(),
|
||||
data_store: data_store.clone(),
|
||||
insert_ivl_min: insert_ivl_min.clone(),
|
||||
extra_inserts_conf,
|
||||
store_workers_rate: AtomicU64::new(opts.store_workers_rate()),
|
||||
insert_frac: AtomicU64::new(opts.insert_frac()),
|
||||
ca_conn_set: CaConnSet::new(),
|
||||
};
|
||||
let ingest_commons = Arc::new(ingest_commons);
|
||||
|
||||
tokio::spawn({
|
||||
let rx = ingest_commons.ca_conn_set.conn_item_rx();
|
||||
async move { while let Ok(_item) = rx.recv().await {} }
|
||||
});
|
||||
|
||||
// TODO use a new stats type:
|
||||
let store_stats = Arc::new(CaConnStats::new());
|
||||
let ttls = crate::insertworker::Ttls {
|
||||
index: opts.ttl_index(),
|
||||
d0: opts.ttl_d0(),
|
||||
d1: opts.ttl_d1(),
|
||||
};
|
||||
let jh_insert_workers = spawn_scylla_insert_workers(
|
||||
opts.scylla().clone(),
|
||||
opts.insert_scylla_sessions(),
|
||||
opts.insert_worker_count(),
|
||||
insert_item_queue.clone(),
|
||||
ingest_commons.clone(),
|
||||
pg_client.clone(),
|
||||
store_stats.clone(),
|
||||
opts.use_rate_limit_queue(),
|
||||
ttls,
|
||||
)
|
||||
.await?;
|
||||
|
||||
if true {
|
||||
tokio::spawn(crate::metrics::start_metrics_service(
|
||||
opts.api_bind().clone(),
|
||||
ingest_commons.clone(),
|
||||
));
|
||||
}
|
||||
|
||||
let metrics_agg_fut = metrics_agg_task(ingest_commons.clone(), local_stats.clone(), store_stats.clone());
|
||||
let metrics_agg_jh = tokio::spawn(metrics_agg_fut);
|
||||
|
||||
let mut chns_todo = &channels[..];
|
||||
let mut ix = 0;
|
||||
for ch in chns_todo {
|
||||
if SIGINT.load(Ordering::Acquire) != 0 {
|
||||
break;
|
||||
}
|
||||
let ch = ch.to_string();
|
||||
chns_todo = &chns_todo[1..];
|
||||
if let Some(addr) = phonebook.get(&ch) {
|
||||
if !channels_by_host.contains_key(&addr) {
|
||||
channels_by_host.insert(addr, vec![ch.to_string()]);
|
||||
} else {
|
||||
channels_by_host.get_mut(&addr).unwrap().push(ch.to_string());
|
||||
}
|
||||
ingest_commons
|
||||
.ca_conn_set
|
||||
.add_channel_to_addr(
|
||||
opts.backend().into(),
|
||||
*addr,
|
||||
ch.clone(),
|
||||
&ingest_commons.insert_item_queue,
|
||||
&ingest_commons.data_store,
|
||||
opts.insert_queue_max(),
|
||||
opts.array_truncate(),
|
||||
opts.local_epics_hostname(),
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
ix += 1;
|
||||
if ix % 1000 == 0 {
|
||||
info!("{} of {} {}", ix, channels.len(), ch);
|
||||
}
|
||||
}
|
||||
info!("channels_by_host len {}", channels_by_host.len());
|
||||
|
||||
loop {
|
||||
if SIGINT.load(Ordering::Acquire) != 0 {
|
||||
if false {
|
||||
let receiver = insert_item_queue.receiver();
|
||||
let sc = receiver.sender_count();
|
||||
let rc = receiver.receiver_count();
|
||||
info!("item queue senders {} receivers {}", sc, rc);
|
||||
}
|
||||
error!("TODO sending stop commands");
|
||||
//ingest_commons.ca_conn_set.send_stop().await?;
|
||||
break;
|
||||
}
|
||||
tokio::time::sleep(Duration::from_millis(400)).await;
|
||||
}
|
||||
ingest_commons.ca_conn_set.wait_stopped().await?;
|
||||
info!("all connections done.");
|
||||
|
||||
insert_item_queue.close();
|
||||
|
||||
drop(ingest_commons);
|
||||
metrics_agg_jh.abort();
|
||||
drop(metrics_agg_jh);
|
||||
|
||||
if false {
|
||||
let sender = insert_item_queue.sender_raw();
|
||||
sender.close();
|
||||
let receiver = insert_item_queue.receiver();
|
||||
receiver.close();
|
||||
}
|
||||
if true {
|
||||
let receiver = insert_item_queue.receiver();
|
||||
let sc = receiver.sender_count();
|
||||
let rc = receiver.receiver_count();
|
||||
info!("item queue A senders {} receivers {}", sc, rc);
|
||||
}
|
||||
let receiver = insert_item_queue.receiver();
|
||||
drop(insert_item_queue);
|
||||
if true {
|
||||
let sc = receiver.sender_count();
|
||||
let rc = receiver.receiver_count();
|
||||
info!("item queue B senders {} receivers {}", sc, rc);
|
||||
}
|
||||
receiver.close();
|
||||
|
||||
let mut futs = FuturesUnordered::from_iter(jh_insert_workers);
|
||||
loop {
|
||||
futures_util::select!(
|
||||
x = futs.next() => match x {
|
||||
Some(Ok(_)) => {}
|
||||
Some(Err(e)) => {
|
||||
error!("error on shutdown: {e:?}");
|
||||
}
|
||||
None => break,
|
||||
},
|
||||
_ = tokio::time::sleep(Duration::from_millis(1000)).fuse() => {
|
||||
if true {
|
||||
let sc = receiver.sender_count();
|
||||
let rc = receiver.receiver_count();
|
||||
info!("waiting inserters {} items {} senders {} receivers {}", futs.len(), receiver.len(), sc, rc);
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
info!("all insert workers done.");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
+114
-59
@@ -5,11 +5,12 @@ use super::proto::CaMsgTy;
|
||||
use super::proto::CaProto;
|
||||
use super::store::DataStore;
|
||||
use super::ExtraInsertsConf;
|
||||
use crate::batchquery::series_by_channel::ChannelInfoQuery;
|
||||
use crate::bsread::ChannelDescDecoded;
|
||||
use crate::ca::proto::CreateChan;
|
||||
use crate::ca::proto::EventAdd;
|
||||
use crate::ca::store::ChannelRegistry;
|
||||
use crate::series::{Existence, SeriesId};
|
||||
use crate::series::Existence;
|
||||
use crate::series::SeriesId;
|
||||
use crate::store::ChannelInfoItem;
|
||||
use crate::store::ChannelStatus;
|
||||
use crate::store::ChannelStatusItem;
|
||||
@@ -17,9 +18,13 @@ use crate::store::CommonInsertItemQueueSender;
|
||||
use crate::store::ConnectionStatus;
|
||||
use crate::store::ConnectionStatusItem;
|
||||
use crate::store::{InsertItem, IvlItem, MuteItem, QueryItem};
|
||||
use async_channel::Sender;
|
||||
use err::Error;
|
||||
use futures_util::stream::FuturesOrdered;
|
||||
use futures_util::{Future, FutureExt, Stream, StreamExt, TryFutureExt};
|
||||
use futures_util::stream::FuturesUnordered;
|
||||
use futures_util::Future;
|
||||
use futures_util::FutureExt;
|
||||
use futures_util::Stream;
|
||||
use futures_util::StreamExt;
|
||||
use log::*;
|
||||
use netpod::timeunits::*;
|
||||
use netpod::ScalarType;
|
||||
@@ -30,7 +35,6 @@ use serde::Serialize;
|
||||
use stats::CaConnStats;
|
||||
use stats::IntervalEma;
|
||||
use std::collections::BTreeMap;
|
||||
use std::collections::HashMap;
|
||||
use std::collections::VecDeque;
|
||||
use std::net::SocketAddrV4;
|
||||
use std::ops::ControlFlow;
|
||||
@@ -378,9 +382,20 @@ impl ChannelSetOps {
|
||||
}
|
||||
}
|
||||
|
||||
struct ChannelOpsResources<'a> {
|
||||
channel_set_ops: &'a StdMutex<BTreeMap<String, ChannelSetOp>>,
|
||||
channels: &'a mut BTreeMap<Cid, ChannelState>,
|
||||
cid_by_name: &'a mut BTreeMap<String, Cid>,
|
||||
name_by_cid: &'a mut BTreeMap<Cid, String>,
|
||||
cid_store: &'a mut CidStore,
|
||||
init_state_count: &'a mut u64,
|
||||
channel_set_ops_flag: &'a AtomicUsize,
|
||||
}
|
||||
|
||||
pub struct CaConn {
|
||||
state: CaConnState,
|
||||
shutdown: bool,
|
||||
ticker: Pin<Box<tokio::time::Sleep>>,
|
||||
proto: Option<CaProto>,
|
||||
cid_store: CidStore,
|
||||
subid_store: SubidStore,
|
||||
@@ -389,12 +404,9 @@ pub struct CaConn {
|
||||
cid_by_name: BTreeMap<String, Cid>,
|
||||
cid_by_subid: BTreeMap<u32, Cid>,
|
||||
name_by_cid: BTreeMap<Cid, String>,
|
||||
data_store: Arc<DataStore>,
|
||||
insert_item_queue: VecDeque<QueryItem>,
|
||||
insert_item_sender: CommonInsertItemQueueSender,
|
||||
insert_item_send_fut: Option<async_channel::Send<'static, QueryItem>>,
|
||||
fut_get_series:
|
||||
FuturesOrdered<Pin<Box<dyn Future<Output = Result<(Cid, u32, u16, u16, Existence<SeriesId>), Error>> + Send>>>,
|
||||
backend: String,
|
||||
remote_addr_dbg: SocketAddrV4,
|
||||
local_epics_hostname: String,
|
||||
@@ -412,6 +424,11 @@ pub struct CaConn {
|
||||
ioc_ping_start: Option<Instant>,
|
||||
cmd_res_queue: VecDeque<ConnCommandResult>,
|
||||
channel_set_ops: Arc<ChannelSetOps>,
|
||||
channel_info_query_tx: Sender<ChannelInfoQuery>,
|
||||
series_lookup_schedule: BTreeMap<Cid, ChannelInfoQuery>,
|
||||
series_lookup_futs: FuturesUnordered<
|
||||
Pin<Box<dyn Future<Output = Result<(Cid, u32, u16, u16, Existence<SeriesId>), Error>> + Send>>,
|
||||
>,
|
||||
}
|
||||
|
||||
impl CaConn {
|
||||
@@ -419,7 +436,8 @@ impl CaConn {
|
||||
backend: String,
|
||||
remote_addr_dbg: SocketAddrV4,
|
||||
local_epics_hostname: String,
|
||||
data_store: Arc<DataStore>,
|
||||
channel_info_query_tx: Sender<ChannelInfoQuery>,
|
||||
_data_store: Arc<DataStore>,
|
||||
insert_item_sender: CommonInsertItemQueueSender,
|
||||
array_truncate: usize,
|
||||
insert_queue_max: usize,
|
||||
@@ -428,6 +446,7 @@ impl CaConn {
|
||||
Self {
|
||||
state: CaConnState::Unconnected,
|
||||
shutdown: false,
|
||||
ticker: Box::pin(tokio::time::sleep(Duration::from_millis(500))),
|
||||
proto: None,
|
||||
cid_store: CidStore::new(),
|
||||
subid_store: SubidStore::new(),
|
||||
@@ -436,11 +455,9 @@ impl CaConn {
|
||||
cid_by_name: BTreeMap::new(),
|
||||
cid_by_subid: BTreeMap::new(),
|
||||
name_by_cid: BTreeMap::new(),
|
||||
data_store,
|
||||
insert_item_queue: VecDeque::new(),
|
||||
insert_item_sender,
|
||||
insert_item_send_fut: None,
|
||||
fut_get_series: FuturesOrdered::new(),
|
||||
backend,
|
||||
remote_addr_dbg,
|
||||
local_epics_hostname,
|
||||
@@ -461,6 +478,9 @@ impl CaConn {
|
||||
ops: StdMutex::new(BTreeMap::new()),
|
||||
flag: AtomicUsize::new(0),
|
||||
}),
|
||||
channel_info_query_tx,
|
||||
series_lookup_schedule: BTreeMap::new(),
|
||||
series_lookup_futs: FuturesUnordered::new(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -474,7 +494,7 @@ impl CaConn {
|
||||
|
||||
fn trigger_shutdown(&mut self) {
|
||||
self.shutdown = true;
|
||||
for (k, v) in self.channels.iter_mut() {
|
||||
for (_k, v) in self.channels.iter_mut() {
|
||||
match v {
|
||||
ChannelState::Init => {
|
||||
*v = ChannelState::Ended;
|
||||
@@ -810,8 +830,9 @@ impl CaConn {
|
||||
for (_, st) in &self.channels {
|
||||
match st {
|
||||
ChannelState::Creating { cid, ts_beg } => {
|
||||
if tsnow.duration_since(*ts_beg) >= Duration::from_millis(10000) {
|
||||
if false && tsnow.duration_since(*ts_beg) >= Duration::from_millis(10000) {
|
||||
let name = self.name_by_cid.get(cid);
|
||||
// TODO channel create timed out how to let daemon know?
|
||||
warn!("channel Creating timed out {} {:?}", cid.0, name);
|
||||
}
|
||||
}
|
||||
@@ -945,16 +966,36 @@ impl CaConn {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn handle_get_series_futs(&mut self, cx: &mut Context) -> Poll<Result<(), Error>> {
|
||||
fn emit_series_lookup(&mut self, cx: &mut Context) {
|
||||
let _ = cx;
|
||||
loop {
|
||||
break if let Some(mut entry) = self.series_lookup_schedule.first_entry() {
|
||||
let dummy = entry.get().dummy();
|
||||
let query = std::mem::replace(entry.get_mut(), dummy);
|
||||
match self.channel_info_query_tx.try_send(query) {
|
||||
Ok(()) => {
|
||||
entry.remove();
|
||||
continue;
|
||||
}
|
||||
Err(e) => {
|
||||
*entry.get_mut() = e.into_inner();
|
||||
}
|
||||
}
|
||||
} else {
|
||||
()
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
fn poll_channel_info_results(&mut self, cx: &mut Context) {
|
||||
use Poll::*;
|
||||
while self.fut_get_series.len() > 0 {
|
||||
match self.fut_get_series.poll_next_unpin(cx) {
|
||||
loop {
|
||||
break match self.series_lookup_futs.poll_next_unpin(cx) {
|
||||
Ready(Some(Ok((cid, sid, data_type, data_count, series)))) => {
|
||||
{
|
||||
let series = series.clone().into_inner();
|
||||
let item = QueryItem::ChannelStatus(ChannelStatusItem {
|
||||
ts: SystemTime::now(),
|
||||
series: series,
|
||||
series: series.clone().into_inner(),
|
||||
status: ChannelStatus::Opened,
|
||||
});
|
||||
self.insert_item_queue.push_back(item);
|
||||
@@ -962,16 +1003,17 @@ impl CaConn {
|
||||
match self.channel_to_evented(cid, sid, data_type, data_count, series, cx) {
|
||||
Ok(_) => {}
|
||||
Err(e) => {
|
||||
return Ready(Err(e));
|
||||
error!("poll_channel_info_results {e}");
|
||||
}
|
||||
}
|
||||
}
|
||||
Ready(Some(Err(e))) => return Ready(Err(e)),
|
||||
Ready(None) => return Ready(Err(Error::with_msg_no_trace("series lookup stream should never end"))),
|
||||
Pending => break,
|
||||
}
|
||||
Ready(Some(Err(e))) => {
|
||||
error!("poll_channel_info_results {e}");
|
||||
}
|
||||
Ready(None) => {}
|
||||
Pending => {}
|
||||
};
|
||||
}
|
||||
return Pending;
|
||||
}
|
||||
|
||||
fn event_add_insert(
|
||||
@@ -1287,7 +1329,7 @@ impl CaConn {
|
||||
use Poll::*;
|
||||
let mut ts1 = Instant::now();
|
||||
// TODO unify with Listen state where protocol gets polled as well.
|
||||
let mut msgs_tmp = vec![];
|
||||
let mut msgs_tmp = Vec::new();
|
||||
self.check_channels_state_init(&mut msgs_tmp)?;
|
||||
let ts2 = Instant::now();
|
||||
self.stats
|
||||
@@ -1354,23 +1396,43 @@ impl CaConn {
|
||||
info_store_msp_last: info_store_msp_from_time(SystemTime::now()),
|
||||
});
|
||||
// TODO handle error in different way. Should most likely not abort.
|
||||
let cd = ChannelDescDecoded {
|
||||
name: name.to_string(),
|
||||
scalar_type,
|
||||
shape,
|
||||
let _cd = ChannelDescDecoded {
|
||||
name: name.clone(),
|
||||
scalar_type: scalar_type.clone(),
|
||||
shape: shape.clone(),
|
||||
agg_kind: netpod::AggKind::Plain,
|
||||
// TODO these play no role in series id:
|
||||
byte_order: netpod::ByteOrder::Little,
|
||||
compression: None,
|
||||
};
|
||||
let z = unsafe {
|
||||
&*(&self.data_store.chan_reg as &ChannelRegistry as *const ChannelRegistry)
|
||||
let (tx, rx) = async_channel::bounded(1);
|
||||
let query = ChannelInfoQuery {
|
||||
backend: self.backend.clone(),
|
||||
channel: name.clone(),
|
||||
scalar_type: scalar_type.to_scylla_i32(),
|
||||
shape_dims: shape.to_scylla_vec(),
|
||||
tx,
|
||||
};
|
||||
let fut = z
|
||||
.get_series_id(cd, self.backend.clone())
|
||||
.map_ok(move |series| (cid, k.sid, k.data_type, k.data_count, series));
|
||||
// TODO throttle execution rate:
|
||||
self.fut_get_series.push_back(Box::pin(fut) as _);
|
||||
if !self.series_lookup_schedule.contains_key(&cid) {
|
||||
self.series_lookup_schedule.insert(cid, query);
|
||||
let fut = async move {
|
||||
match rx.recv().await {
|
||||
Ok(item) => match item {
|
||||
Ok(item) => Ok((cid, sid, k.data_type, k.data_count, item)),
|
||||
Err(e) => Err(e),
|
||||
},
|
||||
Err(e) => {
|
||||
// TODO count only
|
||||
error!("can not receive series lookup result {e}");
|
||||
Err(Error::with_msg_no_trace("can not receive lookup result"))
|
||||
}
|
||||
}
|
||||
};
|
||||
self.series_lookup_futs.push(Box::pin(fut));
|
||||
} else {
|
||||
// TODO count only
|
||||
warn!("series lookup for {name} already in progress");
|
||||
}
|
||||
do_wake_again = true;
|
||||
}
|
||||
CaMsgTy::EventAddRes(k) => {
|
||||
@@ -1404,6 +1466,9 @@ impl CaConn {
|
||||
self.ioc_ping_last = Instant::now();
|
||||
self.ioc_ping_start = None;
|
||||
}
|
||||
CaMsgTy::CreateChanFail(_) => {
|
||||
// TODO handle CreateChanFail
|
||||
}
|
||||
_ => {
|
||||
warn!("Received unexpected protocol message {:?}", k);
|
||||
}
|
||||
@@ -1440,7 +1505,6 @@ impl CaConn {
|
||||
};
|
||||
if do_wake_again {
|
||||
// TODO remove the need for this:
|
||||
trace!("do_wake_again");
|
||||
cx.waker().wake_by_ref();
|
||||
}
|
||||
res
|
||||
@@ -1545,14 +1609,6 @@ impl CaConn {
|
||||
Pending => Some(Pending),
|
||||
},
|
||||
CaConnState::PeerReady => {
|
||||
{
|
||||
// TODO can I move this block somewhere else?
|
||||
match self.handle_get_series_futs(cx) {
|
||||
Ready(Ok(_)) => (),
|
||||
Ready(Err(e)) => return Some(Ready(Err(e))),
|
||||
Pending => (),
|
||||
}
|
||||
}
|
||||
let res = self.handle_peer_ready(cx);
|
||||
match res {
|
||||
Ready(Some(Ok(()))) => None,
|
||||
@@ -1588,7 +1644,7 @@ impl CaConn {
|
||||
}
|
||||
}
|
||||
|
||||
fn apply_3(res: ChannelOpsResources) {
|
||||
fn apply_channel_ops_with_res(res: ChannelOpsResources) {
|
||||
let mut g = res.channel_set_ops.lock().unwrap();
|
||||
let map = std::mem::replace(&mut *g, BTreeMap::new());
|
||||
for (ch, op) in map {
|
||||
@@ -1609,7 +1665,7 @@ impl CaConn {
|
||||
res.channel_set_ops_flag.store(0, atomic::Ordering::Release);
|
||||
}
|
||||
|
||||
fn apply_2(&mut self) {
|
||||
fn apply_channel_ops(&mut self) {
|
||||
let res = ChannelOpsResources {
|
||||
channel_set_ops: &self.channel_set_ops.ops,
|
||||
channels: &mut self.channels,
|
||||
@@ -1619,20 +1675,10 @@ impl CaConn {
|
||||
init_state_count: &mut self.init_state_count,
|
||||
channel_set_ops_flag: &self.channel_set_ops.flag,
|
||||
};
|
||||
Self::apply_3(res)
|
||||
Self::apply_channel_ops_with_res(res)
|
||||
}
|
||||
}
|
||||
|
||||
struct ChannelOpsResources<'a> {
|
||||
channel_set_ops: &'a StdMutex<BTreeMap<String, ChannelSetOp>>,
|
||||
channels: &'a mut BTreeMap<Cid, ChannelState>,
|
||||
cid_by_name: &'a mut BTreeMap<String, Cid>,
|
||||
name_by_cid: &'a mut BTreeMap<Cid, String>,
|
||||
cid_store: &'a mut CidStore,
|
||||
init_state_count: &'a mut u64,
|
||||
channel_set_ops_flag: &'a AtomicUsize,
|
||||
}
|
||||
|
||||
impl Stream for CaConn {
|
||||
type Item = Result<CaConnEvent, Error>;
|
||||
|
||||
@@ -1641,7 +1687,16 @@ impl Stream for CaConn {
|
||||
let poll_ts1 = Instant::now();
|
||||
self.stats.caconn_poll_count_inc();
|
||||
if self.channel_set_ops.flag.load(atomic::Ordering::Acquire) > 0 {
|
||||
Self::apply_2(&mut self);
|
||||
self.apply_channel_ops();
|
||||
}
|
||||
self.emit_series_lookup(cx);
|
||||
self.poll_channel_info_results(cx);
|
||||
match self.ticker.poll_unpin(cx) {
|
||||
Ready(()) => {
|
||||
self.ticker = Box::pin(tokio::time::sleep(Duration::from_millis(500)));
|
||||
cx.waker().wake_by_ref();
|
||||
}
|
||||
Pending => {}
|
||||
}
|
||||
let ret = if let Some(item) = self.cmd_res_queue.pop_front() {
|
||||
Ready(Some(Ok(CaConnEvent {
|
||||
|
||||
@@ -1,9 +1,9 @@
|
||||
use super::conn::CaConnEvent;
|
||||
use super::conn::ChannelSetOp;
|
||||
use super::conn::ChannelSetOps;
|
||||
use super::conn::ConnCommand;
|
||||
use super::store::DataStore;
|
||||
use super::SlowWarnable;
|
||||
use crate::batchquery::series_by_channel::ChannelInfoQuery;
|
||||
use crate::ca::conn::CaConn;
|
||||
use crate::ca::conn::CaConnEventValue;
|
||||
use crate::errconv::ErrConv;
|
||||
@@ -55,15 +55,17 @@ pub struct CaConnSet {
|
||||
ca_conn_ress: Arc<TokMx<BTreeMap<SocketAddrV4, CaConnRess>>>,
|
||||
conn_item_tx: Sender<(SocketAddrV4, CaConnEvent)>,
|
||||
conn_item_rx: Receiver<(SocketAddrV4, CaConnEvent)>,
|
||||
channel_info_query_tx: Sender<ChannelInfoQuery>,
|
||||
}
|
||||
|
||||
impl CaConnSet {
|
||||
pub fn new() -> Self {
|
||||
pub fn new(channel_info_query_tx: Sender<ChannelInfoQuery>) -> Self {
|
||||
let (conn_item_tx, conn_item_rx) = async_channel::bounded(10000);
|
||||
Self {
|
||||
ca_conn_ress: Arc::new(TokMx::new(BTreeMap::new())),
|
||||
conn_item_tx,
|
||||
conn_item_rx,
|
||||
channel_info_query_tx,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -92,6 +94,7 @@ impl CaConnSet {
|
||||
backend.clone(),
|
||||
addr,
|
||||
local_epics_hostname,
|
||||
self.channel_info_query_tx.clone(),
|
||||
data_store.clone(),
|
||||
insert_item_queue_sender,
|
||||
array_truncate,
|
||||
|
||||
@@ -1,44 +1,9 @@
|
||||
use crate::bsread::ChannelDescDecoded;
|
||||
use crate::series::{Existence, SeriesId};
|
||||
use async_channel::{Receiver, Sender};
|
||||
use err::Error;
|
||||
use netpod::ScyllaConfig;
|
||||
use scylla::prepared_statement::PreparedStatement;
|
||||
use scylla::statement::Consistency;
|
||||
use scylla::Session as ScySession;
|
||||
use std::sync::Arc;
|
||||
use tokio_postgres::Client as PgClient;
|
||||
|
||||
#[allow(unused)]
|
||||
pub struct RegisterJob {
|
||||
desc: ChannelDescDecoded,
|
||||
}
|
||||
|
||||
impl RegisterJob {
|
||||
pub fn new(desc: ChannelDescDecoded) -> Self {
|
||||
Self { desc }
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(unused)]
|
||||
pub struct RegisterChannel {
|
||||
tx: Sender<RegisterJob>,
|
||||
rx: Receiver<RegisterJob>,
|
||||
}
|
||||
|
||||
pub struct ChannelRegistry {
|
||||
pg_client: Arc<PgClient>,
|
||||
}
|
||||
|
||||
impl ChannelRegistry {
|
||||
pub fn new(pg_client: Arc<PgClient>) -> Self {
|
||||
Self { pg_client }
|
||||
}
|
||||
|
||||
pub async fn get_series_id(&self, cd: ChannelDescDecoded, backend: String) -> Result<Existence<SeriesId>, Error> {
|
||||
crate::series::get_series_id(&self.pg_client, &cd, backend).await
|
||||
}
|
||||
}
|
||||
|
||||
pub struct DataStore {
|
||||
pub scy: Arc<ScySession>,
|
||||
@@ -61,11 +26,10 @@ pub struct DataStore {
|
||||
pub qu_insert_channel_status: Arc<PreparedStatement>,
|
||||
pub qu_insert_channel_status_by_ts_msp: Arc<PreparedStatement>,
|
||||
pub qu_insert_channel_ping: Arc<PreparedStatement>,
|
||||
pub chan_reg: Arc<ChannelRegistry>,
|
||||
}
|
||||
|
||||
impl DataStore {
|
||||
pub async fn new(scyconf: &ScyllaConfig, pg_client: Arc<PgClient>) -> Result<Self, Error> {
|
||||
pub async fn new(scyconf: &ScyllaConfig) -> Result<Self, Error> {
|
||||
let scy = scylla::SessionBuilder::new()
|
||||
.known_nodes(&scyconf.hosts)
|
||||
.default_consistency(Consistency::LocalOne)
|
||||
@@ -182,7 +146,6 @@ impl DataStore {
|
||||
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
|
||||
let qu_insert_channel_ping = Arc::new(q);
|
||||
let ret = Self {
|
||||
chan_reg: Arc::new(ChannelRegistry::new(pg_client)),
|
||||
scy,
|
||||
qu_insert_ts_msp,
|
||||
qu_insert_series_by_ts_msp,
|
||||
|
||||
@@ -0,0 +1,16 @@
|
||||
use crate::errconv::ErrConv;
|
||||
use err::Error;
|
||||
use netpod::Database;
|
||||
use tokio_postgres::Client as PgClient;
|
||||
|
||||
pub async fn make_pg_client(d: &Database) -> Result<PgClient, Error> {
|
||||
let (client, pg_conn) = tokio_postgres::connect(
|
||||
&format!("postgresql://{}:{}@{}:{}/{}", d.user, d.pass, d.host, d.port, d.name),
|
||||
tokio_postgres::tls::NoTls,
|
||||
)
|
||||
.await
|
||||
.err_conv()?;
|
||||
// TODO allow clean shutdown on ctrl-c and join the pg_conn in the end:
|
||||
tokio::spawn(pg_conn);
|
||||
Ok(client)
|
||||
}
|
||||
@@ -56,7 +56,7 @@ pub async fn spawn_scylla_insert_workers(
|
||||
insert_worker_count: usize,
|
||||
insert_item_queue: Arc<CommonInsertItemQueue>,
|
||||
ingest_commons: Arc<IngestCommons>,
|
||||
pg_client: Arc<PgClient>,
|
||||
_pg_client: Arc<PgClient>,
|
||||
store_stats: Arc<stats::CaConnStats>,
|
||||
use_rate_limit_queue: bool,
|
||||
ttls: Ttls,
|
||||
@@ -119,7 +119,7 @@ pub async fn spawn_scylla_insert_workers(
|
||||
let mut jhs = Vec::new();
|
||||
let mut data_stores = Vec::new();
|
||||
for _ in 0..insert_scylla_sessions {
|
||||
let data_store = Arc::new(DataStore::new(&scyconf, pg_client.clone()).await?);
|
||||
let data_store = Arc::new(DataStore::new(&scyconf).await?);
|
||||
data_stores.push(data_store);
|
||||
}
|
||||
for i1 in 0..insert_worker_count {
|
||||
|
||||
+56
-53
@@ -1,12 +1,14 @@
|
||||
use crate::ca::conn::ConnCommand;
|
||||
use crate::ca::IngestCommons;
|
||||
use crate::ca::METRICS;
|
||||
use axum::extract::Query;
|
||||
use err::Error;
|
||||
use http::Request;
|
||||
use log::*;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use stats::{CaConnStats, CaConnStatsAgg, CaConnStatsAggDiff};
|
||||
use serde::Deserialize;
|
||||
use serde::Serialize;
|
||||
use stats::CaConnStats;
|
||||
use stats::CaConnStatsAgg;
|
||||
use stats::CaConnStatsAggDiff;
|
||||
use std::collections::HashMap;
|
||||
use std::net::SocketAddrV4;
|
||||
use std::sync::atomic::Ordering;
|
||||
@@ -26,7 +28,7 @@ impl ExtraInsertsConf {
|
||||
|
||||
async fn find_channel(
|
||||
params: HashMap<String, String>,
|
||||
ingest_commons: Arc<IngestCommons>,
|
||||
dcom: Arc<DaemonComm>,
|
||||
) -> axum::Json<Vec<(String, Vec<String>)>> {
|
||||
let pattern = params.get("pattern").map_or(String::new(), |x| x.clone()).to_string();
|
||||
// TODO ask Daemon for that information.
|
||||
@@ -35,7 +37,7 @@ async fn find_channel(
|
||||
axum::Json(res)
|
||||
}
|
||||
|
||||
async fn channel_add_inner(params: HashMap<String, String>, ingest_commons: Arc<IngestCommons>) -> Result<(), Error> {
|
||||
async fn channel_add_inner(params: HashMap<String, String>, dcom: Arc<DaemonComm>) -> Result<(), Error> {
|
||||
if let (Some(backend), Some(name)) = (params.get("backend"), params.get("name")) {
|
||||
error!("TODO channel_add_inner");
|
||||
Err(Error::with_msg_no_trace(format!("TODO channel_add_inner")))
|
||||
@@ -44,18 +46,15 @@ async fn channel_add_inner(params: HashMap<String, String>, ingest_commons: Arc<
|
||||
}
|
||||
}
|
||||
|
||||
async fn channel_add(params: HashMap<String, String>, ingest_commons: Arc<IngestCommons>) -> axum::Json<bool> {
|
||||
let ret = match channel_add_inner(params, ingest_commons).await {
|
||||
async fn channel_add(params: HashMap<String, String>, dcom: Arc<DaemonComm>) -> axum::Json<bool> {
|
||||
let ret = match channel_add_inner(params, dcom).await {
|
||||
Ok(_) => true,
|
||||
Err(_) => false,
|
||||
};
|
||||
axum::Json(ret)
|
||||
}
|
||||
|
||||
async fn channel_remove(
|
||||
params: HashMap<String, String>,
|
||||
ingest_commons: Arc<IngestCommons>,
|
||||
) -> axum::Json<serde_json::Value> {
|
||||
async fn channel_remove(params: HashMap<String, String>, dcom: Arc<DaemonComm>) -> axum::Json<serde_json::Value> {
|
||||
use axum::Json;
|
||||
use serde_json::Value;
|
||||
let addr = if let Some(x) = params.get("addr") {
|
||||
@@ -81,7 +80,7 @@ async fn channel_remove(
|
||||
Json(Value::Bool(false))
|
||||
}
|
||||
|
||||
async fn channel_state(params: HashMap<String, String>, ingest_commons: Arc<IngestCommons>) -> axum::Json<bool> {
|
||||
async fn channel_state(params: HashMap<String, String>, dcom: Arc<DaemonComm>) -> axum::Json<bool> {
|
||||
let name = params.get("name").map_or(String::new(), |x| x.clone()).to_string();
|
||||
error!("TODO channel_state");
|
||||
axum::Json(false)
|
||||
@@ -89,14 +88,14 @@ async fn channel_state(params: HashMap<String, String>, ingest_commons: Arc<Inge
|
||||
|
||||
async fn channel_states(
|
||||
params: HashMap<String, String>,
|
||||
ingest_commons: Arc<IngestCommons>,
|
||||
dcom: Arc<DaemonComm>,
|
||||
) -> axum::Json<Vec<crate::ca::conn::ChannelStateInfo>> {
|
||||
let limit = params.get("limit").map(|x| x.parse()).unwrap_or(Ok(40)).unwrap_or(40);
|
||||
error!("TODO channel_state");
|
||||
axum::Json(Vec::new())
|
||||
}
|
||||
|
||||
async fn extra_inserts_conf_set(v: ExtraInsertsConf, ingest_commons: Arc<IngestCommons>) -> axum::Json<bool> {
|
||||
async fn extra_inserts_conf_set(v: ExtraInsertsConf, dcom: Arc<DaemonComm>) -> axum::Json<bool> {
|
||||
// TODO ingest_commons is the authorative value. Should have common function outside of this metrics which
|
||||
// can update everything to a given value.
|
||||
error!("TODO extra_inserts_conf_set");
|
||||
@@ -111,12 +110,22 @@ struct DummyQuery {
|
||||
age: usize,
|
||||
}
|
||||
|
||||
pub async fn start_metrics_service(bind_to: String, ingest_commons: Arc<IngestCommons>) {
|
||||
pub struct DaemonComm {}
|
||||
|
||||
impl DaemonComm {
|
||||
pub fn dummy() -> Self {
|
||||
Self {}
|
||||
}
|
||||
}
|
||||
|
||||
fn make_routes(dcom: Arc<DaemonComm>) -> axum::Router {
|
||||
use axum::extract;
|
||||
use axum::http::StatusCode;
|
||||
use axum::routing::{get, put};
|
||||
use axum::routing::get;
|
||||
use axum::routing::put;
|
||||
use axum::Router;
|
||||
let app = Router::new()
|
||||
use http::StatusCode;
|
||||
|
||||
Router::new()
|
||||
.fallback(|req: Request<axum::body::Body>| async move {
|
||||
info!("Fallback for {} {}", req.method(), req.uri());
|
||||
StatusCode::NOT_FOUND
|
||||
@@ -149,89 +158,83 @@ pub async fn start_metrics_service(bind_to: String, ingest_commons: Arc<IngestCo
|
||||
.route(
|
||||
"/daqingest/find/channel",
|
||||
get({
|
||||
let ingest_commons = ingest_commons.clone();
|
||||
|Query(params): Query<HashMap<String, String>>| find_channel(params, ingest_commons)
|
||||
let dcom = dcom.clone();
|
||||
|Query(params): Query<HashMap<String, String>>| find_channel(params, dcom)
|
||||
}),
|
||||
)
|
||||
.route(
|
||||
"/daqingest/channel/state",
|
||||
get({
|
||||
let ingest_commons = ingest_commons.clone();
|
||||
|Query(params): Query<HashMap<String, String>>| channel_state(params, ingest_commons)
|
||||
let dcom = dcom.clone();
|
||||
|Query(params): Query<HashMap<String, String>>| channel_state(params, dcom)
|
||||
}),
|
||||
)
|
||||
.route(
|
||||
"/daqingest/channel/states",
|
||||
get({
|
||||
let ingest_commons = ingest_commons.clone();
|
||||
|Query(params): Query<HashMap<String, String>>| channel_states(params, ingest_commons)
|
||||
let dcom = dcom.clone();
|
||||
|Query(params): Query<HashMap<String, String>>| channel_states(params, dcom)
|
||||
}),
|
||||
)
|
||||
.route(
|
||||
"/daqingest/channel/add",
|
||||
get({
|
||||
let ingest_commons = ingest_commons.clone();
|
||||
|Query(params): Query<HashMap<String, String>>| channel_add(params, ingest_commons)
|
||||
let dcom = dcom.clone();
|
||||
|Query(params): Query<HashMap<String, String>>| channel_add(params, dcom)
|
||||
}),
|
||||
)
|
||||
.route(
|
||||
"/daqingest/channel/remove",
|
||||
get({
|
||||
let ingest_commons = ingest_commons.clone();
|
||||
|Query(params): Query<HashMap<String, String>>| channel_remove(params, ingest_commons)
|
||||
let dcom = dcom.clone();
|
||||
|Query(params): Query<HashMap<String, String>>| channel_remove(params, dcom)
|
||||
}),
|
||||
)
|
||||
.route(
|
||||
"/store_workers_rate",
|
||||
get({
|
||||
let c = ingest_commons.clone();
|
||||
|| async move { axum::Json(c.store_workers_rate.load(Ordering::Acquire)) }
|
||||
let dcom = dcom.clone();
|
||||
|| async move { axum::Json(123) }
|
||||
})
|
||||
.put({
|
||||
let c = ingest_commons.clone();
|
||||
|v: extract::Json<u64>| async move {
|
||||
c.store_workers_rate.store(v.0, Ordering::Release);
|
||||
}
|
||||
let dcom = dcom.clone();
|
||||
|v: extract::Json<u64>| async move {}
|
||||
}),
|
||||
)
|
||||
.route(
|
||||
"/insert_frac",
|
||||
get({
|
||||
let c = ingest_commons.clone();
|
||||
|| async move { axum::Json(c.insert_frac.load(Ordering::Acquire)) }
|
||||
let dcom = dcom.clone();
|
||||
|| async move { axum::Json(123) }
|
||||
})
|
||||
.put({
|
||||
let c = ingest_commons.clone();
|
||||
|v: extract::Json<u64>| async move {
|
||||
c.insert_frac.store(v.0, Ordering::Release);
|
||||
}
|
||||
let dcom = dcom.clone();
|
||||
|v: extract::Json<u64>| async move {}
|
||||
}),
|
||||
)
|
||||
.route(
|
||||
"/extra_inserts_conf",
|
||||
get({
|
||||
let c = ingest_commons.clone();
|
||||
|| async move {
|
||||
let res = c.extra_inserts_conf.lock().await;
|
||||
axum::Json(serde_json::to_value(&*res).unwrap())
|
||||
}
|
||||
let dcom = dcom.clone();
|
||||
|| async move { axum::Json(serde_json::to_value(&"TODO").unwrap()) }
|
||||
})
|
||||
.put({
|
||||
let ingest_commons = ingest_commons.clone();
|
||||
|v: extract::Json<ExtraInsertsConf>| extra_inserts_conf_set(v.0, ingest_commons)
|
||||
let dcom = dcom.clone();
|
||||
|v: extract::Json<ExtraInsertsConf>| extra_inserts_conf_set(v.0, dcom)
|
||||
}),
|
||||
)
|
||||
.route(
|
||||
"/insert_ivl_min",
|
||||
put({
|
||||
let insert_ivl_min = ingest_commons.insert_ivl_min.clone();
|
||||
|v: extract::Json<u64>| async move {
|
||||
insert_ivl_min.store(v.0, Ordering::Release);
|
||||
}
|
||||
let dcom = dcom.clone();
|
||||
|v: extract::Json<u64>| async move {}
|
||||
}),
|
||||
);
|
||||
)
|
||||
}
|
||||
|
||||
pub async fn start_metrics_service(bind_to: String, dcom: Arc<DaemonComm>) {
|
||||
axum::Server::bind(&bind_to.parse().unwrap())
|
||||
.serve(app.into_make_service())
|
||||
.serve(make_routes(dcom).into_make_service())
|
||||
.await
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
@@ -1,7 +1,10 @@
|
||||
pub mod batcher;
|
||||
pub mod batchquery;
|
||||
pub mod bsread;
|
||||
pub mod ca;
|
||||
pub mod channelwriter;
|
||||
pub mod conf;
|
||||
pub mod dbpg;
|
||||
pub mod errconv;
|
||||
pub mod insertworker;
|
||||
pub mod linuxhelper;
|
||||
|
||||
@@ -264,3 +264,9 @@ stats_proc::stats_struct!((
|
||||
agg(name(CaConnStatsAgg), parent(CaConnStats)),
|
||||
diff(name(CaConnStatsAggDiff), input(CaConnStatsAgg)),
|
||||
));
|
||||
|
||||
stats_proc::stats_struct!((
|
||||
stats_struct(name(DaemonStats), counters(main_lookupaddr_ok)),
|
||||
agg(name(DaemonStatsAgg), parent(DaemonStats)),
|
||||
diff(name(DaemonStatsAggDiff), input(DaemonStatsAgg)),
|
||||
));
|
||||
|
||||
Reference in New Issue
Block a user