diff --git a/daqingest/src/daemon.rs b/daqingest/src/daemon.rs index 78602fd..0630c6f 100644 --- a/daqingest/src/daemon.rs +++ b/daqingest/src/daemon.rs @@ -1,3 +1,6 @@ +pub mod inserthook; +pub mod types; + use async_channel::Receiver; use async_channel::Sender; use async_channel::WeakReceiver; @@ -51,6 +54,7 @@ use tokio_postgres::Client as PgClient; use tokio_postgres::Row as PgRow; use tracing::info_span; use tracing::Instrument; +use types::*; const SEARCH_BATCH_MAX: usize = 256; const CURRENT_SEARCH_PENDING_MAX: usize = SEARCH_BATCH_MAX * 4; @@ -86,7 +90,7 @@ static SEARCH_ANS_COUNT: AtomicUsize = AtomicUsize::new(0); #[allow(unused)] macro_rules! debug_batch { - (D$($arg:tt)*) => (); + // (D$($arg:tt)*) => (); ($($arg:tt)*) => (if false { info!($($arg)*); }); @@ -94,27 +98,12 @@ macro_rules! debug_batch { #[allow(unused)] macro_rules! trace_batch { - (D$($arg:tt)*) => (); + // (D$($arg:tt)*) => (); ($($arg:tt)*) => (if false { trace!($($arg)*); }); } -#[allow(non_snake_case)] -mod serde_Instant { - use serde::Serializer; - use std::time::Instant; - - #[allow(unused)] - pub fn serialize(val: &Instant, ser: S) -> Result - where - S: Serializer, - { - let dur = val.elapsed(); - ser.serialize_u64(dur.as_secs() * 1000 + dur.subsec_millis() as u64) - } -} - #[derive(Clone, Debug, Serialize)] pub enum ConnectionStateValue { Unconnected, @@ -287,7 +276,7 @@ impl Daemon { .await .map_err(|e| Error::with_msg_no_trace(e.to_string()))?; let datastore = Arc::new(datastore); - let (tx, rx) = async_channel::bounded(32); + let (tx, rx_daemon_ev) = async_channel::bounded(32); let pgcs = { let mut a = Vec::new(); for _ in 0..SEARCH_DB_PIPELINE_LEN { @@ -308,79 +297,12 @@ impl Daemon { .map_err(|e| Error::with_msg_no_trace(e.to_string()))?; let common_insert_item_queue = Arc::new(CommonInsertItemQueue::new(opts.insert_item_queue_cap)); - let common_insert_item_queue_2 = Arc::new(CommonInsertItemQueue::new(opts.insert_item_queue_cap)); + // let common_insert_item_queue_2 = 
Arc::new(CommonInsertItemQueue::new(opts.insert_item_queue_cap)); let insert_queue_counter = Arc::new(AtomicUsize::new(0)); // Insert queue hook - if true { - tokio::spawn({ - let rx = common_insert_item_queue - .receiver() - .ok_or_else(|| Error::with_msg_no_trace("can not derive receiver for insert queue adapter"))?; - let tx = common_insert_item_queue_2 - .sender() - .ok_or_else(|| Error::with_msg_no_trace("can not derive sender for insert queue adapter"))?; - let insert_queue_counter = insert_queue_counter.clone(); - let common_insert_item_queue_2 = common_insert_item_queue_2.clone(); - async move { - let mut printed_last = Instant::now(); - let mut histo = BTreeMap::new(); - while let Ok(item) = rx.recv().await { - insert_queue_counter.fetch_add(1, atomic::Ordering::AcqRel); - //trace!("insert queue item {item:?}"); - match &item { - QueryItem::Insert(item) => { - let shape_kind = match &item.shape { - netpod::Shape::Scalar => 0 as u32, - netpod::Shape::Wave(_) => 1, - netpod::Shape::Image(_, _) => 2, - }; - histo - .entry(item.series.clone()) - .and_modify(|(c, msp, lsp, pulse, _shape_kind)| { - *c += 1; - *msp = item.ts_msp; - *lsp = item.ts_lsp; - *pulse = item.pulse; - // TODO should check that shape_kind stays the same. 
- }) - .or_insert((0 as usize, item.ts_msp, item.ts_lsp, item.pulse, shape_kind)); - } - _ => {} - } - match tx.send(item).await { - Ok(_) => {} - Err(e) => { - error!("insert queue hook send {e}"); - break; - } - } - let tsnow = Instant::now(); - if tsnow.duration_since(printed_last) >= PRINT_ACTIVE_INTERVAL { - printed_last = tsnow; - let mut all: Vec<_> = histo - .iter() - .map(|(k, (c, msp, lsp, pulse, shape_kind))| { - (usize::MAX - *c, k.clone(), *msp, *lsp, *pulse, *shape_kind) - }) - .collect(); - all.sort_unstable(); - info!("Active scalar"); - for (c, sid, msp, lsp, pulse, _shape_kind) in all.iter().filter(|x| x.5 == 0).take(6) { - info!("{:10} {:20} {:14} {:20} {:?}", usize::MAX - c, msp, lsp, pulse, sid); - } - info!("Active wave"); - for (c, sid, msp, lsp, pulse, _shape_kind) in all.iter().filter(|x| x.5 == 1).take(6) { - info!("{:10} {:20} {:14} {:20} {:?}", usize::MAX - c, msp, lsp, pulse, sid); - } - histo.clear(); - } - } - info!("insert queue adapter ended"); - common_insert_item_queue_2.drop_sender(); - } - }); - } + let rx = inserthook::active_channel_insert_hook(common_insert_item_queue.receiver().unwrap()); + let common_insert_item_queue_2 = rx; let ingest_commons = IngestCommons { pgconf: Arc::new(opts.pgconf.clone()), @@ -417,8 +339,6 @@ impl Daemon { let insert_worker_count = 1000; let use_rate_limit_queue = false; - let insert_rx_weak = common_insert_item_queue_2.receiver().unwrap().downgrade(); - // TODO use a new stats type: let store_stats = Arc::new(stats::CaConnStats::new()); let ttls = opts.ttls.clone(); @@ -473,7 +393,7 @@ impl Daemon { connection_states: BTreeMap::new(), channel_states: BTreeMap::new(), tx, - rx, + rx: rx_daemon_ev, chan_check_next: None, search_tx, ioc_finder_jh, @@ -492,7 +412,7 @@ impl Daemon { caconn_last_channel_check: Instant::now(), stats: Arc::new(DaemonStats::new()), shutting_down: false, - insert_rx_weak, + insert_rx_weak: common_insert_item_queue_2.downgrade(), channel_info_query_tx, }; Ok(ret) diff 
--git a/daqingest/src/daemon/inserthook.rs b/daqingest/src/daemon/inserthook.rs new file mode 100644 index 0000000..da2e7d2 --- /dev/null +++ b/daqingest/src/daemon/inserthook.rs @@ -0,0 +1,81 @@ +use crate::daemon::PRINT_ACTIVE_INTERVAL; +use async_channel::Receiver; +use async_channel::Sender; +use log::*; +use netpod::Shape; +use scywr::iteminsertqueue::QueryItem; +use std::collections::BTreeMap; +use std::time::Instant; +use taskrun::tokio; + +pub async fn active_channel_insert_hook_worker(rx: Receiver, tx: Sender) { + // let rx = common_insert_item_queue + // .receiver() + // .ok_or_else(|| Error::with_msg_no_trace("can not derive receiver for insert queue adapter"))?; + // let tx = common_insert_item_queue_2 + // .sender() + // .ok_or_else(|| Error::with_msg_no_trace("can not derive sender for insert queue adapter"))?; + // let insert_queue_counter = insert_queue_counter.clone(); + // let common_insert_item_queue_2 = common_insert_item_queue_2.clone(); + let mut printed_last = Instant::now(); + let mut histo = BTreeMap::new(); + while let Ok(item) = rx.recv().await { + // TODO collect stats + // insert_queue_counter.fetch_add(1, atomic::Ordering::AcqRel); + //trace!("insert queue item {item:?}"); + match &item { + QueryItem::Insert(item) => { + let shape_kind = match &item.shape { + Shape::Scalar => 0 as u32, + Shape::Wave(_) => 1, + Shape::Image(_, _) => 2, + }; + histo + .entry(item.series.clone()) + .and_modify(|(c, msp, lsp, pulse, _shape_kind)| { + *c += 1; + *msp = item.ts_msp; + *lsp = item.ts_lsp; + *pulse = item.pulse; + // TODO should check that shape_kind stays the same. 
+ }) + .or_insert((0 as usize, item.ts_msp, item.ts_lsp, item.pulse, shape_kind)); + } + _ => {} + } + match tx.send(item).await { + Ok(_) => {} + Err(e) => { + error!("insert queue hook send {e}"); + break; + } + } + let tsnow = Instant::now(); + if tsnow.duration_since(printed_last) >= PRINT_ACTIVE_INTERVAL { + printed_last = tsnow; + let mut all: Vec<_> = histo + .iter() + .map(|(k, (c, msp, lsp, pulse, shape_kind))| { + (usize::MAX - *c, k.clone(), *msp, *lsp, *pulse, *shape_kind) + }) + .collect(); + all.sort_unstable(); + info!("Active scalar"); + for (c, sid, msp, lsp, pulse, _shape_kind) in all.iter().filter(|x| x.5 == 0).take(6) { + info!("{:10} {:20} {:14} {:20} {:?}", usize::MAX - c, msp, lsp, pulse, sid); + } + info!("Active wave"); + for (c, sid, msp, lsp, pulse, _shape_kind) in all.iter().filter(|x| x.5 == 1).take(6) { + info!("{:10} {:20} {:14} {:20} {:?}", usize::MAX - c, msp, lsp, pulse, sid); + } + histo.clear(); + } + } + info!("insert queue adapter ended"); +} + +pub fn active_channel_insert_hook(inp: Receiver<QueryItem>) -> Receiver<QueryItem> { + let (tx, rx) = async_channel::bounded(256); + tokio::spawn(active_channel_insert_hook_worker(inp, tx)); + rx +} diff --git a/daqingest/src/daemon/types.rs b/daqingest/src/daemon/types.rs new file mode 100644 index 0000000..729195c --- /dev/null +++ b/daqingest/src/daemon/types.rs @@ -0,0 +1,11 @@ +use async_channel::Receiver; +use netpod::Database; +use netpod::ScyllaConfig; +use scywr::insertworker::Ttls; +use serde::Serialize; +use series::series::Existence; +use series::ChannelStatusSeriesId; +use series::SeriesId; +use std::net::SocketAddrV4; +use std::time::Instant; +use std::time::SystemTime; diff --git a/scywr/src/insertworker.rs b/scywr/src/insertworker.rs index 8ae77b9..89ff55b 100644 --- a/scywr/src/insertworker.rs +++ b/scywr/src/insertworker.rs @@ -1,9 +1,10 @@ use crate::iteminsertqueue::insert_channel_status; use crate::iteminsertqueue::insert_connection_status; use crate::iteminsertqueue::insert_item; -use 
crate::iteminsertqueue::CommonInsertItemQueue; use crate::iteminsertqueue::QueryItem; use crate::store::DataStore; +use async_channel::Receiver; +use async_channel::Sender; use err::Error; use log::*; use netpod::timeunits::MS; @@ -69,77 +70,80 @@ pub struct InsertWorkerOpts { pub insert_frac: Arc, } +async fn rate_limiter_worker( + rate: Arc, + inp: Receiver, + tx: Sender, + stats: Arc, +) { + let mut ts_forward_last = Instant::now(); + let mut ivl_ema = stats::Ema64::with_k(0.00001); + loop { + let item = if let Ok(x) = inp.recv().await { + x + } else { + break; + }; + let ts_received = Instant::now(); + let allowed_to_drop = match &item { + QueryItem::Insert(_) => true, + _ => false, + }; + let dt_min = { + let rate2 = rate.load(Ordering::Acquire); + Duration::from_nanos(SEC / rate2) + }; + let mut ema2 = ivl_ema.clone(); + { + let dt = ts_received.duration_since(ts_forward_last); + let dt_ns = SEC * dt.as_secs() + dt.subsec_nanos() as u64; + ema2.update(dt_ns.min(MS * 100) as f32); + } + let ivl2 = Duration::from_nanos(ema2.ema() as u64); + if allowed_to_drop && ivl2 < dt_min { + //tokio::time::sleep_until(ts_recv_last.checked_add(dt_min).unwrap().into()).await; + stats.store_worker_ratelimit_drop_inc(); + } else { + if tx.send(item).await.is_err() { + break; + } else { + let tsnow = Instant::now(); + let dt = tsnow.duration_since(ts_forward_last); + let dt_ns = SEC * dt.as_secs() + dt.subsec_nanos() as u64; + ivl_ema.update(dt_ns.min(MS * 100) as f32); + ts_forward_last = tsnow; + stats.inter_ivl_ema.store(ivl_ema.ema() as u64, Ordering::Release); + } + } + } + info!("rate limiter done"); +} + +fn rate_limiter( + inp: Receiver, + opts: Arc, + stats: Arc, +) -> Receiver { + let (tx, rx) = async_channel::bounded(inp.capacity().unwrap_or(256)); + tokio::spawn(rate_limiter_worker(opts.store_workers_rate.clone(), inp, tx, stats)); + rx +} + pub async fn spawn_scylla_insert_workers( scyconf: ScyllaConfig, insert_scylla_sessions: usize, insert_worker_count: usize, - 
insert_item_queue: Arc, + item_inp: Receiver, insert_worker_opts: Arc, store_stats: Arc, use_rate_limit_queue: bool, ttls: Ttls, ) -> Result>, Error> { - let (q2_tx, q2_rx) = async_channel::bounded( - insert_item_queue - .receiver() - .map_or(20000, |x| x.capacity().unwrap_or(20000)), - ); - { - let insert_worker_opts = insert_worker_opts.clone(); - let stats = store_stats.clone(); - let recv = insert_item_queue - .receiver() - .ok_or_else(|| Error::with_msg_no_trace("can not derive insert queue receiver"))?; - let store_stats = store_stats.clone(); - let fut = async move { - if !use_rate_limit_queue { - return; - } - let mut ts_forward_last = Instant::now(); - let mut ivl_ema = stats::Ema64::with_k(0.00001); - loop { - let item = if let Ok(x) = recv.recv().await { - x - } else { - break; - }; - let ts_received = Instant::now(); - let allowed_to_drop = match &item { - QueryItem::Insert(_) => true, - _ => false, - }; - let dt_min = { - let rate = insert_worker_opts.store_workers_rate.load(Ordering::Acquire); - Duration::from_nanos(SEC / rate) - }; - let mut ema2 = ivl_ema.clone(); - { - let dt = ts_received.duration_since(ts_forward_last); - let dt_ns = SEC * dt.as_secs() + dt.subsec_nanos() as u64; - ema2.update(dt_ns.min(MS * 100) as f32); - } - let ivl2 = Duration::from_nanos(ema2.ema() as u64); - if allowed_to_drop && ivl2 < dt_min { - //tokio::time::sleep_until(ts_recv_last.checked_add(dt_min).unwrap().into()).await; - stats.store_worker_ratelimit_drop_inc(); - } else { - if q2_tx.send(item).await.is_err() { - break; - } else { - let tsnow = Instant::now(); - let dt = tsnow.duration_since(ts_forward_last); - let dt_ns = SEC * dt.as_secs() + dt.subsec_nanos() as u64; - ivl_ema.update(dt_ns.min(MS * 100) as f32); - ts_forward_last = tsnow; - store_stats.inter_ivl_ema.store(ivl_ema.ema() as u64, Ordering::Release); - } - } - } - info!("intermediate queue done"); - }; - tokio::spawn(fut); - } - + let item_inp = if use_rate_limit_queue { + rate_limiter(item_inp, 
insert_worker_opts.clone(), store_stats.clone()) + } else { + item_inp + }; let mut jhs = Vec::new(); let mut data_stores = Vec::new(); for _ in 0..insert_scylla_sessions { @@ -147,15 +151,9 @@ pub async fn spawn_scylla_insert_workers( data_stores.push(data_store); } for worker_ix in 0..insert_worker_count { + let item_inp = item_inp.clone(); let data_store = data_stores[worker_ix * data_stores.len() / insert_worker_count].clone(); let stats = store_stats.clone(); - let recv = if use_rate_limit_queue { - q2_rx.clone() - } else { - insert_item_queue - .receiver() - .ok_or_else(|| Error::with_msg_no_trace("can not derive receiver"))? - }; let insert_worker_opts = insert_worker_opts.clone(); let fut = async move { insert_worker_opts @@ -165,7 +163,7 @@ pub async fn spawn_scylla_insert_workers( let mut backoff = backoff_0.clone(); let mut i1 = 0; loop { - let item = if let Ok(item) = recv.recv().await { + let item = if let Ok(item) = item_inp.recv().await { stats.store_worker_item_recv_inc(); item } else { diff --git a/scywr/src/iteminsertqueue.rs b/scywr/src/iteminsertqueue.rs index e4e33bd..ac0e5f5 100644 --- a/scywr/src/iteminsertqueue.rs +++ b/scywr/src/iteminsertqueue.rs @@ -1,6 +1,8 @@ pub use netpod::CONNECTION_STATUS_DIV; use crate::store::DataStore; +use async_channel::Receiver; +use async_channel::Sender; use err::thiserror; use err::ThisError; use log::*; @@ -12,6 +14,7 @@ use scylla::transport::errors::QueryError; use series::SeriesId; use stats::CaConnStats; use std::net::SocketAddrV4; +use std::sync::Mutex; use std::time::Duration; use std::time::SystemTime; @@ -253,15 +256,22 @@ impl CommonInsertItemQueueSender { } pub struct CommonInsertItemQueue { - sender: std::sync::Mutex>>, - recv: async_channel::Receiver, + sender: Mutex>>, + recv: Receiver, } impl CommonInsertItemQueue { pub fn new(cap: usize) -> Self { let (tx, rx) = async_channel::bounded(cap); Self { - sender: std::sync::Mutex::new(Some(tx)), + sender: Mutex::new(Some(tx)), + recv: rx, + } + } 
+ + pub fn from_tx_rx(tx: Sender<QueryItem>, rx: Receiver<QueryItem>) -> Self { + Self { + sender: Mutex::new(Some(tx)), recv: rx, } } @@ -276,7 +286,7 @@ impl CommonInsertItemQueue { } } - pub fn receiver(&self) -> Option<async_channel::Receiver<QueryItem>> { + pub fn receiver(&self) -> Option<Receiver<QueryItem>> { let ret = self.recv.clone(); Some(ret) } diff --git a/serde_helper/Cargo.toml b/serde_helper/Cargo.toml new file mode 100644 index 0000000..b6a0cdc --- /dev/null +++ b/serde_helper/Cargo.toml @@ -0,0 +1,8 @@ +[package] +name = "serde_helper" +version = "0.0.1" +authors = ["Dominik Werder "] +edition = "2021" + +[dependencies] +serde = { version = "1.0", features = ["derive"] } diff --git a/serde_helper/src/lib.rs b/serde_helper/src/lib.rs new file mode 100644 index 0000000..3eb4d34 --- /dev/null +++ b/serde_helper/src/lib.rs @@ -0,0 +1,14 @@ +#[allow(non_snake_case)] +mod serde_Instant { + use serde::Serializer; + use std::time::Instant; + + #[allow(unused)] + pub fn serialize<S>(val: &Instant, ser: S) -> Result<S::Ok, S::Error> + where + S: Serializer, + { + let dur = val.elapsed(); + ser.serialize_u64(dur.as_secs() * 1000 + dur.subsec_millis() as u64) + } +}