From b175516b628215bf1971c9d1451c509e4da4c0f1 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Fri, 15 Sep 2023 16:49:32 +0200 Subject: [PATCH] Remove usage of lock --- daqingest/src/daemon/inserthook.rs | 25 ++- dbpg/src/seriesbychannel.rs | 2 +- netfetch/src/ca.rs | 18 -- netfetch/src/ca/conn.rs | 331 +++++++++++++++++++---------- netfetch/src/ca/connset.rs | 60 ++++-- netfetch/src/ca/monitor.rs | 0 netfetch/src/metrics.rs | 56 ++--- netfetch/src/rt.rs | 1 - scywr/src/insertworker.rs | 60 ++++-- scywr/src/iteminsertqueue.rs | 122 ++--------- scywr/src/schema.rs | 73 +++++-- scywr/src/store.rs | 21 ++ stats/src/stats.rs | 14 ++ 13 files changed, 447 insertions(+), 336 deletions(-) delete mode 100644 netfetch/src/ca/monitor.rs diff --git a/daqingest/src/daemon/inserthook.rs b/daqingest/src/daemon/inserthook.rs index da2e7d2..45b8eb2 100644 --- a/daqingest/src/daemon/inserthook.rs +++ b/daqingest/src/daemon/inserthook.rs @@ -2,6 +2,7 @@ use crate::daemon::PRINT_ACTIVE_INTERVAL; use async_channel::Receiver; use async_channel::Sender; use log::*; +use netpod::ScalarType; use netpod::Shape; use scywr::iteminsertqueue::QueryItem; use std::collections::BTreeMap; @@ -30,16 +31,18 @@ pub async fn active_channel_insert_hook_worker(rx: Receiver, tx: Send Shape::Wave(_) => 1, Shape::Image(_, _) => 2, }; - histo - .entry(item.series.clone()) - .and_modify(|(c, msp, lsp, pulse, _shape_kind)| { - *c += 1; - *msp = item.ts_msp; - *lsp = item.ts_lsp; - *pulse = item.pulse; - // TODO should check that shape_kind stays the same. - }) - .or_insert((0 as usize, item.ts_msp, item.ts_lsp, item.pulse, shape_kind)); + if let ScalarType::STRING = item.scalar_type { + histo + .entry(item.series.clone()) + .and_modify(|(c, msp, lsp, pulse, _shape_kind)| { + *c += 1; + *msp = item.ts_msp; + *lsp = item.ts_lsp; + *pulse = item.pulse; + // TODO should check that shape_kind stays the same. + }) + .or_insert((0 as usize, item.ts_msp, item.ts_lsp, item.pulse, shape_kind)); + } } _ => {} } @@ -75,7 +78,7 @@ pub async fn active_channel_insert_hook_worker(rx: Receiver, tx: Send } pub fn active_channel_insert_hook(inp: Receiver) -> Receiver { - let (tx, rx) = async_channel::bounded(256); + let (tx, rx) = async_channel::bounded(inp.capacity().unwrap_or(256)); tokio::spawn(active_channel_insert_hook_worker(inp, tx)); rx } diff --git a/dbpg/src/seriesbychannel.rs b/dbpg/src/seriesbychannel.rs index 7ca5177..b5a9477 100644 --- a/dbpg/src/seriesbychannel.rs +++ b/dbpg/src/seriesbychannel.rs @@ -259,7 +259,7 @@ impl Worker { let mut all_good = true; for h in &mut hashers { let mut good = false; - for _ in 0..50 { + for _ in 0..400 { h.update(tsbeg.elapsed().subsec_nanos().to_ne_bytes()); let f = h.clone().finalize(); let series = u64::from_le_bytes(f.as_slice()[0..8].try_into().unwrap()); diff --git a/netfetch/src/ca.rs b/netfetch/src/ca.rs index ba0fcba..f096fef 100644 --- a/netfetch/src/ca.rs +++ b/netfetch/src/ca.rs @@ -8,20 +8,11 @@ pub mod search; pub mod statemap; use crate::metrics::ExtraInsertsConf; -use crate::rt::TokMx; use futures_util::Future; use futures_util::FutureExt; use log::*; -use netpod::Database; -use scywr::insertworker::InsertWorkerOpts; -use scywr::store::DataStore; -use stats::CaConnStatsAgg; use std::pin::Pin; use std::sync::atomic::AtomicU32; -use std::sync::atomic::AtomicU64; -use std::sync::atomic::Ordering; -use std::sync::Arc; -use std::sync::Mutex; use std::task::Poll; use std::time::Duration; use std::time::Instant; @@ -29,10 +20,6 @@ use taskrun::tokio; pub static SIGINT: AtomicU32 = AtomicU32::new(0); -lazy_static::lazy_static! { - pub static ref METRICS: Mutex> = Mutex::new(None); -} - pub trait SlowWarnable { fn slow_warn(self, ms: u64) -> SlowWarn>> where @@ -118,8 +105,3 @@ where } } } - -fn handler_sigaction(_a: libc::c_int, _b: *const libc::siginfo_t, _c: *const libc::c_void) { - crate::ca::SIGINT.store(1, Ordering::Release); - let _ = ingest_linux::signal::unset_signal_handler(libc::SIGINT); -} diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index e7df175..d7232c1 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -453,6 +453,13 @@ pub struct CaConnOpts { array_truncate: usize, } +impl CaConnOpts { + pub fn with_insert_queue_max(mut self, val: usize) -> Self { + self.insert_queue_max = val; + self + } +} + impl Default for CaConnOpts { fn default() -> Self { Self { @@ -488,6 +495,7 @@ pub struct CaConn { extra_inserts_conf: ExtraInsertsConf, ioc_ping_last: Instant, ioc_ping_start: Option, + storage_insert_sender: SenderPolling, cmd_res_queue: VecDeque, ca_conn_event_out_queue: VecDeque, channel_info_query_queue: VecDeque, @@ -508,6 +516,7 @@ impl CaConn { backend: String, remote_addr_dbg: SocketAddrV4, local_epics_hostname: String, + storage_insert_tx: Sender, channel_info_query_tx: Sender, stats: Arc, ) -> Self { @@ -538,6 +547,7 @@ impl CaConn { extra_inserts_conf: ExtraInsertsConf::new(), ioc_ping_last: Instant::now(), ioc_ping_start: None, + storage_insert_sender: SenderPolling::new(storage_insert_tx), cmd_res_queue: VecDeque::new(), ca_conn_event_out_queue: VecDeque::new(), channel_info_query_queue: VecDeque::new(), @@ -710,47 +720,45 @@ impl CaConn { Ok(()) } - fn handle_conn_command(&mut self, cx: &mut Context) -> Poll>> { + fn handle_conn_command(&mut self, cx: &mut Context) -> Result>, Error> { // TODO if this loops for too long time, yield and make sure we get wake up again. use Poll::*; - loop { - self.stats.caconn_loop3_count.inc(); - break if self.is_shutdown() { - Ready(None) - } else { - match self.conn_command_rx.poll_next_unpin(cx) { - Ready(Some(a)) => { - trace3!("handle_conn_command received a command {}", self.remote_addr_dbg); - match a.kind { - ConnCommandKind::ChannelAdd(name, cssid) => { - self.cmd_channel_add(name, cssid); - Ready(Some(Ok(()))) - } - ConnCommandKind::ChannelRemove(name) => { - self.cmd_channel_remove(name); - Ready(Some(Ok(()))) - } - ConnCommandKind::CheckHealth => { - self.cmd_check_health(); - Ready(Some(Ok(()))) - } - ConnCommandKind::Shutdown => { - self.cmd_shutdown(); - Ready(Some(Ok(()))) - } - ConnCommandKind::SeriesLookupResult(x) => match self.handle_series_lookup_result(x) { - Ok(()) => Ready(Some(Ok(()))), - Err(e) => Ready(Some(Err(e))), - }, + self.stats.caconn_loop3_count.inc(); + if self.is_shutdown() { + Ok(Ready(None)) + } else { + match self.conn_command_rx.poll_next_unpin(cx) { + Ready(Some(a)) => { + trace3!("handle_conn_command received a command {}", self.remote_addr_dbg); + match a.kind { + ConnCommandKind::ChannelAdd(name, cssid) => { + self.cmd_channel_add(name, cssid); + Ok(Ready(Some(()))) } + ConnCommandKind::ChannelRemove(name) => { + self.cmd_channel_remove(name); + Ok(Ready(Some(()))) + } + ConnCommandKind::CheckHealth => { + self.cmd_check_health(); + Ok(Ready(Some(()))) + } + ConnCommandKind::Shutdown => { + self.cmd_shutdown(); + Ok(Ready(Some(()))) + } + ConnCommandKind::SeriesLookupResult(x) => match self.handle_series_lookup_result(x) { + Ok(()) => Ok(Ready(Some(()))), + Err(e) => Err(e), + }, } - Ready(None) => { - error!("Command queue closed"); - Ready(None) - } - Pending => Pending, } - }; + Ready(None) => { + error!("Command queue closed"); + Ok(Ready(None)) + } + Pending => Ok(Pending), + } } } @@ -1708,30 +1716,38 @@ impl CaConn { fn loop_inner(&mut self, cx: &mut Context) -> Result>, Error> { use Poll::*; - loop { + let mut have_progress = false; + for _ in 0..64 { self.stats.caconn_loop2_count.inc(); - break if self.is_shutdown() { - Ok(Ready(None)) + if self.is_shutdown() { + break; } else if self.insert_item_queue.len() >= self.opts.insert_queue_max { - warn!("======================================================= queue stall"); - Ok(Ready(None)) + break; } else { match self.handle_conn_state(cx) { Ok(x) => match x { - Ready(Some(())) => continue, + Ready(Some(())) => { + have_progress = true; + continue; + } Ready(None) => { error!("handle_conn_state yields {x:?}"); - Err(Error::with_msg_no_trace("logic error")) + return Err(Error::with_msg_no_trace("logic error")); } - Pending => Ok(Pending), + Pending => return Ok(Pending), }, - Err(e) => Err(e), + Err(e) => return Err(e), } }; } + if have_progress { + Ok(Ready(Some(()))) + } else { + Ok(Ready(None)) + } } - fn handle_own_ticker(mut self: Pin<&mut Self>, cx: &mut Context) -> Result<(), Error> { + fn handle_own_ticker(mut self: Pin<&mut Self>, cx: &mut Context) -> Result, Error> { use Poll::*; match self.ticker.poll_unpin(cx) { Ready(()) => { @@ -1742,7 +1758,7 @@ impl CaConn { let _ = self.ticker.poll_unpin(cx); // cx.waker().wake_by_ref(); } - Ok(()) + Ok(Pending) } Err(e) => { error!("handle_own_ticker {e}"); @@ -1751,7 +1767,7 @@ impl CaConn { } } } - Pending => Ok(()), + Pending => Ok(Pending), } } @@ -1773,28 +1789,56 @@ impl CaConn { true } - fn attempt_flush_channel_info_query(mut self: Pin<&mut Self>, cx: &mut Context) -> Result<(), Error> { + fn attempt_flush_storage_queue(mut self: Pin<&mut Self>, cx: &mut Context) -> Result>, Error> { use Poll::*; - loop { - break if self.is_shutdown() { - Ok(()) - } else { - let sd = &mut self.channel_info_query_sending; - if sd.is_sending() { - match sd.poll_unpin(cx) { - Ready(Ok(())) => continue, - Ready(Err(_)) => Err(Error::with_msg_no_trace("can not send into channel")), - Pending => Ok(()), + let mut have_progress = false; + for _ in 0..128 { + let sd = &mut self.storage_insert_sender; + if sd.is_idle() { + if let Some(item) = self.insert_item_queue.pop_front() { + self.storage_insert_sender.send(item); + } + } + if self.storage_insert_sender.is_sending() { + match self.storage_insert_sender.poll_unpin(cx) { + Ready(Ok(())) => { + have_progress = true; } - } else if let Some(item) = self.channel_info_query_queue.pop_front() { + Ready(Err(_)) => return Err(Error::with_msg_no_trace("can not send into channel")), + Pending => return Ok(Pending), + } + } + } + if have_progress { + Ok(Ready(Some(()))) + } else { + Ok(Ready(None)) + } + } + + fn attempt_flush_channel_info_query(mut self: Pin<&mut Self>, cx: &mut Context) -> Result>, Error> { + use Poll::*; + if self.is_shutdown() { + Ok(Ready(None)) + } else { + let sd = &mut self.channel_info_query_sending; + if sd.is_idle() { + if let Some(item) = self.channel_info_query_queue.pop_front() { trace3!("send series query {item:?}"); let sd = &mut self.channel_info_query_sending; sd.send(item); - continue; - } else { - Ok(()) } - }; + } + let sd = &mut self.channel_info_query_sending; + if sd.is_sending() { + match sd.poll_unpin(cx) { + Ready(Ok(())) => Ok(Ready(Some(()))), + Ready(Err(_)) => Err(Error::with_msg_no_trace("can not send into channel")), + Pending => Ok(Pending), + } + } else { + Ok(Ready(None)) + } } } } @@ -1806,63 +1850,130 @@ impl Stream for CaConn { use Poll::*; self.stats.caconn_poll_count.inc(); let poll_ts1 = Instant::now(); + self.stats.ca_conn_poll_fn_begin().inc(); let ret = loop { + self.stats.ca_conn_poll_loop_begin().inc(); let qlen = self.insert_item_queue.len(); - if qlen > self.opts.insert_queue_max / 3 { + if qlen >= self.opts.insert_queue_max * 2 / 3 { self.stats.insert_item_queue_pressure().inc(); + } else if qlen >= self.opts.insert_queue_max { + self.stats.insert_item_queue_full().inc(); } - break if let CaConnState::EndOfStream = self.state { - Ready(None) - } else if let Err(e) = self.as_mut().handle_own_ticker(cx) { - Ready(Some(Err(e))) - } else if let Some(item) = self.cmd_res_queue.pop_front() { + + let mut have_pending = false; + let mut have_progress = false; + + if let CaConnState::EndOfStream = self.state { + break Ready(None); + } + if let Some(item) = self.cmd_res_queue.pop_front() { let item = CaConnEvent { ts: Instant::now(), value: CaConnEventValue::ConnCommandResult(item), }; - Ready(Some(Ok(item))) - } else if let Some(item) = self.ca_conn_event_out_queue.pop_front() { - Ready(Some(Ok(item))) - } else if let Some(item) = self.insert_item_queue.pop_front() { - let ev = CaConnEvent { - ts: Instant::now(), - value: CaConnEventValue::QueryItem(item), - }; - Ready(Some(Ok(ev))) - } else if let Err(e) = self.as_mut().attempt_flush_channel_info_query(cx) { - Ready(Some(Err(e))) - } else if let Ready(Some(Err(e))) = self.as_mut().handle_conn_command(cx) { - Ready(Some(Err(e))) - } else { - match self.loop_inner(cx) { - Ok(Ready(Some(()))) => continue, - Ok(Ready(None)) => { - // Ready(_) => self.stats.conn_stream_ready.inc(), - // Pending => self.stats.conn_stream_pending.inc(), - let _item = CaConnEvent { - ts: Instant::now(), - value: CaConnEventValue::None, - }; - if self.is_shutdown() { - if self.queues_async_out_flushed() == false { - debug!("shutdown, but async queues not flushed"); - continue; - } else { - debug!("end of stream {}", self.remote_addr_dbg); - self.state = CaConnState::EndOfStream; - Ready(None) - } - } else { - continue; - } - } - Ok(Pending) => Pending, - Err(e) => { - error!("{e}"); - self.state = CaConnState::EndOfStream; + break Ready(Some(Ok(item))); + } + if let Some(item) = self.ca_conn_event_out_queue.pop_front() { + break Ready(Some(Ok(item))); + } + // if let Some(item) = self.insert_item_queue.pop_front() { + // let ev = CaConnEvent { + // ts: Instant::now(), + // value: CaConnEventValue::QueryItem(item), + // }; + // break Ready(Some(Ok(ev))); + // } + + match self.as_mut().handle_own_ticker(cx) { + Ok(Ready(())) => { + have_progress = true; + } + Ok(Pending) => { + have_pending = true; + } + Err(e) => break Ready(Some(Err(e))), + } + + match self.as_mut().attempt_flush_storage_queue(cx) { + Ok(Ready(Some(()))) => { + have_progress = true; + } + Ok(Ready(None)) => {} + Ok(Pending) => { + have_pending = true; + } + Err(e) => break Ready(Some(Err(e))), + } + + match self.as_mut().attempt_flush_channel_info_query(cx) { + Ok(Ready(Some(()))) => { + have_progress = true; + } + Ok(Ready(None)) => {} + Ok(Pending) => { + have_pending = true; + } + Err(e) => break Ready(Some(Err(e))), + } + + match self.as_mut().handle_conn_command(cx) { + Ok(Ready(Some(()))) => { + have_progress = true; + } + Ok(Ready(None)) => {} + Ok(Pending) => { + have_pending = true; + } + Err(e) => break Ready(Some(Err(e))), + } + + match self.loop_inner(cx) { + Ok(Ready(Some(()))) => { + have_progress = true; + } + Ok(Ready(None)) => {} + Ok(Pending) => { + have_pending = true; + } + Err(e) => { + error!("{e}"); + self.state = CaConnState::EndOfStream; + break Ready(Some(Err(e))); + } + } + + break if self.is_shutdown() { + if self.queues_async_out_flushed() { + debug!("end of stream {}", self.remote_addr_dbg); + self.state = CaConnState::EndOfStream; + Ready(None) + } else { + if have_progress { + self.stats.ca_conn_poll_reloop().inc(); + continue; + } else if have_pending { + self.stats.ca_conn_poll_pending().inc(); + Pending + } else { + // TODO error + error!("logic error"); + self.stats.logic_error().inc(); + let e = Error::with_msg_no_trace("shutdown, not done, no progress, no pending"); Ready(Some(Err(e))) } } + } else { + if have_progress { + self.stats.ca_conn_poll_reloop().inc(); + continue; + } else if have_pending { + self.stats.ca_conn_poll_pending().inc(); + Pending + } else { + self.stats.ca_conn_poll_no_progress_no_pending().inc(); + let e = Error::with_msg_no_trace("no progress no pending"); + Ready(Some(Err(e))) + } }; }; let poll_ts2 = Instant::now(); diff --git a/netfetch/src/ca/connset.rs b/netfetch/src/ca/connset.rs index c09288d..9973191 100644 --- a/netfetch/src/ca/connset.rs +++ b/netfetch/src/ca/connset.rs @@ -15,7 +15,6 @@ use crate::ca::statemap::WithAddressState; use crate::daemon_common::Channel; use crate::errconv::ErrConv; use crate::rt::JoinHandle; -use crate::rt::TokMx; use crate::senderpolling::SenderPolling; use crate::throttletrace::ThrottleTrace; use async_channel::Receiver; @@ -152,16 +151,34 @@ pub struct ChannelRemove { name: String, } +pub struct ChannelStatusRequest { + pub tx: Sender, +} + #[derive(Debug, Clone, Serialize)] -pub struct ChannelStatusesResponse { +pub struct ChannelStatusResponse { pub channels_ca_conn: BTreeMap, pub channels_ca_conn_set: BTreeMap, } +impl fmt::Debug for ChannelStatusRequest { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("ChannelStatusesRequest").finish() + } +} + pub struct ChannelStatusesRequest { + pub name: String, + pub limit: u64, pub tx: Sender, } +#[derive(Debug, Clone, Serialize)] +pub struct ChannelStatusesResponse { + pub channels_ca_conn: BTreeMap, + pub channels_ca_conn_set: BTreeMap, +} + impl fmt::Debug for ChannelStatusesRequest { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { fmt.debug_struct("ChannelStatusesRequest").finish() @@ -418,7 +435,7 @@ impl CaConnSet { fn handle_ca_conn_event(&mut self, addr: SocketAddr, ev: CaConnEvent) -> Result<(), Error> { match ev.value { CaConnEventValue::None => Ok(()), - CaConnEventValue::EchoTimeout => todo!(), + CaConnEventValue::EchoTimeout => Ok(()), CaConnEventValue::ConnCommandResult(x) => self.handle_conn_command_result(addr, x), CaConnEventValue::QueryItem(item) => { self.storage_insert_queue.push_back(item); @@ -645,14 +662,23 @@ impl CaConnSet { return Ok(()); } debug!("handle_channel_statuses_req"); + let reg1 = regex::Regex::new(&req.name)?; + let channels_ca_conn = self + .ca_conn_channel_states + .iter() + .filter(|x| reg1.is_match(x.0)) + .map(|(k, v)| (k.to_string(), v.clone())) + .collect(); + let channels_ca_conn_set = self + .channel_states + .inner() + .iter() + .filter(|(k, v)| reg1.is_match(k.id())) + .map(|(k, v)| (k.id().to_string(), v.clone())) + .collect(); let item = ChannelStatusesResponse { - channels_ca_conn: self.ca_conn_channel_states.clone(), - channels_ca_conn_set: self - .channel_states - .inner() - .iter() - .map(|(k, v)| (k.id().to_string(), v.clone())) - .collect(), + channels_ca_conn, + channels_ca_conn_set, }; if req.tx.try_send(item).is_err() { self.stats.response_tx_fail.inc(); @@ -737,6 +763,7 @@ impl CaConnSet { add.backend.clone(), addr_v4, add.local_epics_hostname, + self.storage_insert_tx.clone(), self.channel_info_query_tx.clone(), self.ca_conn_stats.clone(), ); @@ -897,19 +924,6 @@ impl CaConnSet { Ok(()) } - async fn conn_remove( - ca_conn_ress: &TokMx>, - addr: SocketAddrV4, - ) -> Result { - // TODO make this lock-free. - //warn!("Lock for conn_remove"); - if let Some(_caconn) = ca_conn_ress.lock().await.remove(&addr) { - Ok(true) - } else { - Ok(false) - } - } - fn check_connection_states(&mut self) -> Result<(), Error> { let tsnow = Instant::now(); for (addr, val) in &mut self.ca_conn_ress { diff --git a/netfetch/src/ca/monitor.rs b/netfetch/src/ca/monitor.rs deleted file mode 100644 index e69de29..0000000 diff --git a/netfetch/src/metrics.rs b/netfetch/src/metrics.rs index 9ae82d5..98315ce 100644 --- a/netfetch/src/metrics.rs +++ b/netfetch/src/metrics.rs @@ -3,7 +3,6 @@ use crate::ca::connset::CaConnSetEvent; use crate::ca::connset::ChannelStatusesRequest; use crate::ca::connset::ChannelStatusesResponse; use crate::ca::connset::ConnSetCmd; -use crate::ca::METRICS; use crate::daemon_common::DaemonEvent; use async_channel::Receiver; use async_channel::Sender; @@ -124,34 +123,39 @@ async fn channel_remove(params: HashMap, dcom: Arc) Json(Value::Bool(false)) } -async fn channel_state(params: HashMap, dcom: Arc) -> axum::Json { - let name = params.get("name").map_or(String::new(), |x| x.clone()).to_string(); - error!("TODO channel_state"); - axum::Json(false) +async fn channel_state( + params: HashMap, + tx: Sender, +) -> axum::Json { + panic!("TODO"); } // axum::Json -async fn channel_states(params: HashMap, tx: Sender) -> String { +async fn channel_states( + params: HashMap, + tx: Sender, +) -> axum::Json { + let name = params.get("name").map_or(String::new(), |x| x.clone()).to_string(); let limit = params .get("limit") .map(|x| x.parse().ok()) .unwrap_or(None) .unwrap_or(40); let (tx2, rx2) = async_channel::bounded(1); - let req = ChannelStatusesRequest { tx: tx2 }; + let req = ChannelStatusesRequest { name, limit, tx: tx2 }; let item = CaConnSetEvent::ConnSetCmd(ConnSetCmd::ChannelStatuses(req)); // TODO handle error tx.send(item).await; let res = rx2.recv().await.unwrap(); - match serde_json::to_string(&res) { - Ok(x) => x, - Err(e) => { - error!("Serialize error {e}"); - Err::<(), _>(e).unwrap(); - panic!(); - } - } - // axum::Json(res) + // match serde_json::to_string(&res) { + // Ok(x) => x, + // Err(e) => { + // error!("Serialize error {e}"); + // Err::<(), _>(e).unwrap(); + // panic!(); + // } + // } + axum::Json(res) } async fn extra_inserts_conf_set(v: ExtraInsertsConf, dcom: Arc) -> axum::Json { @@ -229,14 +233,15 @@ fn make_routes(dcom: Arc, connset_cmd_tx: Sender, st .route( "/daqingest/channel/state", get({ - let dcom = dcom.clone(); - |Query(params): Query>| channel_state(params, dcom) + // let dcom = dcom.clone(); + let tx = connset_cmd_tx.clone(); + |Query(params): Query>| channel_state(params, tx) }), ) .route( "/daqingest/channel/states", get({ - let dcom = dcom.clone(); + // let dcom = dcom.clone(); let tx = connset_cmd_tx.clone(); |Query(params): Query>| channel_states(params, tx) }), @@ -349,11 +354,14 @@ pub async fn metrics_agg_task( let nitems = query_item_chn.upgrade().map_or(0, |x| x.len()); agg.store_worker_recv_queue_len.__set(nitems as u64); } - let mut m = METRICS.lock().unwrap(); - *m = Some(agg.clone()); - if false { - let diff = CaConnStatsAggDiff::diff_from(&agg_last, &agg); - info!("{}", diff.display()); + #[cfg(DISABLED)] + { + let mut m = METRICS.lock().unwrap(); + *m = Some(agg.clone()); + if false { + let diff = CaConnStatsAggDiff::diff_from(&agg_last, &agg); + info!("{}", diff.display()); + } } agg_last = agg; } diff --git a/netfetch/src/rt.rs b/netfetch/src/rt.rs index 6fa8a57..d6cdd89 100644 --- a/netfetch/src/rt.rs +++ b/netfetch/src/rt.rs @@ -1,4 +1,3 @@ use taskrun::tokio; -pub use tokio::sync::Mutex as TokMx; pub use tokio::task::JoinHandle; pub use tokio::time::sleep; diff --git a/scywr/src/insertworker.rs b/scywr/src/insertworker.rs index 98eba81..08581b2 100644 --- a/scywr/src/insertworker.rs +++ b/scywr/src/insertworker.rs @@ -189,46 +189,64 @@ async fn worker( } } QueryItem::Insert(item) => { + let item_ts_local = item.ts_local.clone(); let tsnow = { let ts = SystemTime::now(); let epoch = ts.duration_since(std::time::UNIX_EPOCH).unwrap(); epoch.as_secs() * SEC + epoch.subsec_nanos() as u64 }; - let dt = (tsnow / 1000) as i64 - (item.ts_local / 1000) as i64; + let dt = (tsnow / 1000000) as i64 - (item_ts_local / 1000000) as i64; if dt < 0 { stats.item_latency_neg().inc(); - } else if dt <= 1000 * 25 { + } else if dt <= 25 { stats.item_latency_025ms().inc(); - } else if dt <= 1000 * 50 { + } else if dt <= 50 { stats.item_latency_050ms().inc(); - } else if dt <= 1000 * 100 { + } else if dt <= 100 { stats.item_latency_100ms().inc(); - } else if dt <= 1000 * 200 { + } else if dt <= 200 { stats.item_latency_200ms().inc(); - } else if dt <= 1000 * 400 { + } else if dt <= 400 { stats.item_latency_400ms().inc(); - } else if dt <= 1000 * 800 { + } else if dt <= 800 { stats.item_latency_800ms().inc(); + } else if dt <= 1600 { + stats.item_latency_1600ms().inc(); + } else if dt <= 3200 { + stats.item_latency_3200ms().inc(); } else { stats.item_latency_large().inc(); } - if false { - stats.inserted_values().inc(); - } else { - let insert_frac = insert_worker_opts.insert_frac.load(Ordering::Acquire); - let do_insert = i1 % 1000 < insert_frac; - match insert_item(item, ttls.index, ttls.d0, ttls.d1, &data_store, &stats, do_insert).await { - Ok(_) => { - stats.inserted_values().inc(); - backoff = backoff_0; - } - Err(e) => { - stats_inc_for_err(&stats, &e); - back_off_sleep(&mut backoff).await; + let insert_frac = insert_worker_opts.insert_frac.load(Ordering::Acquire); + let do_insert = i1 % 1000 < insert_frac; + match insert_item(item, ttls.index, ttls.d0, ttls.d1, &data_store, &stats, do_insert).await { + Ok(_) => { + stats.inserted_values().inc(); + let tsnow = { + let ts = SystemTime::now(); + let epoch = ts.duration_since(std::time::UNIX_EPOCH).unwrap(); + epoch.as_secs() * SEC + epoch.subsec_nanos() as u64 + }; + let dt = (tsnow / 1000000) as i64 - (item_ts_local / 1000000) as i64; + if dt <= 50 { + stats.item_commit_latency_0050ms().inc(); + } else if dt <= 200 { + stats.item_commit_latency_0200ms().inc(); + } else if dt <= 800 { + stats.item_commit_latency_0800ms().inc(); + } else if dt <= 3200 { + stats.item_commit_latency_3200ms().inc(); + } else { + stats.item_commit_latency_large().inc(); } + backoff = backoff_0; + } + Err(e) => { + stats_inc_for_err(&stats, &e); + back_off_sleep(&mut backoff).await; } - i1 += 1; } + i1 += 1; } QueryItem::Mute(item) => { let values = ( diff --git a/scywr/src/iteminsertqueue.rs b/scywr/src/iteminsertqueue.rs index 2232fba..e995bbb 100644 --- a/scywr/src/iteminsertqueue.rs +++ b/scywr/src/iteminsertqueue.rs @@ -1,25 +1,17 @@ pub use netpod::CONNECTION_STATUS_DIV; use crate::store::DataStore; -use async_channel::Receiver; -use async_channel::Sender; use err::thiserror; use err::ThisError; -use log::*; use netpod::ScalarType; use netpod::Shape; use scylla::prepared_statement::PreparedStatement; use scylla::transport::errors::DbError; use scylla::transport::errors::QueryError; use series::SeriesId; -use stats::CaConnStats; use stats::InsertWorkerStats; use std::net::SocketAddrV4; -use std::sync::atomic; -use std::sync::atomic::AtomicU64; -use std::sync::Mutex; use std::time::Duration; -use std::time::Instant; use std::time::SystemTime; #[derive(Debug, ThisError)] @@ -244,83 +236,6 @@ pub enum QueryItem { TimeBinPatchSimpleF32(TimeBinPatchSimpleF32), } -pub struct CommonInsertItemQueueSender { - sender: Sender, -} - -impl CommonInsertItemQueueSender { - #[inline(always)] - pub fn send(&self, k: QueryItem) -> async_channel::Send { - self.sender.send(k) - } - - #[inline(always)] - pub fn is_full(&self) -> bool { - self.sender.is_full() - } - - pub fn inner(&self) -> &Sender { - &self.sender - } -} - -pub struct CommonInsertItemQueue { - sender: Mutex>>, - recv: Receiver, -} - -impl CommonInsertItemQueue { - pub fn new(cap: usize) -> Self { - let (tx, rx) = async_channel::bounded(cap); - Self { - sender: Mutex::new(Some(tx)), - recv: rx, - } - } - - pub fn from_tx_rx(tx: Sender, rx: Receiver) -> Self { - Self { - sender: Mutex::new(Some(tx)), - recv: rx, - } - } - - pub fn sender(&self) -> Option { - match self.sender.lock().unwrap().as_ref() { - Some(sender) => { - let ret = CommonInsertItemQueueSender { sender: sender.clone() }; - Some(ret) - } - None => None, - } - } - - pub fn receiver(&self) -> Option> { - let ret = self.recv.clone(); - Some(ret) - } - - pub fn sender_count(&self) -> Option { - self.sender.lock().unwrap().as_ref().map(|x| x.sender_count()) - } - - pub fn sender_count_2(&self) -> usize { - self.recv.sender_count() - } - - pub fn receiver_count(&self) -> usize { - self.recv.receiver_count() - } - - pub fn close(&self) { - self.sender.lock().unwrap().as_ref().map(|x| x.close()); - } - - pub fn drop_sender(&self) { - self.sender.lock().unwrap().take(); - } -} - struct InsParCom { series: u64, ts_msp: u64, @@ -384,15 +299,24 @@ where val, par.ttl as i32, ); - data_store.scy.execute(qu, params).await?; - Ok(()) + let y = data_store.scy.execute(qu, params).await; + match y { + Ok(_) => Ok(()), + Err(e) => match e { + QueryError::TimeoutError => Err(Error::DbTimeout), + // TODO use `msg` + QueryError::DbError(e, _msg) => match e { + DbError::Overloaded => Err(Error::DbOverload), + _ => Err(e.into()), + }, + _ => Err(e.into()), + }, + } } else { Ok(()) } } -static warn_last: AtomicU64 = AtomicU64::new(0); - pub async fn insert_item( item: InsertItem, ttl_index: Duration, @@ -441,24 +365,8 @@ pub async fn insert_item( I32(val) => insert_scalar_gen(par, val, &data_store.qu_insert_scalar_i32, &data_store).await?, F32(val) => insert_scalar_gen(par, val, &data_store.qu_insert_scalar_f32, &data_store).await?, F64(val) => insert_scalar_gen(par, val, &data_store.qu_insert_scalar_f64, &data_store).await?, - String(val) => { - let ts = SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .map_or(0, |x| x.as_secs()); - if ts > warn_last.load(atomic::Ordering::Acquire) + 10 { - warn_last.store(ts, atomic::Ordering::Release); - warn!("TODO string insert {val}"); - } - } - Bool(val) => { - let ts = SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .map_or(0, |x| x.as_secs()); - if ts > warn_last.load(atomic::Ordering::Acquire) + 10 { - warn_last.store(ts, atomic::Ordering::Release); - warn!("TODO bool insert {val}"); - } - } + String(val) => insert_scalar_gen(par, val, &data_store.qu_insert_scalar_string, &data_store).await?, + Bool(val) => insert_scalar_gen(par, val, &data_store.qu_insert_scalar_bool, &data_store).await?, } } Array(val) => { diff --git a/scywr/src/schema.rs b/scywr/src/schema.rs index 564849d..5fae62f 100644 --- a/scywr/src/schema.rs +++ b/scywr/src/schema.rs @@ -207,13 +207,40 @@ impl GenTwcsTab { } } +fn table_param_compaction(compaction_window_size: Duration) -> String { + table_param_compaction_twcs(compaction_window_size) +} + +#[allow(unused)] +fn table_param_compaction_stcs() -> String { + format!(concat!( + "{{ 'class': 'SizeTieredCompactionStrategy'", + // ", 'min_sstable_size': 200", + // ", 'max_threshold': 10", + " }}" + )) +} + +#[allow(unused)] +fn table_param_compaction_twcs(compaction_window_size: Duration) -> String { + format!( + concat!( + "{{ 'class': 'TimeWindowCompactionStrategy'", + ", 'compaction_window_unit': 'HOURS'", + ", 'compaction_window_size': {}", + " }}" + ), + compaction_window_size.as_secs() / 60 / 60 + ) +} + struct EvTabDim0 { sty: String, cqlsty: String, // SCYLLA_TTL_EVENTS_DIM0 - default_time_to_live: usize, + default_time_to_live: Duration, // TWCS_WINDOW_0D - compaction_window_size: usize, + compaction_window_size: Duration, } impl EvTabDim0 { @@ -223,14 +250,13 @@ impl EvTabDim0 { fn cql_create(&self) -> String { use std::fmt::Write; + let ttl = self.default_time_to_live.as_secs(); + let compaction = table_param_compaction(self.compaction_window_size); let mut s = String::new(); write!(s, "create table {}", self.name()).unwrap(); write!(s, " (series bigint, ts_msp bigint, ts_lsp bigint, pulse bigint, value {}, primary key ((series, ts_msp), ts_lsp))", self.cqlsty).unwrap(); - write!(s, " with default_time_to_live = {}", self.default_time_to_live).unwrap(); - s.write_str(" and compaction = { 'class': 'TimeWindowCompactionStrategy', 'compaction_window_unit': 'HOURS'") - .unwrap(); - write!(s, ", 'compaction_window_size': {}", self.compaction_window_size).unwrap(); - s.write_str(" }").unwrap(); + write!(s, " with default_time_to_live = {}", ttl).unwrap(); + write!(s, " and compaction = {}", compaction).unwrap(); s } } @@ -239,9 +265,9 @@ struct EvTabDim1 { sty: String, cqlsty: String, // SCYLLA_TTL_EVENTS_DIM1 - default_time_to_live: usize, + default_time_to_live: Duration, // TWCS_WINDOW_1D - compaction_window_size: usize, + compaction_window_size: Duration, } impl EvTabDim1 { @@ -252,13 +278,12 @@ impl EvTabDim1 { fn cql(&self) -> String { use std::fmt::Write; let mut s = String::new(); + let ttl = self.default_time_to_live.as_secs(); + let compaction = table_param_compaction(self.compaction_window_size); write!(s, "create table {}", self.name()).unwrap(); write!(s, " (series bigint, ts_msp bigint, ts_lsp bigint, pulse bigint, value {}, primary key ((series, ts_msp), ts_lsp))", self.cqlsty).unwrap(); - write!(s, " with default_time_to_live = {}", self.default_time_to_live).unwrap(); - s.write_str(" and compaction = { 'class': 'TimeWindowCompactionStrategy', 'compaction_window_unit': 'HOURS'") - .unwrap(); - write!(s, ", 'compaction_window_size': {}", self.compaction_window_size).unwrap(); - s.write_str(" }").unwrap(); + write!(s, " with default_time_to_live = {}", ttl).unwrap(); + write!(s, " and compaction = {}", compaction).unwrap(); s } } @@ -294,8 +319,8 @@ async fn check_event_tables(scy: &ScySession) -> Result<(), Error> { sty: sty.into(), cqlsty: cqlsty.into(), // ttl is set in actual data inserts - default_time_to_live: 60 * 60 * 1, - compaction_window_size: 48, + default_time_to_live: dhours(1), + compaction_window_size: dhours(48), }; if !has_table(&desc.name(), scy).await? { scy.query(desc.cql_create(), ()).await?; @@ -304,8 +329,8 @@ async fn check_event_tables(scy: &ScySession) -> Result<(), Error> { sty: sty.into(), cqlsty: format!("frozen>", cqlsty), // ttl is set in actual data inserts - default_time_to_live: 60 * 60 * 1, - compaction_window_size: 12, + default_time_to_live: dhours(1), + compaction_window_size: dhours(12), }; if !check_table_readable(&desc.name(), scy).await? { scy.query(desc.cql(), ()).await?; @@ -319,8 +344,16 @@ pub async fn migrate_scylla_data_schema(scyconf: &ScyllaConfig) -> Result<(), Er let scy = &scy2; if !has_keyspace(&scyconf.keyspace, scy).await? { - let rf = 2; - let cql = format!("create keyspace {} with replication = {{ 'class': 'SimpleStrategy', 'replication_factor': {} }} and durable_writes = true;", scyconf.keyspace, rf); + let replication = 2; + let durable = false; + let cql = format!( + concat!( + "create keyspace {}", + " with replication = {{ 'class': 'SimpleStrategy', 'replication_factor': {} }}", + " and durable_writes = {};" + ), + scyconf.keyspace, replication, durable + ); scy.query_iter(cql, ()).await?; info!("keyspace created"); } diff --git a/scywr/src/store.rs b/scywr/src/store.rs index 5250856..d34c220 100644 --- a/scywr/src/store.rs +++ b/scywr/src/store.rs @@ -22,12 +22,15 @@ pub struct DataStore { pub qu_insert_scalar_i8: Arc, pub qu_insert_scalar_i16: Arc, pub qu_insert_scalar_i32: Arc, + pub qu_insert_scalar_i64: Arc, pub qu_insert_scalar_f32: Arc, pub qu_insert_scalar_f64: Arc, + pub qu_insert_scalar_bool: Arc, pub qu_insert_scalar_string: Arc, pub qu_insert_array_i8: Arc, pub qu_insert_array_i16: Arc, pub qu_insert_array_i32: Arc, + pub qu_insert_array_i64: Arc, pub qu_insert_array_f32: Arc, pub qu_insert_array_f64: Arc, pub qu_insert_array_bool: Arc, @@ -80,6 +83,11 @@ impl DataStore { let q = scy.prepare(cql).await?; let qu_insert_scalar_i32 = Arc::new(q); + let cql = + "insert into events_scalar_i64 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?"; + let q = scy.prepare(cql).await?; + let qu_insert_scalar_i64 = Arc::new(q); + let cql = "insert into events_scalar_f32 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?"; let q = scy.prepare(cql).await?; @@ -90,6 +98,11 @@ impl DataStore { let q = scy.prepare(cql).await?; let qu_insert_scalar_f64 = Arc::new(q); + let cql = + "insert into events_scalar_bool (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?"; + let q = scy.prepare(cql).await?; + let qu_insert_scalar_bool = Arc::new(q); + let cql="insert into events_scalar_string (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?"; let q = scy.prepare(cql).await?; let qu_insert_scalar_string = Arc::new(q); @@ -110,6 +123,11 @@ impl DataStore { let q = scy.prepare(cql).await?; let qu_insert_array_i32 = Arc::new(q); + let cql = + "insert into events_array_i64 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?"; + let q = scy.prepare(cql).await?; + let qu_insert_array_i64 = Arc::new(q); + let cql = "insert into events_array_f32 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?"; let q = scy.prepare(cql).await?; @@ -169,12 +187,15 @@ impl DataStore { qu_insert_scalar_i8, qu_insert_scalar_i16, qu_insert_scalar_i32, + qu_insert_scalar_i64, qu_insert_scalar_f32, qu_insert_scalar_f64, + qu_insert_scalar_bool, qu_insert_scalar_string, qu_insert_array_i8, qu_insert_array_i16, qu_insert_array_i32, + qu_insert_array_i64, qu_insert_array_f32, qu_insert_array_f64, qu_insert_array_bool, diff --git a/stats/src/stats.rs b/stats/src/stats.rs index 7b9dd6b..157b54c 100644 --- a/stats/src/stats.rs +++ b/stats/src/stats.rs @@ -283,7 +283,14 @@ stats_proc::stats_struct!(( item_latency_200ms, item_latency_400ms, item_latency_800ms, + item_latency_1600ms, + item_latency_3200ms, item_latency_large, + item_commit_latency_0050ms, + item_commit_latency_0200ms, + item_commit_latency_0800ms, + item_commit_latency_3200ms, + item_commit_latency_large, worker_start, worker_finish, ) @@ -303,7 +310,9 @@ stats_proc::stats_struct!(( inserts_queue_push, inserts_queue_drop, insert_item_queue_pressure, + insert_item_queue_full, channel_fast_item_drop, + logic_error, store_worker_recv_queue_len, // TODO maybe rename: this is now only the recv of the intermediate queue: store_worker_item_recv, @@ -355,6 +364,11 @@ stats_proc::stats_struct!(( pong_recv_400ms, pong_recv_slow, pong_timeout, + ca_conn_poll_fn_begin, + ca_conn_poll_loop_begin, + ca_conn_poll_reloop, + ca_conn_poll_pending, + ca_conn_poll_no_progress_no_pending, ), values(inter_ivl_ema) ),