diff --git a/daqingest/src/daemon.rs b/daqingest/src/daemon.rs index 6f51e1f..fb498d7 100644 --- a/daqingest/src/daemon.rs +++ b/daqingest/src/daemon.rs @@ -298,7 +298,7 @@ pub struct Daemon { impl Daemon { pub async fn new(opts: DaemonOpts) -> Result { let pg_client = Arc::new(make_pg_client(&opts.pgconf).await?); - let datastore = DataStore::new(&opts.scyconf, pg_client.clone()).await?; + let datastore = DataStore::new(&opts.scyconf).await?; let datastore = Arc::new(datastore); let (tx, rx) = async_channel::bounded(32); let pgcs = { @@ -310,6 +310,9 @@ impl Daemon { a }; let (search_tx, ioc_finder_jh) = Self::start_finder(tx.clone(), opts.backend().into(), pgcs); + + let channel_info_query_tx = netfetch::batchquery::series_by_channel::start_task(&opts.pgconf).await?; + let common_insert_item_queue = Arc::new(CommonInsertItemQueue::new(opts.insert_item_queue_cap)); let common_insert_item_queue_2 = Arc::new(CommonInsertItemQueue::new(opts.insert_item_queue_cap)); let insert_queue_counter = Arc::new(AtomicUsize::new(0)); @@ -388,7 +391,7 @@ impl Daemon { extra_inserts_conf: tokio::sync::Mutex::new(ExtraInsertsConf::new()), store_workers_rate: AtomicU64::new(20000), insert_frac: AtomicU64::new(1000), - ca_conn_set: CaConnSet::new(), + ca_conn_set: CaConnSet::new(channel_info_query_tx), }; let ingest_commons = Arc::new(ingest_commons); @@ -491,57 +494,20 @@ impl Daemon { } let (qtx, qrx) = async_channel::bounded(CURRENT_SEARCH_PENDING_MAX); let fut = async move { - let (batch_tx, batch_rx) = async_channel::bounded(SEARCH_DB_PIPELINE_LEN); - let fut2 = async move { - let mut batch_ix = 0 as usize; - let mut all = Vec::new(); - let mut do_emit = false; - loop { - if do_emit { - do_emit = false; - let batch = std::mem::replace(&mut all, Vec::new()); - let n = batch.len(); - trace_batch!("--- BATCH TRY SEND"); - match batch_tx.send((batch_ix, batch)).await { - Ok(_) => { - trace_batch!("--- BATCH SEND DONE"); - batch_ix += 1; - SEARCH_REQ_BATCH_SEND_COUNT.fetch_add(n, atomic::Ordering::AcqRel); - } - Err(e) => { - error!("can not send batch"); - all = e.0 .1; - } - } - } - match tokio::time::timeout(Duration::from_millis(200), qrx.recv()).await { - Ok(k) => match k { - Ok(item) => { - SEARCH_REQ_RECV_COUNT.fetch_add(1, atomic::Ordering::AcqRel); - all.push(item); - if all.len() >= SEARCH_BATCH_MAX { - do_emit = true; - } - } - Err(e) => { - error!("error in batcher, no more input {e}"); - break; - } - }, - Err(e) => { - let _e: tokio::time::error::Elapsed = e; - if all.len() > 0 { - do_emit = true; - } - } - } - } - warn!("-------- batcher is done --------------"); - }; - tokio::spawn(fut2); + let (batch_rx, _jh) = netfetch::batcher::batch( + SEARCH_BATCH_MAX, + Duration::from_millis(200), + SEARCH_DB_PIPELINE_LEN, + qrx, + ); let (pgc_tx, pgc_rx) = async_channel::bounded(128); for pgc in pgcs { - let sql = "with q1 as (select * from unnest($2::text[]) as unn (ch)) select distinct on (tt.facility, tt.channel) tt.channel, tt.addr from ioc_by_channel_log tt join q1 on tt.channel = q1.ch and tt.facility = $1 and tt.addr is not null order by tt.facility, tt.channel, tsmod desc"; + let sql = concat!( + "with q1 as (select * from unnest($2::text[]) as unn (ch))", + " select distinct on (tt.facility, tt.channel) tt.channel, tt.addr", + " from ioc_by_channel_log tt join q1 on tt.channel = q1.ch and tt.facility = $1 and tt.addr is not null", + " order by tt.facility, tt.channel, tsmod desc", + ); let qu_select_multi = pgc.prepare(sql).await.unwrap(); let qu_select_multi = Arc::new(qu_select_multi); match pgc_tx.send((pgc, qu_select_multi)).await { @@ -553,7 +519,7 @@ impl Daemon { } let backend = Arc::new(backend.clone()); let stream = batch_rx - .map(|(batch_ix, batch): (usize, Vec)| { + .map(|batch: Vec| { let pgc_tx = pgc_tx.clone(); let pgc_rx = pgc_rx.clone(); let backend = backend.clone(); @@ -561,12 +527,11 @@ impl Daemon { async move { let ts1 = Instant::now(); let (pgc, qu_select_multi) = pgc_rx.recv().await.unwrap(); - debug_batch!("run query batch {} len {}", batch_ix, batch.len()); + debug_batch!("run query batch len {}", batch.len()); let qres = pgc.query(qu_select_multi.as_ref(), &[backend.as_ref(), &batch]).await; let dt = ts1.elapsed(); debug_batch!( - "done query batch {} len {}: {} {:.3}ms", - batch_ix, + "done query batch len {}: {} {:.3}ms", batch.len(), qres.is_ok(), dt.as_secs_f32() * 1e3 @@ -582,16 +547,16 @@ impl Daemon { out.push('\''); } out.push(']'); - eprintln!("VERY LONG QUERY batch_ix {batch_ix}\n{out}"); + eprintln!("VERY SLOW QUERY\n{out}"); } pgc_tx.send((pgc, qu_select_multi)).await.unwrap(); - (batch_ix, batch, qres) + (batch, qres) } }) .buffer_unordered(SEARCH_DB_PIPELINE_LEN); let mut resdiff = 0; let mut stream = Box::pin(stream); - while let Some((batch_ix, batch, pgres)) = stream.next().await { + while let Some((batch, pgres)) = stream.next().await { match pgres { Ok(rows) => { if rows.len() > batch.len() { @@ -624,12 +589,9 @@ impl Daemon { error!("STILL NOT MATCHING LEN"); } SEARCH_RES_3_COUNT.fetch_add(items.len(), atomic::Ordering::AcqRel); - debug_batch!("TRY SEND batch_ix {batch_ix}"); let x = tx.send(DaemonEvent::SearchDone(Ok(items))).await; match x { - Ok(_) => { - debug_batch!("DONE SEND batch_ix {batch_ix}"); - } + Ok(_) => {} Err(e) => { error!("finder sees: {e}"); break; @@ -1088,7 +1050,8 @@ impl Daemon { ActiveChannelState::SearchPending { since: _, did_send: _ } => {} ActiveChannelState::WithAddress { addr, state: _ } => { if addr == &conn_addr { - info!("ca conn down, reset {k:?}"); + // TODO reset channel, emit log event for the connection addr only + //info!("ca conn down, reset {k:?}"); *v = ChannelState { value: ChannelStateValue::Active(ActiveChannelState::UnknownAddress { since: SystemTime::now(), @@ -1222,6 +1185,15 @@ pub async fn run(opts: CaIngestOpts, channels: Vec) -> Result<(), Error> info!("start up {opts:?}"); netfetch::linuxhelper::set_signal_handler(libc::SIGINT, handler_sigint)?; netfetch::linuxhelper::set_signal_handler(libc::SIGTERM, handler_sigterm)?; + + let dcom = Arc::new(netfetch::metrics::DaemonComm::dummy()); + netfetch::metrics::start_metrics_service(opts.api_bind(), dcom); + + // TODO use a new stats type: + let store_stats = Arc::new(CaConnStats::new()); + let metrics_agg_fut = metrics_agg_task(ingest_commons.clone(), local_stats.clone(), store_stats.clone()); + let metrics_agg_jh = tokio::spawn(metrics_agg_fut); + let opts2 = DaemonOpts { backend: opts.backend().into(), local_epics_hostname: opts.local_epics_hostname().into(), diff --git a/netfetch/src/batcher.rs b/netfetch/src/batcher.rs new file mode 100644 index 0000000..21b18c0 --- /dev/null +++ b/netfetch/src/batcher.rs @@ -0,0 +1,54 @@ +use async_channel::Receiver; +use netpod::log::*; +use std::time::Duration; + +pub fn batch( + batch_limit: usize, + timeout: Duration, + outcap: usize, + rx: Receiver, +) -> (Receiver>, tokio::task::JoinHandle<()>) +where + T: Send + 'static, +{ + let (batch_tx, batch_rx) = async_channel::bounded(outcap); + let fut2 = async move { + let mut all = Vec::new(); + let mut do_emit = false; + loop { + if do_emit { + do_emit = false; + let batch = std::mem::replace(&mut all, Vec::new()); + match batch_tx.send(batch).await { + Ok(_) => {} + Err(e) => { + error!("can not send batch"); + all = e.0; + } + } + } + match tokio::time::timeout(timeout, rx.recv()).await { + Ok(k) => match k { + Ok(item) => { + all.push(item); + if all.len() >= batch_limit { + do_emit = true; + } + } + Err(e) => { + error!("error in batcher, no more input {e}"); + break; + } + }, + Err(e) => { + let _e: tokio::time::error::Elapsed = e; + if all.len() > 0 { + do_emit = true; + } + } + } + } + warn!("-------- batcher is done --------------"); + }; + (batch_rx, tokio::spawn(fut2)) +} diff --git a/netfetch/src/batchquery.rs b/netfetch/src/batchquery.rs new file mode 100644 index 0000000..b1929bb --- /dev/null +++ b/netfetch/src/batchquery.rs @@ -0,0 +1 @@ +pub mod series_by_channel; diff --git a/netfetch/src/batchquery/series_by_channel.rs b/netfetch/src/batchquery/series_by_channel.rs new file mode 100644 index 0000000..c386112 --- /dev/null +++ b/netfetch/src/batchquery/series_by_channel.rs @@ -0,0 +1,190 @@ +use crate::batcher; +use crate::dbpg::make_pg_client; +use crate::series::Existence; +use crate::series::SeriesId; +use async_channel::Receiver; +use async_channel::Sender; +use err::Error; +use futures_util::StreamExt; +use netpod::log::*; +use netpod::Database; +use std::time::Duration; +use tokio::task::JoinHandle; +use tokio_postgres::Client as PgClient; +use tokio_postgres::Statement as PgStatement; + +pub struct ChannelInfoQuery { + pub backend: String, + pub channel: String, + pub scalar_type: i32, + pub shape_dims: Vec, + pub tx: Sender, Error>>, +} + +impl ChannelInfoQuery { + pub fn dummy(&self) -> Self { + Self { + backend: String::new(), + channel: String::new(), + scalar_type: 4242, + shape_dims: Vec::new(), + tx: self.tx.clone(), + } + } +} + +struct ChannelInfoResult { + series: Vec>, + tx: Vec, Error>>>, +} + +struct PgRes { + pgc: PgClient, + st: PgStatement, +} + +async fn prepare_pgcs(sql: &str, pgcn: usize, db: &Database) -> Result<(Sender, Receiver), Error> { + let (pgc_tx, pgc_rx) = async_channel::bounded(pgcn); + for _ in 0..pgcn { + let pgc = make_pg_client(&db).await?; + let st = pgc.prepare(sql).await.map_err(|e| Error::from(e.to_string()))?; + let k = PgRes { pgc, st }; + match pgc_tx.send(k).await { + Ok(_) => {} + Err(e) => { + error!("can not enqueue pgc {e}"); + } + } + } + Ok((pgc_tx, pgc_rx)) +} + +async fn fetch_data(pgres: PgRes) -> Result { + err::todoval() +} + +async fn run_queries( + npg: usize, + batch_rx: Receiver>, + pgc_rx: Receiver, + pgc_tx: Sender, +) -> Result<(), Error> { + let mut stream = batch_rx + .map(|batch| { + let pgc_rx = pgc_rx.clone(); + let pgc_tx = pgc_tx.clone(); + async move { + if let Ok(pgres) = pgc_rx.recv().await { + let mut backend = Vec::new(); + let mut channel = Vec::new(); + let mut scalar_type = Vec::new(); + let mut shape_dims: Vec = Vec::new(); + let mut rid = Vec::new(); + let mut tx = Vec::new(); + for (i, e) in batch.into_iter().enumerate() { + backend.push(e.backend); + channel.push(e.channel); + scalar_type.push(e.scalar_type); + let mut dims = String::with_capacity(16); + dims.push('{'); + for (i, v) in e.shape_dims.into_iter().enumerate() { + if i > 0 { + dims.push(','); + } + use std::fmt::Write; + write!(dims, "{}", v).unwrap(); + } + dims.push('}'); + shape_dims.push(dims); + rid.push(i as i32); + tx.push((i as u32, e.tx)); + } + match pgres + .pgc + .query(&pgres.st, &[&backend, &channel, &scalar_type, &shape_dims, &rid]) + .await + .map_err(|e| { + error!("{e}"); + Error::from(e.to_string()) + }) { + Ok(rows) => { + if pgc_tx.send(pgres).await.is_err() { + Err(Error::with_msg_no_trace("can not hand pgres back")) + } else { + let mut series_ids = Vec::new(); + let mut txs = Vec::new(); + let mut it1 = rows.into_iter(); + let mut e1 = it1.next(); + for (qrid, tx) in tx { + if let Some(row) = &e1 { + let rid: i32 = row.get(1); + if rid as u32 == qrid { + let series: i64 = row.get(0); + let series = SeriesId::new(series as _); + series_ids.push(Existence::Existing(series)); + txs.push(tx); + } + e1 = it1.next(); + } + } + let result = ChannelInfoResult { + series: series_ids, + tx: txs, + }; + Ok(result) + } + } + Err(e) => { + error!("error in pg query {e}"); + tokio::time::sleep(Duration::from_millis(2000)).await; + Err(e) + } + } + } else { + error!("can not get pgc"); + Err(Error::with_msg_no_trace("no more pgres")) + } + } + }) + .buffer_unordered(npg); + while let Some(item) = stream.next().await { + match item { + Ok(res) => { + for (sid, tx) in res.series.into_iter().zip(res.tx) { + match tx.send(Ok(sid)).await { + Ok(_) => {} + Err(_) => { + // TODO count cases, but no log. Client may no longer be interested in this result. + } + } + } + } + Err(e) => { + error!("{e}"); + tokio::time::sleep(Duration::from_millis(1000)).await; + } + } + } + Ok(()) +} + +pub async fn start_task(db: &Database) -> Result, Error> { + let sql = concat!( + "with q1 as (select * from unnest($1::text[], $2::text[], $3::int[], $4::text[], $5::int[])", + " as inp (backend, channel, scalar_type, shape_dims, rid))", + " select t.series, q1.rid from series_by_channel t", + " join q1 on t.facility = q1.backend and t.channel = q1.channel", + " and t.scalar_type = q1.scalar_type and t.shape_dims = q1.shape_dims::int[]", + " and t.agg_kind = 0", + " order by q1.rid", + ); + let inp_cap = 128; + let batch_out_cap = 4; + let pgcn = 4; + let timeout = Duration::from_millis(200); + let (pgc_tx, pgc_rx) = prepare_pgcs(sql, pgcn, db).await?; + let (query_tx, query_rx) = async_channel::bounded(inp_cap); + let (batch_rx, _batch_jh) = batcher::batch(inp_cap, timeout, batch_out_cap, query_rx); + let _queries_jh: JoinHandle<_> = tokio::task::spawn(run_queries(pgcn, batch_rx, pgc_rx, pgc_tx)); + Ok(query_tx) +} diff --git a/netfetch/src/ca.rs b/netfetch/src/ca.rs index 56b7bbb..4a24af9 100644 --- a/netfetch/src/ca.rs +++ b/netfetch/src/ca.rs @@ -7,22 +7,16 @@ pub mod store; use self::store::DataStore; use crate::ca::connset::CaConnSet; -use crate::conf::CaIngestOpts; use crate::errconv::ErrConv; -use crate::insertworker::spawn_scylla_insert_workers; -use crate::metrics::metrics_agg_task; use crate::metrics::ExtraInsertsConf; use crate::rt::TokMx; use crate::store::CommonInsertItemQueue; use err::Error; -use futures_util::stream::FuturesUnordered; use futures_util::Future; -use futures_util::{FutureExt, StreamExt}; +use futures_util::FutureExt; use log::*; use netpod::Database; -use stats::CaConnStats; use stats::CaConnStatsAgg; -use std::collections::BTreeMap; use std::net::SocketAddrV4; use std::pin::Pin; use std::sync::atomic::{AtomicU32, AtomicU64, Ordering}; @@ -233,200 +227,3 @@ fn handler_sigaction(_a: libc::c_int, _b: *const libc::siginfo_t, _c: *const lib crate::ca::SIGINT.store(1, Ordering::Release); let _ = crate::linuxhelper::unset_signal_handler(libc::SIGINT); } - -pub async fn ca_connect(opts: CaIngestOpts, channels: &Vec) -> Result<(), Error> { - crate::linuxhelper::set_signal_handler(libc::SIGINT, handler_sigaction)?; - let extra_inserts_conf = TokMx::new(ExtraInsertsConf { copies: Vec::new() }); - let insert_ivl_min = Arc::new(AtomicU64::new(8800)); - let scyconf = opts.scylla().clone(); - let pgconf = opts.postgresql().clone(); - let d = &pgconf; - let (pg_client, pg_conn) = tokio_postgres::connect( - &format!("postgresql://{}:{}@{}:{}/{}", d.user, d.pass, d.host, d.port, d.name), - tokio_postgres::tls::NoTls, - ) - .await - .err_conv()?; - // TODO allow clean shutdown on ctrl-c and join the pg_conn in the end: - tokio::spawn(pg_conn); - let pg_client = Arc::new(pg_client); - - // TODO use a new type: - let local_stats = Arc::new(CaConnStats::new()); - - info!("fetch phonebook begin"); - // Fetch all addresses for all channels. - let rows = pg_client - .query( - "select distinct on (facility, channel) channel, addr from ioc_by_channel_log where channel is not null and addr is not null order by facility, channel, tsmod desc", - &[], - ) - .await - .err_conv()?; - let mut phonebook = BTreeMap::new(); - for row in rows { - let channel: String = row.get(0); - let addr: String = row.get(1); - let addr: SocketAddrV4 = addr - .parse() - .map_err(|_| Error::with_msg_no_trace(format!("can not parse address {addr}")))?; - phonebook.insert(channel, addr); - } - info!("fetch phonebook done"); - - let mut channels_by_host = BTreeMap::new(); - - let data_store = Arc::new(DataStore::new(&scyconf, pg_client.clone()).await?); - let insert_item_queue = CommonInsertItemQueue::new(opts.insert_item_queue_cap()); - let insert_item_queue = Arc::new(insert_item_queue); - - let ingest_commons = IngestCommons { - pgconf: Arc::new(pgconf.clone()), - backend: opts.backend().into(), - local_epics_hostname: opts.local_epics_hostname().clone(), - insert_item_queue: insert_item_queue.clone(), - data_store: data_store.clone(), - insert_ivl_min: insert_ivl_min.clone(), - extra_inserts_conf, - store_workers_rate: AtomicU64::new(opts.store_workers_rate()), - insert_frac: AtomicU64::new(opts.insert_frac()), - ca_conn_set: CaConnSet::new(), - }; - let ingest_commons = Arc::new(ingest_commons); - - tokio::spawn({ - let rx = ingest_commons.ca_conn_set.conn_item_rx(); - async move { while let Ok(_item) = rx.recv().await {} } - }); - - // TODO use a new stats type: - let store_stats = Arc::new(CaConnStats::new()); - let ttls = crate::insertworker::Ttls { - index: opts.ttl_index(), - d0: opts.ttl_d0(), - d1: opts.ttl_d1(), - }; - let jh_insert_workers = spawn_scylla_insert_workers( - opts.scylla().clone(), - opts.insert_scylla_sessions(), - opts.insert_worker_count(), - insert_item_queue.clone(), - ingest_commons.clone(), - pg_client.clone(), - store_stats.clone(), - opts.use_rate_limit_queue(), - ttls, - ) - .await?; - - if true { - tokio::spawn(crate::metrics::start_metrics_service( - opts.api_bind().clone(), - ingest_commons.clone(), - )); - } - - let metrics_agg_fut = metrics_agg_task(ingest_commons.clone(), local_stats.clone(), store_stats.clone()); - let metrics_agg_jh = tokio::spawn(metrics_agg_fut); - - let mut chns_todo = &channels[..]; - let mut ix = 0; - for ch in chns_todo { - if SIGINT.load(Ordering::Acquire) != 0 { - break; - } - let ch = ch.to_string(); - chns_todo = &chns_todo[1..]; - if let Some(addr) = phonebook.get(&ch) { - if !channels_by_host.contains_key(&addr) { - channels_by_host.insert(addr, vec![ch.to_string()]); - } else { - channels_by_host.get_mut(&addr).unwrap().push(ch.to_string()); - } - ingest_commons - .ca_conn_set - .add_channel_to_addr( - opts.backend().into(), - *addr, - ch.clone(), - &ingest_commons.insert_item_queue, - &ingest_commons.data_store, - opts.insert_queue_max(), - opts.array_truncate(), - opts.local_epics_hostname(), - ) - .await?; - } - ix += 1; - if ix % 1000 == 0 { - info!("{} of {} {}", ix, channels.len(), ch); - } - } - info!("channels_by_host len {}", channels_by_host.len()); - - loop { - if SIGINT.load(Ordering::Acquire) != 0 { - if false { - let receiver = insert_item_queue.receiver(); - let sc = receiver.sender_count(); - let rc = receiver.receiver_count(); - info!("item queue senders {} receivers {}", sc, rc); - } - error!("TODO sending stop commands"); - //ingest_commons.ca_conn_set.send_stop().await?; - break; - } - tokio::time::sleep(Duration::from_millis(400)).await; - } - ingest_commons.ca_conn_set.wait_stopped().await?; - info!("all connections done."); - - insert_item_queue.close(); - - drop(ingest_commons); - metrics_agg_jh.abort(); - drop(metrics_agg_jh); - - if false { - let sender = insert_item_queue.sender_raw(); - sender.close(); - let receiver = insert_item_queue.receiver(); - receiver.close(); - } - if true { - let receiver = insert_item_queue.receiver(); - let sc = receiver.sender_count(); - let rc = receiver.receiver_count(); - info!("item queue A senders {} receivers {}", sc, rc); - } - let receiver = insert_item_queue.receiver(); - drop(insert_item_queue); - if true { - let sc = receiver.sender_count(); - let rc = receiver.receiver_count(); - info!("item queue B senders {} receivers {}", sc, rc); - } - receiver.close(); - - let mut futs = FuturesUnordered::from_iter(jh_insert_workers); - loop { - futures_util::select!( - x = futs.next() => match x { - Some(Ok(_)) => {} - Some(Err(e)) => { - error!("error on shutdown: {e:?}"); - } - None => break, - }, - _ = tokio::time::sleep(Duration::from_millis(1000)).fuse() => { - if true { - let sc = receiver.sender_count(); - let rc = receiver.receiver_count(); - info!("waiting inserters {} items {} senders {} receivers {}", futs.len(), receiver.len(), sc, rc); - } - } - ); - } - info!("all insert workers done."); - Ok(()) -} diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index 3b0939b..b9f54e0 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -5,11 +5,12 @@ use super::proto::CaMsgTy; use super::proto::CaProto; use super::store::DataStore; use super::ExtraInsertsConf; +use crate::batchquery::series_by_channel::ChannelInfoQuery; use crate::bsread::ChannelDescDecoded; use crate::ca::proto::CreateChan; use crate::ca::proto::EventAdd; -use crate::ca::store::ChannelRegistry; -use crate::series::{Existence, SeriesId}; +use crate::series::Existence; +use crate::series::SeriesId; use crate::store::ChannelInfoItem; use crate::store::ChannelStatus; use crate::store::ChannelStatusItem; @@ -17,9 +18,13 @@ use crate::store::CommonInsertItemQueueSender; use crate::store::ConnectionStatus; use crate::store::ConnectionStatusItem; use crate::store::{InsertItem, IvlItem, MuteItem, QueryItem}; +use async_channel::Sender; use err::Error; -use futures_util::stream::FuturesOrdered; -use futures_util::{Future, FutureExt, Stream, StreamExt, TryFutureExt}; +use futures_util::stream::FuturesUnordered; +use futures_util::Future; +use futures_util::FutureExt; +use futures_util::Stream; +use futures_util::StreamExt; use log::*; use netpod::timeunits::*; use netpod::ScalarType; @@ -30,7 +35,6 @@ use serde::Serialize; use stats::CaConnStats; use stats::IntervalEma; use std::collections::BTreeMap; -use std::collections::HashMap; use std::collections::VecDeque; use std::net::SocketAddrV4; use std::ops::ControlFlow; @@ -378,9 +382,20 @@ impl ChannelSetOps { } } +struct ChannelOpsResources<'a> { + channel_set_ops: &'a StdMutex>, + channels: &'a mut BTreeMap, + cid_by_name: &'a mut BTreeMap, + name_by_cid: &'a mut BTreeMap, + cid_store: &'a mut CidStore, + init_state_count: &'a mut u64, + channel_set_ops_flag: &'a AtomicUsize, +} + pub struct CaConn { state: CaConnState, shutdown: bool, + ticker: Pin>, proto: Option, cid_store: CidStore, subid_store: SubidStore, @@ -389,12 +404,9 @@ pub struct CaConn { cid_by_name: BTreeMap, cid_by_subid: BTreeMap, name_by_cid: BTreeMap, - data_store: Arc, insert_item_queue: VecDeque, insert_item_sender: CommonInsertItemQueueSender, insert_item_send_fut: Option>, - fut_get_series: - FuturesOrdered), Error>> + Send>>>, backend: String, remote_addr_dbg: SocketAddrV4, local_epics_hostname: String, @@ -412,6 +424,11 @@ pub struct CaConn { ioc_ping_start: Option, cmd_res_queue: VecDeque, channel_set_ops: Arc, + channel_info_query_tx: Sender, + series_lookup_schedule: BTreeMap, + series_lookup_futs: FuturesUnordered< + Pin), Error>> + Send>>, + >, } impl CaConn { @@ -419,7 +436,8 @@ impl CaConn { backend: String, remote_addr_dbg: SocketAddrV4, local_epics_hostname: String, - data_store: Arc, + channel_info_query_tx: Sender, + _data_store: Arc, insert_item_sender: CommonInsertItemQueueSender, array_truncate: usize, insert_queue_max: usize, @@ -428,6 +446,7 @@ impl CaConn { Self { state: CaConnState::Unconnected, shutdown: false, + ticker: Box::pin(tokio::time::sleep(Duration::from_millis(500))), proto: None, cid_store: CidStore::new(), subid_store: SubidStore::new(), @@ -436,11 +455,9 @@ impl CaConn { cid_by_name: BTreeMap::new(), cid_by_subid: BTreeMap::new(), name_by_cid: BTreeMap::new(), - data_store, insert_item_queue: VecDeque::new(), insert_item_sender, insert_item_send_fut: None, - fut_get_series: FuturesOrdered::new(), backend, remote_addr_dbg, local_epics_hostname, @@ -461,6 +478,9 @@ impl CaConn { ops: StdMutex::new(BTreeMap::new()), flag: AtomicUsize::new(0), }), + channel_info_query_tx, + series_lookup_schedule: BTreeMap::new(), + series_lookup_futs: FuturesUnordered::new(), } } @@ -474,7 +494,7 @@ impl CaConn { fn trigger_shutdown(&mut self) { self.shutdown = true; - for (k, v) in self.channels.iter_mut() { + for (_k, v) in self.channels.iter_mut() { match v { ChannelState::Init => { *v = ChannelState::Ended; @@ -810,8 +830,9 @@ impl CaConn { for (_, st) in &self.channels { match st { ChannelState::Creating { cid, ts_beg } => { - if tsnow.duration_since(*ts_beg) >= Duration::from_millis(10000) { + if false && tsnow.duration_since(*ts_beg) >= Duration::from_millis(10000) { let name = self.name_by_cid.get(cid); + // TODO channel create timed out how to let daemon know? warn!("channel Creating timed out {} {:?}", cid.0, name); } } @@ -945,16 +966,36 @@ impl CaConn { Ok(()) } - fn handle_get_series_futs(&mut self, cx: &mut Context) -> Poll> { + fn emit_series_lookup(&mut self, cx: &mut Context) { + let _ = cx; + loop { + break if let Some(mut entry) = self.series_lookup_schedule.first_entry() { + let dummy = entry.get().dummy(); + let query = std::mem::replace(entry.get_mut(), dummy); + match self.channel_info_query_tx.try_send(query) { + Ok(()) => { + entry.remove(); + continue; + } + Err(e) => { + *entry.get_mut() = e.into_inner(); + } + } + } else { + () + }; + } + } + + fn poll_channel_info_results(&mut self, cx: &mut Context) { use Poll::*; - while self.fut_get_series.len() > 0 { - match self.fut_get_series.poll_next_unpin(cx) { + loop { + break match self.series_lookup_futs.poll_next_unpin(cx) { Ready(Some(Ok((cid, sid, data_type, data_count, series)))) => { { - let series = series.clone().into_inner(); let item = QueryItem::ChannelStatus(ChannelStatusItem { ts: SystemTime::now(), - series: series, + series: series.clone().into_inner(), status: ChannelStatus::Opened, }); self.insert_item_queue.push_back(item); @@ -962,16 +1003,17 @@ impl CaConn { match self.channel_to_evented(cid, sid, data_type, data_count, series, cx) { Ok(_) => {} Err(e) => { - return Ready(Err(e)); + error!("poll_channel_info_results {e}"); } } } - Ready(Some(Err(e))) => return Ready(Err(e)), - Ready(None) => return Ready(Err(Error::with_msg_no_trace("series lookup stream should never end"))), - Pending => break, - } + Ready(Some(Err(e))) => { + error!("poll_channel_info_results {e}"); + } + Ready(None) => {} + Pending => {} + }; } - return Pending; } fn event_add_insert( @@ -1287,7 +1329,7 @@ impl CaConn { use Poll::*; let mut ts1 = Instant::now(); // TODO unify with Listen state where protocol gets polled as well. - let mut msgs_tmp = vec![]; + let mut msgs_tmp = Vec::new(); self.check_channels_state_init(&mut msgs_tmp)?; let ts2 = Instant::now(); self.stats @@ -1354,23 +1396,43 @@ impl CaConn { info_store_msp_last: info_store_msp_from_time(SystemTime::now()), }); // TODO handle error in different way. Should most likely not abort. - let cd = ChannelDescDecoded { - name: name.to_string(), - scalar_type, - shape, + let _cd = ChannelDescDecoded { + name: name.clone(), + scalar_type: scalar_type.clone(), + shape: shape.clone(), agg_kind: netpod::AggKind::Plain, // TODO these play no role in series id: byte_order: netpod::ByteOrder::Little, compression: None, }; - let z = unsafe { - &*(&self.data_store.chan_reg as &ChannelRegistry as *const ChannelRegistry) + let (tx, rx) = async_channel::bounded(1); + let query = ChannelInfoQuery { + backend: self.backend.clone(), + channel: name.clone(), + scalar_type: scalar_type.to_scylla_i32(), + shape_dims: shape.to_scylla_vec(), + tx, }; - let fut = z - .get_series_id(cd, self.backend.clone()) - .map_ok(move |series| (cid, k.sid, k.data_type, k.data_count, series)); - // TODO throttle execution rate: - self.fut_get_series.push_back(Box::pin(fut) as _); + if !self.series_lookup_schedule.contains_key(&cid) { + self.series_lookup_schedule.insert(cid, query); + let fut = async move { + match rx.recv().await { + Ok(item) => match item { + Ok(item) => Ok((cid, sid, k.data_type, k.data_count, item)), + Err(e) => Err(e), + }, + Err(e) => { + // TODO count only + error!("can not receive series lookup result {e}"); + Err(Error::with_msg_no_trace("can not receive lookup result")) + } + } + }; + self.series_lookup_futs.push(Box::pin(fut)); + } else { + // TODO count only + warn!("series lookup for {name} already in progress"); + } do_wake_again = true; } CaMsgTy::EventAddRes(k) => { @@ -1404,6 +1466,9 @@ impl CaConn { self.ioc_ping_last = Instant::now(); self.ioc_ping_start = None; } + CaMsgTy::CreateChanFail(_) => { + // TODO handle CreateChanFail + } _ => { warn!("Received unexpected protocol message {:?}", k); } @@ -1440,7 +1505,6 @@ impl CaConn { }; if do_wake_again { // TODO remove the need for this: - trace!("do_wake_again"); cx.waker().wake_by_ref(); } res @@ -1545,14 +1609,6 @@ impl CaConn { Pending => Some(Pending), }, CaConnState::PeerReady => { - { - // TODO can I move this block somewhere else? - match self.handle_get_series_futs(cx) { - Ready(Ok(_)) => (), - Ready(Err(e)) => return Some(Ready(Err(e))), - Pending => (), - } - } let res = self.handle_peer_ready(cx); match res { Ready(Some(Ok(()))) => None, @@ -1588,7 +1644,7 @@ impl CaConn { } } - fn apply_3(res: ChannelOpsResources) { + fn apply_channel_ops_with_res(res: ChannelOpsResources) { let mut g = res.channel_set_ops.lock().unwrap(); let map = std::mem::replace(&mut *g, BTreeMap::new()); for (ch, op) in map { @@ -1609,7 +1665,7 @@ impl CaConn { res.channel_set_ops_flag.store(0, atomic::Ordering::Release); } - fn apply_2(&mut self) { + fn apply_channel_ops(&mut self) { let res = ChannelOpsResources { channel_set_ops: &self.channel_set_ops.ops, channels: &mut self.channels, @@ -1619,20 +1675,10 @@ impl CaConn { init_state_count: &mut self.init_state_count, channel_set_ops_flag: &self.channel_set_ops.flag, }; - Self::apply_3(res) + Self::apply_channel_ops_with_res(res) } } -struct ChannelOpsResources<'a> { - channel_set_ops: &'a StdMutex>, - channels: &'a mut BTreeMap, - cid_by_name: &'a mut BTreeMap, - name_by_cid: &'a mut BTreeMap, - cid_store: &'a mut CidStore, - init_state_count: &'a mut u64, - channel_set_ops_flag: &'a AtomicUsize, -} - impl Stream for CaConn { type Item = Result; @@ -1641,7 +1687,16 @@ impl Stream for CaConn { let poll_ts1 = Instant::now(); self.stats.caconn_poll_count_inc(); if self.channel_set_ops.flag.load(atomic::Ordering::Acquire) > 0 { - Self::apply_2(&mut self); + self.apply_channel_ops(); + } + self.emit_series_lookup(cx); + self.poll_channel_info_results(cx); + match self.ticker.poll_unpin(cx) { + Ready(()) => { + self.ticker = Box::pin(tokio::time::sleep(Duration::from_millis(500))); + cx.waker().wake_by_ref(); + } + Pending => {} } let ret = if let Some(item) = self.cmd_res_queue.pop_front() { Ready(Some(Ok(CaConnEvent { diff --git a/netfetch/src/ca/connset.rs b/netfetch/src/ca/connset.rs index 0e98cd8..b86b072 100644 --- a/netfetch/src/ca/connset.rs +++ b/netfetch/src/ca/connset.rs @@ -1,9 +1,9 @@ use super::conn::CaConnEvent; -use super::conn::ChannelSetOp; use super::conn::ChannelSetOps; use super::conn::ConnCommand; use super::store::DataStore; use super::SlowWarnable; +use crate::batchquery::series_by_channel::ChannelInfoQuery; use crate::ca::conn::CaConn; use crate::ca::conn::CaConnEventValue; use crate::errconv::ErrConv; @@ -55,15 +55,17 @@ pub struct CaConnSet { ca_conn_ress: Arc>>, conn_item_tx: Sender<(SocketAddrV4, CaConnEvent)>, conn_item_rx: Receiver<(SocketAddrV4, CaConnEvent)>, + channel_info_query_tx: Sender, } impl CaConnSet { - pub fn new() -> Self { + pub fn new(channel_info_query_tx: Sender) -> Self { let (conn_item_tx, conn_item_rx) = async_channel::bounded(10000); Self { ca_conn_ress: Arc::new(TokMx::new(BTreeMap::new())), conn_item_tx, conn_item_rx, + channel_info_query_tx, } } @@ -92,6 +94,7 @@ impl CaConnSet { backend.clone(), addr, local_epics_hostname, + self.channel_info_query_tx.clone(), data_store.clone(), insert_item_queue_sender, array_truncate, diff --git a/netfetch/src/ca/store.rs b/netfetch/src/ca/store.rs index 1d493b1..2d7973f 100644 --- a/netfetch/src/ca/store.rs +++ b/netfetch/src/ca/store.rs @@ -1,44 +1,9 @@ -use crate::bsread::ChannelDescDecoded; -use crate::series::{Existence, SeriesId}; -use async_channel::{Receiver, Sender}; use err::Error; use netpod::ScyllaConfig; use scylla::prepared_statement::PreparedStatement; use scylla::statement::Consistency; use scylla::Session as ScySession; use std::sync::Arc; -use tokio_postgres::Client as PgClient; - -#[allow(unused)] -pub struct RegisterJob { - desc: ChannelDescDecoded, -} - -impl RegisterJob { - pub fn new(desc: ChannelDescDecoded) -> Self { - Self { desc } - } -} - -#[allow(unused)] -pub struct RegisterChannel { - tx: Sender, - rx: Receiver, -} - -pub struct ChannelRegistry { - pg_client: Arc, -} - -impl ChannelRegistry { - pub fn new(pg_client: Arc) -> Self { - Self { pg_client } - } - - pub async fn get_series_id(&self, cd: ChannelDescDecoded, backend: String) -> Result, Error> { - crate::series::get_series_id(&self.pg_client, &cd, backend).await - } -} pub struct DataStore { pub scy: Arc, @@ -61,11 +26,10 @@ pub struct DataStore { pub qu_insert_channel_status: Arc, pub qu_insert_channel_status_by_ts_msp: Arc, pub qu_insert_channel_ping: Arc, - pub chan_reg: Arc, } impl DataStore { - pub async fn new(scyconf: &ScyllaConfig, pg_client: Arc) -> Result { + pub async fn new(scyconf: &ScyllaConfig) -> Result { let scy = scylla::SessionBuilder::new() .known_nodes(&scyconf.hosts) .default_consistency(Consistency::LocalOne) @@ -182,7 +146,6 @@ impl DataStore { .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; let qu_insert_channel_ping = Arc::new(q); let ret = Self { - chan_reg: Arc::new(ChannelRegistry::new(pg_client)), scy, qu_insert_ts_msp, qu_insert_series_by_ts_msp, diff --git a/netfetch/src/dbpg.rs b/netfetch/src/dbpg.rs new file mode 100644 index 0000000..c666d64 --- /dev/null +++ b/netfetch/src/dbpg.rs @@ -0,0 +1,16 @@ +use crate::errconv::ErrConv; +use err::Error; +use netpod::Database; +use tokio_postgres::Client as PgClient; + +pub async fn make_pg_client(d: &Database) -> Result { + let (client, pg_conn) = tokio_postgres::connect( + &format!("postgresql://{}:{}@{}:{}/{}", d.user, d.pass, d.host, d.port, d.name), + tokio_postgres::tls::NoTls, + ) + .await + .err_conv()?; + // TODO allow clean shutdown on ctrl-c and join the pg_conn in the end: + tokio::spawn(pg_conn); + Ok(client) +} diff --git a/netfetch/src/insertworker.rs b/netfetch/src/insertworker.rs index 74a0acd..054b6be 100644 --- a/netfetch/src/insertworker.rs +++ b/netfetch/src/insertworker.rs @@ -56,7 +56,7 @@ pub async fn spawn_scylla_insert_workers( insert_worker_count: usize, insert_item_queue: Arc, ingest_commons: Arc, - pg_client: Arc, + _pg_client: Arc, store_stats: Arc, use_rate_limit_queue: bool, ttls: Ttls, @@ -119,7 +119,7 @@ pub async fn spawn_scylla_insert_workers( let mut jhs = Vec::new(); let mut data_stores = Vec::new(); for _ in 0..insert_scylla_sessions { - let data_store = Arc::new(DataStore::new(&scyconf, pg_client.clone()).await?); + let data_store = Arc::new(DataStore::new(&scyconf).await?); data_stores.push(data_store); } for i1 in 0..insert_worker_count { diff --git a/netfetch/src/metrics.rs b/netfetch/src/metrics.rs index 4300aab..77e94f5 100644 --- a/netfetch/src/metrics.rs +++ b/netfetch/src/metrics.rs @@ -1,12 +1,14 @@ -use crate::ca::conn::ConnCommand; use crate::ca::IngestCommons; use crate::ca::METRICS; use axum::extract::Query; use err::Error; use http::Request; use log::*; -use serde::{Deserialize, Serialize}; -use stats::{CaConnStats, CaConnStatsAgg, CaConnStatsAggDiff}; +use serde::Deserialize; +use serde::Serialize; +use stats::CaConnStats; +use stats::CaConnStatsAgg; +use stats::CaConnStatsAggDiff; use std::collections::HashMap; use std::net::SocketAddrV4; use std::sync::atomic::Ordering; @@ -26,7 +28,7 @@ impl ExtraInsertsConf { async fn find_channel( params: HashMap, - ingest_commons: Arc, + dcom: Arc, ) -> axum::Json)>> { let pattern = params.get("pattern").map_or(String::new(), |x| x.clone()).to_string(); // TODO ask Daemon for that information. @@ -35,7 +37,7 @@ async fn find_channel( axum::Json(res) } -async fn channel_add_inner(params: HashMap, ingest_commons: Arc) -> Result<(), Error> { +async fn channel_add_inner(params: HashMap, dcom: Arc) -> Result<(), Error> { if let (Some(backend), Some(name)) = (params.get("backend"), params.get("name")) { error!("TODO channel_add_inner"); Err(Error::with_msg_no_trace(format!("TODO channel_add_inner"))) @@ -44,18 +46,15 @@ async fn channel_add_inner(params: HashMap, ingest_commons: Arc< } } -async fn channel_add(params: HashMap, ingest_commons: Arc) -> axum::Json { - let ret = match channel_add_inner(params, ingest_commons).await { +async fn channel_add(params: HashMap, dcom: Arc) -> axum::Json { + let ret = match channel_add_inner(params, dcom).await { Ok(_) => true, Err(_) => false, }; axum::Json(ret) } -async fn channel_remove( - params: HashMap, - ingest_commons: Arc, -) -> axum::Json { +async fn channel_remove(params: HashMap, dcom: Arc) -> axum::Json { use axum::Json; use serde_json::Value; let addr = if let Some(x) = params.get("addr") { @@ -81,7 +80,7 @@ async fn channel_remove( Json(Value::Bool(false)) } -async fn channel_state(params: HashMap, ingest_commons: Arc) -> axum::Json { +async fn channel_state(params: HashMap, dcom: Arc) -> axum::Json { let name = params.get("name").map_or(String::new(), |x| x.clone()).to_string(); error!("TODO channel_state"); axum::Json(false) @@ -89,14 +88,14 @@ async fn channel_state(params: HashMap, ingest_commons: Arc, - ingest_commons: Arc, + dcom: Arc, ) -> axum::Json> { let limit = params.get("limit").map(|x| x.parse()).unwrap_or(Ok(40)).unwrap_or(40); error!("TODO channel_state"); axum::Json(Vec::new()) } -async fn extra_inserts_conf_set(v: ExtraInsertsConf, ingest_commons: Arc) -> axum::Json { +async fn extra_inserts_conf_set(v: ExtraInsertsConf, dcom: Arc) -> axum::Json { // TODO ingest_commons is the authorative value. Should have common function outside of this metrics which // can update everything to a given value. error!("TODO extra_inserts_conf_set"); @@ -111,12 +110,22 @@ struct DummyQuery { age: usize, } -pub async fn start_metrics_service(bind_to: String, ingest_commons: Arc) { +pub struct DaemonComm {} + +impl DaemonComm { + pub fn dummy() -> Self { + Self {} + } +} + +fn make_routes(dcom: Arc) -> axum::Router { use axum::extract; - use axum::http::StatusCode; - use axum::routing::{get, put}; + use axum::routing::get; + use axum::routing::put; use axum::Router; - let app = Router::new() + use http::StatusCode; + + Router::new() .fallback(|req: Request| async move { info!("Fallback for {} {}", req.method(), req.uri()); StatusCode::NOT_FOUND @@ -149,89 +158,83 @@ pub async fn start_metrics_service(bind_to: String, ingest_commons: Arc>| find_channel(params, ingest_commons) + let dcom = dcom.clone(); + |Query(params): Query>| find_channel(params, dcom) }), ) .route( "/daqingest/channel/state", get({ - let ingest_commons = ingest_commons.clone(); - |Query(params): Query>| channel_state(params, ingest_commons) + let dcom = dcom.clone(); + |Query(params): Query>| channel_state(params, dcom) }), ) .route( "/daqingest/channel/states", get({ - let ingest_commons = ingest_commons.clone(); - |Query(params): Query>| channel_states(params, ingest_commons) + let dcom = dcom.clone(); + |Query(params): Query>| channel_states(params, dcom) }), ) .route( "/daqingest/channel/add", get({ - let ingest_commons = ingest_commons.clone(); - |Query(params): Query>| channel_add(params, ingest_commons) + let dcom = dcom.clone(); + |Query(params): Query>| channel_add(params, dcom) }), ) .route( "/daqingest/channel/remove", get({ - let ingest_commons = ingest_commons.clone(); - |Query(params): Query>| channel_remove(params, ingest_commons) + let dcom = dcom.clone(); + |Query(params): Query>| channel_remove(params, dcom) }), ) .route( "/store_workers_rate", get({ - let c = ingest_commons.clone(); - || async move { axum::Json(c.store_workers_rate.load(Ordering::Acquire)) } + let dcom = dcom.clone(); + || async move { axum::Json(123) } }) .put({ - let c = ingest_commons.clone(); - |v: extract::Json| async move { - c.store_workers_rate.store(v.0, Ordering::Release); - } + let dcom = dcom.clone(); + |v: extract::Json| async move {} }), ) .route( "/insert_frac", get({ - let c = ingest_commons.clone(); - || async move { axum::Json(c.insert_frac.load(Ordering::Acquire)) } + let dcom = dcom.clone(); + || async move { axum::Json(123) } }) .put({ - let c = ingest_commons.clone(); - |v: extract::Json| async move { - c.insert_frac.store(v.0, Ordering::Release); - } + let dcom = dcom.clone(); + |v: extract::Json| async move {} }), ) .route( "/extra_inserts_conf", get({ - let c = ingest_commons.clone(); - || async move { - let res = c.extra_inserts_conf.lock().await; - axum::Json(serde_json::to_value(&*res).unwrap()) - } + let dcom = dcom.clone(); + || async move { axum::Json(serde_json::to_value(&"TODO").unwrap()) } }) .put({ - let ingest_commons = ingest_commons.clone(); - |v: extract::Json| extra_inserts_conf_set(v.0, ingest_commons) + let dcom = dcom.clone(); + |v: extract::Json| extra_inserts_conf_set(v.0, dcom) }), ) .route( "/insert_ivl_min", put({ - let insert_ivl_min = ingest_commons.insert_ivl_min.clone(); - |v: extract::Json| async move { - insert_ivl_min.store(v.0, Ordering::Release); - } + let dcom = dcom.clone(); + |v: extract::Json| async move {} }), - ); + ) +} + +pub async fn start_metrics_service(bind_to: String, dcom: Arc) { axum::Server::bind(&bind_to.parse().unwrap()) - .serve(app.into_make_service()) + .serve(make_routes(dcom).into_make_service()) .await .unwrap() } diff --git a/netfetch/src/netfetch.rs b/netfetch/src/netfetch.rs index 70eb5ec..dd6c60c 100644 --- a/netfetch/src/netfetch.rs +++ b/netfetch/src/netfetch.rs @@ -1,7 +1,10 @@ +pub mod batcher; +pub mod batchquery; pub mod bsread; pub mod ca; pub mod channelwriter; pub mod conf; +pub mod dbpg; pub mod errconv; pub mod insertworker; pub mod linuxhelper; diff --git a/stats/src/stats.rs b/stats/src/stats.rs index 839bcc8..723b1b6 100644 --- a/stats/src/stats.rs +++ b/stats/src/stats.rs @@ -264,3 +264,9 @@ stats_proc::stats_struct!(( agg(name(CaConnStatsAgg), parent(CaConnStats)), diff(name(CaConnStatsAggDiff), input(CaConnStatsAgg)), )); + +stats_proc::stats_struct!(( + stats_struct(name(DaemonStats), counters(main_lookupaddr_ok)), + agg(name(DaemonStatsAgg), parent(DaemonStats)), + diff(name(DaemonStatsAggDiff), input(DaemonStatsAgg)), +));