Remove usage of lock

This commit is contained in:
Dominik Werder
2023-09-15 16:49:32 +02:00
parent b21dfae560
commit b175516b62
13 changed files with 447 additions and 336 deletions
+14 -11
View File
@@ -2,6 +2,7 @@ use crate::daemon::PRINT_ACTIVE_INTERVAL;
use async_channel::Receiver;
use async_channel::Sender;
use log::*;
use netpod::ScalarType;
use netpod::Shape;
use scywr::iteminsertqueue::QueryItem;
use std::collections::BTreeMap;
@@ -30,16 +31,18 @@ pub async fn active_channel_insert_hook_worker(rx: Receiver<QueryItem>, tx: Send
Shape::Wave(_) => 1,
Shape::Image(_, _) => 2,
};
histo
.entry(item.series.clone())
.and_modify(|(c, msp, lsp, pulse, _shape_kind)| {
*c += 1;
*msp = item.ts_msp;
*lsp = item.ts_lsp;
*pulse = item.pulse;
// TODO should check that shape_kind stays the same.
})
.or_insert((0 as usize, item.ts_msp, item.ts_lsp, item.pulse, shape_kind));
if let ScalarType::STRING = item.scalar_type {
histo
.entry(item.series.clone())
.and_modify(|(c, msp, lsp, pulse, _shape_kind)| {
*c += 1;
*msp = item.ts_msp;
*lsp = item.ts_lsp;
*pulse = item.pulse;
// TODO should check that shape_kind stays the same.
})
.or_insert((0 as usize, item.ts_msp, item.ts_lsp, item.pulse, shape_kind));
}
}
_ => {}
}
@@ -75,7 +78,7 @@ pub async fn active_channel_insert_hook_worker(rx: Receiver<QueryItem>, tx: Send
}
/// Spawns `active_channel_insert_hook_worker` as a tokio task, handing it `inp` together with
/// the sending half of a newly created bounded channel, and returns the receiving half.
///
/// The new channel is given the capacity reported by `inp`; when `inp.capacity()` yields
/// `None`, a capacity of 256 is used instead.
pub fn active_channel_insert_hook(inp: Receiver<QueryItem>) -> Receiver<QueryItem> {
    // Create exactly one output channel. A second, fixed-size channel that is shadowed right
    // away would only allocate a queue that nobody ever uses.
    let (tx, rx) = async_channel::bounded(inp.capacity().unwrap_or(256));
    tokio::spawn(active_channel_insert_hook_worker(inp, tx));
    rx
}
+1 -1
View File
@@ -259,7 +259,7 @@ impl Worker {
let mut all_good = true;
for h in &mut hashers {
let mut good = false;
for _ in 0..50 {
for _ in 0..400 {
h.update(tsbeg.elapsed().subsec_nanos().to_ne_bytes());
let f = h.clone().finalize();
let series = u64::from_le_bytes(f.as_slice()[0..8].try_into().unwrap());
-18
View File
@@ -8,20 +8,11 @@ pub mod search;
pub mod statemap;
use crate::metrics::ExtraInsertsConf;
use crate::rt::TokMx;
use futures_util::Future;
use futures_util::FutureExt;
use log::*;
use netpod::Database;
use scywr::insertworker::InsertWorkerOpts;
use scywr::store::DataStore;
use stats::CaConnStatsAgg;
use std::pin::Pin;
use std::sync::atomic::AtomicU32;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::sync::Mutex;
use std::task::Poll;
use std::time::Duration;
use std::time::Instant;
@@ -29,10 +20,6 @@ use taskrun::tokio;
pub static SIGINT: AtomicU32 = AtomicU32::new(0);
lazy_static::lazy_static! {
pub static ref METRICS: Mutex<Option<CaConnStatsAgg>> = Mutex::new(None);
}
pub trait SlowWarnable {
fn slow_warn(self, ms: u64) -> SlowWarn<Pin<Box<Self>>>
where
@@ -118,8 +105,3 @@ where
}
}
}
/// Handler that records the signal in the global `SIGINT` flag.
///
/// Stores `1` into `crate::ca::SIGINT` and afterwards unsets the signal handler for
/// `libc::SIGINT`; the outcome of unsetting is ignored. None of the arguments are used.
// NOTE(review): presumably registered via `sigaction` for SIGINT; the registration is not
// visible here, confirm.
fn handler_sigaction(_signum: libc::c_int, _info: *const libc::siginfo_t, _ctx: *const libc::c_void) {
    let flag = &crate::ca::SIGINT;
    flag.store(1, Ordering::Release);
    let _ = ingest_linux::signal::unset_signal_handler(libc::SIGINT);
}
+221 -110
View File
@@ -453,6 +453,13 @@ pub struct CaConnOpts {
array_truncate: usize,
}
impl CaConnOpts {
    /// Builder-style setter: consumes the options and returns them with
    /// `insert_queue_max` replaced by `val`; all other fields are carried over unchanged.
    pub fn with_insert_queue_max(self, val: usize) -> Self {
        Self {
            insert_queue_max: val,
            ..self
        }
    }
}
impl Default for CaConnOpts {
fn default() -> Self {
Self {
@@ -488,6 +495,7 @@ pub struct CaConn {
extra_inserts_conf: ExtraInsertsConf,
ioc_ping_last: Instant,
ioc_ping_start: Option<Instant>,
storage_insert_sender: SenderPolling<QueryItem>,
cmd_res_queue: VecDeque<ConnCommandResult>,
ca_conn_event_out_queue: VecDeque<CaConnEvent>,
channel_info_query_queue: VecDeque<ChannelInfoQuery>,
@@ -508,6 +516,7 @@ impl CaConn {
backend: String,
remote_addr_dbg: SocketAddrV4,
local_epics_hostname: String,
storage_insert_tx: Sender<QueryItem>,
channel_info_query_tx: Sender<ChannelInfoQuery>,
stats: Arc<CaConnStats>,
) -> Self {
@@ -538,6 +547,7 @@ impl CaConn {
extra_inserts_conf: ExtraInsertsConf::new(),
ioc_ping_last: Instant::now(),
ioc_ping_start: None,
storage_insert_sender: SenderPolling::new(storage_insert_tx),
cmd_res_queue: VecDeque::new(),
ca_conn_event_out_queue: VecDeque::new(),
channel_info_query_queue: VecDeque::new(),
@@ -710,47 +720,45 @@ impl CaConn {
Ok(())
}
fn handle_conn_command(&mut self, cx: &mut Context) -> Poll<Option<Result<(), Error>>> {
fn handle_conn_command(&mut self, cx: &mut Context) -> Result<Poll<Option<()>>, Error> {
// TODO if this loops for too long time, yield and make sure we get wake up again.
use Poll::*;
loop {
self.stats.caconn_loop3_count.inc();
break if self.is_shutdown() {
Ready(None)
} else {
match self.conn_command_rx.poll_next_unpin(cx) {
Ready(Some(a)) => {
trace3!("handle_conn_command received a command {}", self.remote_addr_dbg);
match a.kind {
ConnCommandKind::ChannelAdd(name, cssid) => {
self.cmd_channel_add(name, cssid);
Ready(Some(Ok(())))
}
ConnCommandKind::ChannelRemove(name) => {
self.cmd_channel_remove(name);
Ready(Some(Ok(())))
}
ConnCommandKind::CheckHealth => {
self.cmd_check_health();
Ready(Some(Ok(())))
}
ConnCommandKind::Shutdown => {
self.cmd_shutdown();
Ready(Some(Ok(())))
}
ConnCommandKind::SeriesLookupResult(x) => match self.handle_series_lookup_result(x) {
Ok(()) => Ready(Some(Ok(()))),
Err(e) => Ready(Some(Err(e))),
},
self.stats.caconn_loop3_count.inc();
if self.is_shutdown() {
Ok(Ready(None))
} else {
match self.conn_command_rx.poll_next_unpin(cx) {
Ready(Some(a)) => {
trace3!("handle_conn_command received a command {}", self.remote_addr_dbg);
match a.kind {
ConnCommandKind::ChannelAdd(name, cssid) => {
self.cmd_channel_add(name, cssid);
Ok(Ready(Some(())))
}
ConnCommandKind::ChannelRemove(name) => {
self.cmd_channel_remove(name);
Ok(Ready(Some(())))
}
ConnCommandKind::CheckHealth => {
self.cmd_check_health();
Ok(Ready(Some(())))
}
ConnCommandKind::Shutdown => {
self.cmd_shutdown();
Ok(Ready(Some(())))
}
ConnCommandKind::SeriesLookupResult(x) => match self.handle_series_lookup_result(x) {
Ok(()) => Ok(Ready(Some(()))),
Err(e) => Err(e),
},
}
Ready(None) => {
error!("Command queue closed");
Ready(None)
}
Pending => Pending,
}
};
Ready(None) => {
error!("Command queue closed");
Ok(Ready(None))
}
Pending => Ok(Pending),
}
}
}
@@ -1708,30 +1716,38 @@ impl CaConn {
fn loop_inner(&mut self, cx: &mut Context) -> Result<Poll<Option<()>>, Error> {
use Poll::*;
loop {
let mut have_progress = false;
for _ in 0..64 {
self.stats.caconn_loop2_count.inc();
break if self.is_shutdown() {
Ok(Ready(None))
if self.is_shutdown() {
break;
} else if self.insert_item_queue.len() >= self.opts.insert_queue_max {
warn!("======================================================= queue stall");
Ok(Ready(None))
break;
} else {
match self.handle_conn_state(cx) {
Ok(x) => match x {
Ready(Some(())) => continue,
Ready(Some(())) => {
have_progress = true;
continue;
}
Ready(None) => {
error!("handle_conn_state yields {x:?}");
Err(Error::with_msg_no_trace("logic error"))
return Err(Error::with_msg_no_trace("logic error"));
}
Pending => Ok(Pending),
Pending => return Ok(Pending),
},
Err(e) => Err(e),
Err(e) => return Err(e),
}
};
}
if have_progress {
Ok(Ready(Some(())))
} else {
Ok(Ready(None))
}
}
fn handle_own_ticker(mut self: Pin<&mut Self>, cx: &mut Context) -> Result<(), Error> {
fn handle_own_ticker(mut self: Pin<&mut Self>, cx: &mut Context) -> Result<Poll<()>, Error> {
use Poll::*;
match self.ticker.poll_unpin(cx) {
Ready(()) => {
@@ -1742,7 +1758,7 @@ impl CaConn {
let _ = self.ticker.poll_unpin(cx);
// cx.waker().wake_by_ref();
}
Ok(())
Ok(Pending)
}
Err(e) => {
error!("handle_own_ticker {e}");
@@ -1751,7 +1767,7 @@ impl CaConn {
}
}
}
Pending => Ok(()),
Pending => Ok(Pending),
}
}
@@ -1773,28 +1789,56 @@ impl CaConn {
true
}
fn attempt_flush_channel_info_query(mut self: Pin<&mut Self>, cx: &mut Context) -> Result<(), Error> {
fn attempt_flush_storage_queue(mut self: Pin<&mut Self>, cx: &mut Context) -> Result<Poll<Option<()>>, Error> {
use Poll::*;
loop {
break if self.is_shutdown() {
Ok(())
} else {
let sd = &mut self.channel_info_query_sending;
if sd.is_sending() {
match sd.poll_unpin(cx) {
Ready(Ok(())) => continue,
Ready(Err(_)) => Err(Error::with_msg_no_trace("can not send into channel")),
Pending => Ok(()),
let mut have_progress = false;
for _ in 0..128 {
let sd = &mut self.storage_insert_sender;
if sd.is_idle() {
if let Some(item) = self.insert_item_queue.pop_front() {
self.storage_insert_sender.send(item);
}
}
if self.storage_insert_sender.is_sending() {
match self.storage_insert_sender.poll_unpin(cx) {
Ready(Ok(())) => {
have_progress = true;
}
} else if let Some(item) = self.channel_info_query_queue.pop_front() {
Ready(Err(_)) => return Err(Error::with_msg_no_trace("can not send into channel")),
Pending => return Ok(Pending),
}
}
}
if have_progress {
Ok(Ready(Some(())))
} else {
Ok(Ready(None))
}
}
fn attempt_flush_channel_info_query(mut self: Pin<&mut Self>, cx: &mut Context) -> Result<Poll<Option<()>>, Error> {
use Poll::*;
if self.is_shutdown() {
Ok(Ready(None))
} else {
let sd = &mut self.channel_info_query_sending;
if sd.is_idle() {
if let Some(item) = self.channel_info_query_queue.pop_front() {
trace3!("send series query {item:?}");
let sd = &mut self.channel_info_query_sending;
sd.send(item);
continue;
} else {
Ok(())
}
};
}
let sd = &mut self.channel_info_query_sending;
if sd.is_sending() {
match sd.poll_unpin(cx) {
Ready(Ok(())) => Ok(Ready(Some(()))),
Ready(Err(_)) => Err(Error::with_msg_no_trace("can not send into channel")),
Pending => Ok(Pending),
}
} else {
Ok(Ready(None))
}
}
}
}
@@ -1806,63 +1850,130 @@ impl Stream for CaConn {
use Poll::*;
self.stats.caconn_poll_count.inc();
let poll_ts1 = Instant::now();
self.stats.ca_conn_poll_fn_begin().inc();
let ret = loop {
self.stats.ca_conn_poll_loop_begin().inc();
let qlen = self.insert_item_queue.len();
if qlen > self.opts.insert_queue_max / 3 {
if qlen >= self.opts.insert_queue_max * 2 / 3 {
self.stats.insert_item_queue_pressure().inc();
} else if qlen >= self.opts.insert_queue_max {
self.stats.insert_item_queue_full().inc();
}
break if let CaConnState::EndOfStream = self.state {
Ready(None)
} else if let Err(e) = self.as_mut().handle_own_ticker(cx) {
Ready(Some(Err(e)))
} else if let Some(item) = self.cmd_res_queue.pop_front() {
let mut have_pending = false;
let mut have_progress = false;
if let CaConnState::EndOfStream = self.state {
break Ready(None);
}
if let Some(item) = self.cmd_res_queue.pop_front() {
let item = CaConnEvent {
ts: Instant::now(),
value: CaConnEventValue::ConnCommandResult(item),
};
Ready(Some(Ok(item)))
} else if let Some(item) = self.ca_conn_event_out_queue.pop_front() {
Ready(Some(Ok(item)))
} else if let Some(item) = self.insert_item_queue.pop_front() {
let ev = CaConnEvent {
ts: Instant::now(),
value: CaConnEventValue::QueryItem(item),
};
Ready(Some(Ok(ev)))
} else if let Err(e) = self.as_mut().attempt_flush_channel_info_query(cx) {
Ready(Some(Err(e)))
} else if let Ready(Some(Err(e))) = self.as_mut().handle_conn_command(cx) {
Ready(Some(Err(e)))
} else {
match self.loop_inner(cx) {
Ok(Ready(Some(()))) => continue,
Ok(Ready(None)) => {
// Ready(_) => self.stats.conn_stream_ready.inc(),
// Pending => self.stats.conn_stream_pending.inc(),
let _item = CaConnEvent {
ts: Instant::now(),
value: CaConnEventValue::None,
};
if self.is_shutdown() {
if self.queues_async_out_flushed() == false {
debug!("shutdown, but async queues not flushed");
continue;
} else {
debug!("end of stream {}", self.remote_addr_dbg);
self.state = CaConnState::EndOfStream;
Ready(None)
}
} else {
continue;
}
}
Ok(Pending) => Pending,
Err(e) => {
error!("{e}");
self.state = CaConnState::EndOfStream;
break Ready(Some(Ok(item)));
}
if let Some(item) = self.ca_conn_event_out_queue.pop_front() {
break Ready(Some(Ok(item)));
}
// if let Some(item) = self.insert_item_queue.pop_front() {
// let ev = CaConnEvent {
// ts: Instant::now(),
// value: CaConnEventValue::QueryItem(item),
// };
// break Ready(Some(Ok(ev)));
// }
match self.as_mut().handle_own_ticker(cx) {
Ok(Ready(())) => {
have_progress = true;
}
Ok(Pending) => {
have_pending = true;
}
Err(e) => break Ready(Some(Err(e))),
}
match self.as_mut().attempt_flush_storage_queue(cx) {
Ok(Ready(Some(()))) => {
have_progress = true;
}
Ok(Ready(None)) => {}
Ok(Pending) => {
have_pending = true;
}
Err(e) => break Ready(Some(Err(e))),
}
match self.as_mut().attempt_flush_channel_info_query(cx) {
Ok(Ready(Some(()))) => {
have_progress = true;
}
Ok(Ready(None)) => {}
Ok(Pending) => {
have_pending = true;
}
Err(e) => break Ready(Some(Err(e))),
}
match self.as_mut().handle_conn_command(cx) {
Ok(Ready(Some(()))) => {
have_progress = true;
}
Ok(Ready(None)) => {}
Ok(Pending) => {
have_pending = true;
}
Err(e) => break Ready(Some(Err(e))),
}
match self.loop_inner(cx) {
Ok(Ready(Some(()))) => {
have_progress = true;
}
Ok(Ready(None)) => {}
Ok(Pending) => {
have_pending = true;
}
Err(e) => {
error!("{e}");
self.state = CaConnState::EndOfStream;
break Ready(Some(Err(e)));
}
}
break if self.is_shutdown() {
if self.queues_async_out_flushed() {
debug!("end of stream {}", self.remote_addr_dbg);
self.state = CaConnState::EndOfStream;
Ready(None)
} else {
if have_progress {
self.stats.ca_conn_poll_reloop().inc();
continue;
} else if have_pending {
self.stats.ca_conn_poll_pending().inc();
Pending
} else {
// TODO error
error!("logic error");
self.stats.logic_error().inc();
let e = Error::with_msg_no_trace("shutdown, not done, no progress, no pending");
Ready(Some(Err(e)))
}
}
} else {
if have_progress {
self.stats.ca_conn_poll_reloop().inc();
continue;
} else if have_pending {
self.stats.ca_conn_poll_pending().inc();
Pending
} else {
self.stats.ca_conn_poll_no_progress_no_pending().inc();
let e = Error::with_msg_no_trace("no progress no pending");
Ready(Some(Err(e)))
}
};
};
let poll_ts2 = Instant::now();
+37 -23
View File
@@ -15,7 +15,6 @@ use crate::ca::statemap::WithAddressState;
use crate::daemon_common::Channel;
use crate::errconv::ErrConv;
use crate::rt::JoinHandle;
use crate::rt::TokMx;
use crate::senderpolling::SenderPolling;
use crate::throttletrace::ThrottleTrace;
use async_channel::Receiver;
@@ -152,16 +151,34 @@ pub struct ChannelRemove {
name: String,
}
// Request that carries only the sender on which a `ChannelStatusResponse` can be returned.
// NOTE(review): the code handling this request is not visible here — confirm how `tx` is used.
pub struct ChannelStatusRequest {
pub tx: Sender<ChannelStatusResponse>,
}
#[derive(Debug, Clone, Serialize)]
pub struct ChannelStatusesResponse {
pub struct ChannelStatusResponse {
pub channels_ca_conn: BTreeMap<String, ChannelStateInfo>,
pub channels_ca_conn_set: BTreeMap<String, ChannelState>,
}
// Manual `Debug` implementation: prints only the type name and leaves out the `tx` field.
impl fmt::Debug for ChannelStatusRequest {
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        // Use this type's own name so debug output is not confused with the
        // separate `ChannelStatusesRequest` type.
        fmt.debug_struct("ChannelStatusRequest").finish()
    }
}
// Request for the states of channels; the result is returned through `tx`.
pub struct ChannelStatusesRequest {
// Pattern text: the request handler compiles it with `regex::Regex::new` and keeps only
// the channels whose name matches.
pub name: String,
// Limit taken from the HTTP `limit` query parameter (40 when missing or unparsable).
// NOTE(review): no use of `limit` is visible in the handler shown here — confirm it is applied.
pub limit: u64,
// Sender on which the handler `try_send`s the `ChannelStatusesResponse`.
pub tx: Sender<ChannelStatusesResponse>,
}
#[derive(Debug, Clone, Serialize)]
pub struct ChannelStatusesResponse {
pub channels_ca_conn: BTreeMap<String, ChannelStateInfo>,
pub channels_ca_conn_set: BTreeMap<String, ChannelState>,
}
impl fmt::Debug for ChannelStatusesRequest {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("ChannelStatusesRequest").finish()
@@ -418,7 +435,7 @@ impl CaConnSet {
fn handle_ca_conn_event(&mut self, addr: SocketAddr, ev: CaConnEvent) -> Result<(), Error> {
match ev.value {
CaConnEventValue::None => Ok(()),
CaConnEventValue::EchoTimeout => todo!(),
CaConnEventValue::EchoTimeout => Ok(()),
CaConnEventValue::ConnCommandResult(x) => self.handle_conn_command_result(addr, x),
CaConnEventValue::QueryItem(item) => {
self.storage_insert_queue.push_back(item);
@@ -645,14 +662,23 @@ impl CaConnSet {
return Ok(());
}
debug!("handle_channel_statuses_req");
let reg1 = regex::Regex::new(&req.name)?;
let channels_ca_conn = self
.ca_conn_channel_states
.iter()
.filter(|x| reg1.is_match(x.0))
.map(|(k, v)| (k.to_string(), v.clone()))
.collect();
let channels_ca_conn_set = self
.channel_states
.inner()
.iter()
.filter(|(k, v)| reg1.is_match(k.id()))
.map(|(k, v)| (k.id().to_string(), v.clone()))
.collect();
let item = ChannelStatusesResponse {
channels_ca_conn: self.ca_conn_channel_states.clone(),
channels_ca_conn_set: self
.channel_states
.inner()
.iter()
.map(|(k, v)| (k.id().to_string(), v.clone()))
.collect(),
channels_ca_conn,
channels_ca_conn_set,
};
if req.tx.try_send(item).is_err() {
self.stats.response_tx_fail.inc();
@@ -737,6 +763,7 @@ impl CaConnSet {
add.backend.clone(),
addr_v4,
add.local_epics_hostname,
self.storage_insert_tx.clone(),
self.channel_info_query_tx.clone(),
self.ca_conn_stats.clone(),
);
@@ -897,19 +924,6 @@ impl CaConnSet {
Ok(())
}
/// Removes the entry stored under `addr` from the mutex-guarded connection map.
///
/// Waits for the lock, removes the entry and returns `Ok(true)` when an entry was present,
/// `Ok(false)` otherwise. This function itself never produces an `Err`.
async fn conn_remove(
    ca_conn_ress: &TokMx<BTreeMap<SocketAddrV4, CaConnRes>>,
    addr: SocketAddrV4,
) -> Result<bool, Error> {
    // TODO make this lock-free.
    let mut map = ca_conn_ress.lock().await;
    let was_present = map.remove(&addr).is_some();
    Ok(was_present)
}
fn check_connection_states(&mut self) -> Result<(), Error> {
let tsnow = Instant::now();
for (addr, val) in &mut self.ca_conn_ress {
View File
+32 -24
View File
@@ -3,7 +3,6 @@ use crate::ca::connset::CaConnSetEvent;
use crate::ca::connset::ChannelStatusesRequest;
use crate::ca::connset::ChannelStatusesResponse;
use crate::ca::connset::ConnSetCmd;
use crate::ca::METRICS;
use crate::daemon_common::DaemonEvent;
use async_channel::Receiver;
use async_channel::Sender;
@@ -124,34 +123,39 @@ async fn channel_remove(params: HashMap<String, String>, dcom: Arc<DaemonComm>)
Json(Value::Bool(false))
}
async fn channel_state(params: HashMap<String, String>, dcom: Arc<DaemonComm>) -> axum::Json<bool> {
let name = params.get("name").map_or(String::new(), |x| x.clone()).to_string();
error!("TODO channel_state");
axum::Json(false)
async fn channel_state(
params: HashMap<String, String>,
tx: Sender<CaConnSetEvent>,
) -> axum::Json<ChannelStatusesResponse> {
panic!("TODO");
}
// axum::Json<ChannelStatusesResponse>
async fn channel_states(params: HashMap<String, String>, tx: Sender<CaConnSetEvent>) -> String {
async fn channel_states(
params: HashMap<String, String>,
tx: Sender<CaConnSetEvent>,
) -> axum::Json<ChannelStatusesResponse> {
let name = params.get("name").map_or(String::new(), |x| x.clone()).to_string();
let limit = params
.get("limit")
.map(|x| x.parse().ok())
.unwrap_or(None)
.unwrap_or(40);
let (tx2, rx2) = async_channel::bounded(1);
let req = ChannelStatusesRequest { tx: tx2 };
let req = ChannelStatusesRequest { name, limit, tx: tx2 };
let item = CaConnSetEvent::ConnSetCmd(ConnSetCmd::ChannelStatuses(req));
// TODO handle error
tx.send(item).await;
let res = rx2.recv().await.unwrap();
match serde_json::to_string(&res) {
Ok(x) => x,
Err(e) => {
error!("Serialize error {e}");
Err::<(), _>(e).unwrap();
panic!();
}
}
// axum::Json(res)
// match serde_json::to_string(&res) {
// Ok(x) => x,
// Err(e) => {
// error!("Serialize error {e}");
// Err::<(), _>(e).unwrap();
// panic!();
// }
// }
axum::Json(res)
}
async fn extra_inserts_conf_set(v: ExtraInsertsConf, dcom: Arc<DaemonComm>) -> axum::Json<bool> {
@@ -229,14 +233,15 @@ fn make_routes(dcom: Arc<DaemonComm>, connset_cmd_tx: Sender<CaConnSetEvent>, st
.route(
"/daqingest/channel/state",
get({
let dcom = dcom.clone();
|Query(params): Query<HashMap<String, String>>| channel_state(params, dcom)
// let dcom = dcom.clone();
let tx = connset_cmd_tx.clone();
|Query(params): Query<HashMap<String, String>>| channel_state(params, tx)
}),
)
.route(
"/daqingest/channel/states",
get({
let dcom = dcom.clone();
// let dcom = dcom.clone();
let tx = connset_cmd_tx.clone();
|Query(params): Query<HashMap<String, String>>| channel_states(params, tx)
}),
@@ -349,11 +354,14 @@ pub async fn metrics_agg_task(
let nitems = query_item_chn.upgrade().map_or(0, |x| x.len());
agg.store_worker_recv_queue_len.__set(nitems as u64);
}
let mut m = METRICS.lock().unwrap();
*m = Some(agg.clone());
if false {
let diff = CaConnStatsAggDiff::diff_from(&agg_last, &agg);
info!("{}", diff.display());
#[cfg(DISABLED)]
{
let mut m = METRICS.lock().unwrap();
*m = Some(agg.clone());
if false {
let diff = CaConnStatsAggDiff::diff_from(&agg_last, &agg);
info!("{}", diff.display());
}
}
agg_last = agg;
}
-1
View File
@@ -1,4 +1,3 @@
use taskrun::tokio;
pub use tokio::sync::Mutex as TokMx;
pub use tokio::task::JoinHandle;
pub use tokio::time::sleep;
+39 -21
View File
@@ -189,46 +189,64 @@ async fn worker(
}
}
QueryItem::Insert(item) => {
let item_ts_local = item.ts_local.clone();
let tsnow = {
let ts = SystemTime::now();
let epoch = ts.duration_since(std::time::UNIX_EPOCH).unwrap();
epoch.as_secs() * SEC + epoch.subsec_nanos() as u64
};
let dt = (tsnow / 1000) as i64 - (item.ts_local / 1000) as i64;
let dt = (tsnow / 1000000) as i64 - (item_ts_local / 1000000) as i64;
if dt < 0 {
stats.item_latency_neg().inc();
} else if dt <= 1000 * 25 {
} else if dt <= 25 {
stats.item_latency_025ms().inc();
} else if dt <= 1000 * 50 {
} else if dt <= 50 {
stats.item_latency_050ms().inc();
} else if dt <= 1000 * 100 {
} else if dt <= 100 {
stats.item_latency_100ms().inc();
} else if dt <= 1000 * 200 {
} else if dt <= 200 {
stats.item_latency_200ms().inc();
} else if dt <= 1000 * 400 {
} else if dt <= 400 {
stats.item_latency_400ms().inc();
} else if dt <= 1000 * 800 {
} else if dt <= 800 {
stats.item_latency_800ms().inc();
} else if dt <= 1600 {
stats.item_latency_1600ms().inc();
} else if dt <= 3200 {
stats.item_latency_3200ms().inc();
} else {
stats.item_latency_large().inc();
}
if false {
stats.inserted_values().inc();
} else {
let insert_frac = insert_worker_opts.insert_frac.load(Ordering::Acquire);
let do_insert = i1 % 1000 < insert_frac;
match insert_item(item, ttls.index, ttls.d0, ttls.d1, &data_store, &stats, do_insert).await {
Ok(_) => {
stats.inserted_values().inc();
backoff = backoff_0;
}
Err(e) => {
stats_inc_for_err(&stats, &e);
back_off_sleep(&mut backoff).await;
let insert_frac = insert_worker_opts.insert_frac.load(Ordering::Acquire);
let do_insert = i1 % 1000 < insert_frac;
match insert_item(item, ttls.index, ttls.d0, ttls.d1, &data_store, &stats, do_insert).await {
Ok(_) => {
stats.inserted_values().inc();
let tsnow = {
let ts = SystemTime::now();
let epoch = ts.duration_since(std::time::UNIX_EPOCH).unwrap();
epoch.as_secs() * SEC + epoch.subsec_nanos() as u64
};
let dt = (tsnow / 1000000) as i64 - (item_ts_local / 1000000) as i64;
if dt <= 50 {
stats.item_commit_latency_0050ms().inc();
} else if dt <= 200 {
stats.item_commit_latency_0200ms().inc();
} else if dt <= 800 {
stats.item_commit_latency_0800ms().inc();
} else if dt <= 3200 {
stats.item_commit_latency_3200ms().inc();
} else {
stats.item_commit_latency_large().inc();
}
backoff = backoff_0;
}
Err(e) => {
stats_inc_for_err(&stats, &e);
back_off_sleep(&mut backoff).await;
}
i1 += 1;
}
i1 += 1;
}
QueryItem::Mute(item) => {
let values = (
+15 -107
View File
@@ -1,25 +1,17 @@
pub use netpod::CONNECTION_STATUS_DIV;
use crate::store::DataStore;
use async_channel::Receiver;
use async_channel::Sender;
use err::thiserror;
use err::ThisError;
use log::*;
use netpod::ScalarType;
use netpod::Shape;
use scylla::prepared_statement::PreparedStatement;
use scylla::transport::errors::DbError;
use scylla::transport::errors::QueryError;
use series::SeriesId;
use stats::CaConnStats;
use stats::InsertWorkerStats;
use std::net::SocketAddrV4;
use std::sync::atomic;
use std::sync::atomic::AtomicU64;
use std::sync::Mutex;
use std::time::Duration;
use std::time::Instant;
use std::time::SystemTime;
#[derive(Debug, ThisError)]
@@ -244,83 +236,6 @@ pub enum QueryItem {
TimeBinPatchSimpleF32(TimeBinPatchSimpleF32),
}
/// Wrapper holding the sending half of a `QueryItem` channel.
pub struct CommonInsertItemQueueSender {
sender: Sender<QueryItem>,
}
impl CommonInsertItemQueueSender {
/// Delegates to the wrapped sender's `send` and returns its `async_channel::Send` future.
#[inline(always)]
pub fn send(&self, k: QueryItem) -> async_channel::Send<QueryItem> {
self.sender.send(k)
}
/// Returns the result of the wrapped sender's `is_full`.
#[inline(always)]
pub fn is_full(&self) -> bool {
self.sender.is_full()
}
/// Borrows the wrapped sender.
pub fn inner(&self) -> &Sender<QueryItem> {
&self.sender
}
}
/// A `QueryItem` channel whose sending half is kept behind a mutex so that it can be
/// handed out, closed or dropped through a shared reference.
pub struct CommonInsertItemQueue {
    // Becomes `None` after `drop_sender` has taken the sender out.
    sender: Mutex<Option<Sender<QueryItem>>>,
    recv: Receiver<QueryItem>,
}

impl CommonInsertItemQueue {
    /// Creates the queue on top of a new bounded channel with capacity `cap`.
    pub fn new(cap: usize) -> Self {
        let (tx, rx) = async_channel::bounded(cap);
        let sender = Mutex::new(Some(tx));
        Self { sender, recv: rx }
    }

    /// Creates the queue from an already existing sender/receiver pair.
    pub fn from_tx_rx(tx: Sender<QueryItem>, rx: Receiver<QueryItem>) -> Self {
        let sender = Mutex::new(Some(tx));
        Self { sender, recv: rx }
    }

    /// Returns a wrapper around a clone of the sender, or `None` when the sender is gone.
    ///
    /// Panics if the mutex is poisoned.
    pub fn sender(&self) -> Option<CommonInsertItemQueueSender> {
        let guard = self.sender.lock().unwrap();
        let handle = guard
            .as_ref()
            .map(|tx| CommonInsertItemQueueSender { sender: tx.clone() });
        handle
    }

    /// Returns a clone of the receiver; the result is always `Some`.
    pub fn receiver(&self) -> Option<Receiver<QueryItem>> {
        Some(self.recv.clone())
    }

    /// Sender count as reported by the stored sender, or `None` when the sender is gone.
    ///
    /// Panics if the mutex is poisoned.
    pub fn sender_count(&self) -> Option<usize> {
        let guard = self.sender.lock().unwrap();
        let count = guard.as_ref().map(|tx| tx.sender_count());
        count
    }

    /// Sender count as reported by the receiver side.
    pub fn sender_count_2(&self) -> usize {
        self.recv.sender_count()
    }

    /// Receiver count as reported by the receiver side.
    pub fn receiver_count(&self) -> usize {
        self.recv.receiver_count()
    }

    /// Closes the channel through the stored sender; does nothing when the sender is gone.
    ///
    /// Panics if the mutex is poisoned.
    pub fn close(&self) {
        if let Some(tx) = self.sender.lock().unwrap().as_ref() {
            tx.close();
        }
    }

    /// Drops the stored sender, leaving `None` in its place.
    ///
    /// Panics if the mutex is poisoned.
    pub fn drop_sender(&self) {
        let mut guard = self.sender.lock().unwrap();
        *guard = None;
    }
}
struct InsParCom {
series: u64,
ts_msp: u64,
@@ -384,15 +299,24 @@ where
val,
par.ttl as i32,
);
data_store.scy.execute(qu, params).await?;
Ok(())
let y = data_store.scy.execute(qu, params).await;
match y {
Ok(_) => Ok(()),
Err(e) => match e {
QueryError::TimeoutError => Err(Error::DbTimeout),
// TODO use `msg`
QueryError::DbError(e, _msg) => match e {
DbError::Overloaded => Err(Error::DbOverload),
_ => Err(e.into()),
},
_ => Err(e.into()),
},
}
} else {
Ok(())
}
}
static warn_last: AtomicU64 = AtomicU64::new(0);
pub async fn insert_item(
item: InsertItem,
ttl_index: Duration,
@@ -441,24 +365,8 @@ pub async fn insert_item(
I32(val) => insert_scalar_gen(par, val, &data_store.qu_insert_scalar_i32, &data_store).await?,
F32(val) => insert_scalar_gen(par, val, &data_store.qu_insert_scalar_f32, &data_store).await?,
F64(val) => insert_scalar_gen(par, val, &data_store.qu_insert_scalar_f64, &data_store).await?,
String(val) => {
let ts = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.map_or(0, |x| x.as_secs());
if ts > warn_last.load(atomic::Ordering::Acquire) + 10 {
warn_last.store(ts, atomic::Ordering::Release);
warn!("TODO string insert {val}");
}
}
Bool(val) => {
let ts = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.map_or(0, |x| x.as_secs());
if ts > warn_last.load(atomic::Ordering::Acquire) + 10 {
warn_last.store(ts, atomic::Ordering::Release);
warn!("TODO bool insert {val}");
}
}
String(val) => insert_scalar_gen(par, val, &data_store.qu_insert_scalar_string, &data_store).await?,
Bool(val) => insert_scalar_gen(par, val, &data_store.qu_insert_scalar_bool, &data_store).await?,
}
}
Array(val) => {
+53 -20
View File
@@ -207,13 +207,40 @@ impl GenTwcsTab {
}
}
// Builds the value for the CQL `compaction` table option.
// Currently this always delegates to the time-window (TWCS) variant.
fn table_param_compaction(compaction_window_size: Duration) -> String {
table_param_compaction_twcs(compaction_window_size)
}
/// Returns the `compaction` map literal that selects `SizeTieredCompactionStrategy`.
///
/// Only the class is set; no further strategy options are emitted.
#[allow(unused)]
fn table_param_compaction_stcs() -> String {
    let mut clause = String::new();
    clause.push_str("{ 'class': 'SizeTieredCompactionStrategy'");
    // Further options are currently not emitted, for example:
    //   ", 'min_sstable_size': 200"
    //   ", 'max_threshold': 10"
    clause.push_str(" }");
    clause
}
/// Returns the `compaction` map literal that selects `TimeWindowCompactionStrategy`.
///
/// The window unit is fixed to `HOURS`; the window size is `compaction_window_size`
/// converted to whole hours by integer division, so any fractional hour is discarded
/// and a duration below one hour yields a size of `0`.
#[allow(unused)]
fn table_param_compaction_twcs(compaction_window_size: Duration) -> String {
    const SECS_PER_HOUR: u64 = 60 * 60;
    let window_hours = compaction_window_size.as_secs() / SECS_PER_HOUR;
    let mut clause = String::from("{ 'class': 'TimeWindowCompactionStrategy'");
    clause.push_str(", 'compaction_window_unit': 'HOURS'");
    clause.push_str(&format!(", 'compaction_window_size': {}", window_hours));
    clause.push_str(" }");
    clause
}
struct EvTabDim0 {
sty: String,
cqlsty: String,
// SCYLLA_TTL_EVENTS_DIM0
default_time_to_live: usize,
default_time_to_live: Duration,
// TWCS_WINDOW_0D
compaction_window_size: usize,
compaction_window_size: Duration,
}
impl EvTabDim0 {
@@ -223,14 +250,13 @@ impl EvTabDim0 {
fn cql_create(&self) -> String {
use std::fmt::Write;
let ttl = self.default_time_to_live.as_secs();
let compaction = table_param_compaction(self.compaction_window_size);
let mut s = String::new();
write!(s, "create table {}", self.name()).unwrap();
write!(s, " (series bigint, ts_msp bigint, ts_lsp bigint, pulse bigint, value {}, primary key ((series, ts_msp), ts_lsp))", self.cqlsty).unwrap();
write!(s, " with default_time_to_live = {}", self.default_time_to_live).unwrap();
s.write_str(" and compaction = { 'class': 'TimeWindowCompactionStrategy', 'compaction_window_unit': 'HOURS'")
.unwrap();
write!(s, ", 'compaction_window_size': {}", self.compaction_window_size).unwrap();
s.write_str(" }").unwrap();
write!(s, " with default_time_to_live = {}", ttl).unwrap();
write!(s, " and compaction = {}", compaction).unwrap();
s
}
}
@@ -239,9 +265,9 @@ struct EvTabDim1 {
sty: String,
cqlsty: String,
// SCYLLA_TTL_EVENTS_DIM1
default_time_to_live: usize,
default_time_to_live: Duration,
// TWCS_WINDOW_1D
compaction_window_size: usize,
compaction_window_size: Duration,
}
impl EvTabDim1 {
@@ -252,13 +278,12 @@ impl EvTabDim1 {
fn cql(&self) -> String {
use std::fmt::Write;
let mut s = String::new();
let ttl = self.default_time_to_live.as_secs();
let compaction = table_param_compaction(self.compaction_window_size);
write!(s, "create table {}", self.name()).unwrap();
write!(s, " (series bigint, ts_msp bigint, ts_lsp bigint, pulse bigint, value {}, primary key ((series, ts_msp), ts_lsp))", self.cqlsty).unwrap();
write!(s, " with default_time_to_live = {}", self.default_time_to_live).unwrap();
s.write_str(" and compaction = { 'class': 'TimeWindowCompactionStrategy', 'compaction_window_unit': 'HOURS'")
.unwrap();
write!(s, ", 'compaction_window_size': {}", self.compaction_window_size).unwrap();
s.write_str(" }").unwrap();
write!(s, " with default_time_to_live = {}", ttl).unwrap();
write!(s, " and compaction = {}", compaction).unwrap();
s
}
}
@@ -294,8 +319,8 @@ async fn check_event_tables(scy: &ScySession) -> Result<(), Error> {
sty: sty.into(),
cqlsty: cqlsty.into(),
// ttl is set in actual data inserts
default_time_to_live: 60 * 60 * 1,
compaction_window_size: 48,
default_time_to_live: dhours(1),
compaction_window_size: dhours(48),
};
if !has_table(&desc.name(), scy).await? {
scy.query(desc.cql_create(), ()).await?;
@@ -304,8 +329,8 @@ async fn check_event_tables(scy: &ScySession) -> Result<(), Error> {
sty: sty.into(),
cqlsty: format!("frozen<list<{}>>", cqlsty),
// ttl is set in actual data inserts
default_time_to_live: 60 * 60 * 1,
compaction_window_size: 12,
default_time_to_live: dhours(1),
compaction_window_size: dhours(12),
};
if !check_table_readable(&desc.name(), scy).await? {
scy.query(desc.cql(), ()).await?;
@@ -319,8 +344,16 @@ pub async fn migrate_scylla_data_schema(scyconf: &ScyllaConfig) -> Result<(), Er
let scy = &scy2;
if !has_keyspace(&scyconf.keyspace, scy).await? {
let rf = 2;
let cql = format!("create keyspace {} with replication = {{ 'class': 'SimpleStrategy', 'replication_factor': {} }} and durable_writes = true;", scyconf.keyspace, rf);
let replication = 2;
let durable = false;
let cql = format!(
concat!(
"create keyspace {}",
" with replication = {{ 'class': 'SimpleStrategy', 'replication_factor': {} }}",
" and durable_writes = {};"
),
scyconf.keyspace, replication, durable
);
scy.query_iter(cql, ()).await?;
info!("keyspace created");
}
+21
View File
@@ -22,12 +22,15 @@ pub struct DataStore {
pub qu_insert_scalar_i8: Arc<PreparedStatement>,
pub qu_insert_scalar_i16: Arc<PreparedStatement>,
pub qu_insert_scalar_i32: Arc<PreparedStatement>,
pub qu_insert_scalar_i64: Arc<PreparedStatement>,
pub qu_insert_scalar_f32: Arc<PreparedStatement>,
pub qu_insert_scalar_f64: Arc<PreparedStatement>,
pub qu_insert_scalar_bool: Arc<PreparedStatement>,
pub qu_insert_scalar_string: Arc<PreparedStatement>,
pub qu_insert_array_i8: Arc<PreparedStatement>,
pub qu_insert_array_i16: Arc<PreparedStatement>,
pub qu_insert_array_i32: Arc<PreparedStatement>,
pub qu_insert_array_i64: Arc<PreparedStatement>,
pub qu_insert_array_f32: Arc<PreparedStatement>,
pub qu_insert_array_f64: Arc<PreparedStatement>,
pub qu_insert_array_bool: Arc<PreparedStatement>,
@@ -80,6 +83,11 @@ impl DataStore {
let q = scy.prepare(cql).await?;
let qu_insert_scalar_i32 = Arc::new(q);
let cql =
"insert into events_scalar_i64 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?";
let q = scy.prepare(cql).await?;
let qu_insert_scalar_i64 = Arc::new(q);
let cql =
"insert into events_scalar_f32 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?";
let q = scy.prepare(cql).await?;
@@ -90,6 +98,11 @@ impl DataStore {
let q = scy.prepare(cql).await?;
let qu_insert_scalar_f64 = Arc::new(q);
let cql =
"insert into events_scalar_bool (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?";
let q = scy.prepare(cql).await?;
let qu_insert_scalar_bool = Arc::new(q);
let cql="insert into events_scalar_string (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?";
let q = scy.prepare(cql).await?;
let qu_insert_scalar_string = Arc::new(q);
@@ -110,6 +123,11 @@ impl DataStore {
let q = scy.prepare(cql).await?;
let qu_insert_array_i32 = Arc::new(q);
let cql =
"insert into events_array_i64 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?";
let q = scy.prepare(cql).await?;
let qu_insert_array_i64 = Arc::new(q);
let cql =
"insert into events_array_f32 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?";
let q = scy.prepare(cql).await?;
@@ -169,12 +187,15 @@ impl DataStore {
qu_insert_scalar_i8,
qu_insert_scalar_i16,
qu_insert_scalar_i32,
qu_insert_scalar_i64,
qu_insert_scalar_f32,
qu_insert_scalar_f64,
qu_insert_scalar_bool,
qu_insert_scalar_string,
qu_insert_array_i8,
qu_insert_array_i16,
qu_insert_array_i32,
qu_insert_array_i64,
qu_insert_array_f32,
qu_insert_array_f64,
qu_insert_array_bool,
+14
View File
@@ -283,7 +283,14 @@ stats_proc::stats_struct!((
item_latency_200ms,
item_latency_400ms,
item_latency_800ms,
item_latency_1600ms,
item_latency_3200ms,
item_latency_large,
item_commit_latency_0050ms,
item_commit_latency_0200ms,
item_commit_latency_0800ms,
item_commit_latency_3200ms,
item_commit_latency_large,
worker_start,
worker_finish,
)
@@ -303,7 +310,9 @@ stats_proc::stats_struct!((
inserts_queue_push,
inserts_queue_drop,
insert_item_queue_pressure,
insert_item_queue_full,
channel_fast_item_drop,
logic_error,
store_worker_recv_queue_len,
// TODO maybe rename: this is now only the recv of the intermediate queue:
store_worker_item_recv,
@@ -355,6 +364,11 @@ stats_proc::stats_struct!((
pong_recv_400ms,
pong_recv_slow,
pong_timeout,
ca_conn_poll_fn_begin,
ca_conn_poll_loop_begin,
ca_conn_poll_reloop,
ca_conn_poll_pending,
ca_conn_poll_no_progress_no_pending,
),
values(inter_ivl_ema)
),