Profiled and tuned throughput

This commit is contained in:
Dominik Werder
2022-06-01 17:05:03 +02:00
parent f5f1b23466
commit 8439d31b4c
8 changed files with 302 additions and 282 deletions

View File

@@ -1,16 +1,17 @@
[build]
rustflags = [
#"-C", "target-cpu=native",
"-C", "target-cpu=sandybridge",
"-C", "force-frame-pointers=yes",
"-C", "force-unwind-tables=yes",
#"-C", "force-frame-pointers=yes",
#"-C", "force-unwind-tables=yes",
#"-C", "relocation-model=static",
#"-C", "embed-bitcode=no",
#"-C", "inline-threshold=1000",
#"-Z", "time-passes=yes",
#"-Z", "time-llvm-passes=yes",
"--cfg", "tokio_unstable",
#"--cfg", "tokio_unstable",
]
rustdocflags = [
"--cfg", "tokio_unstable"
#"--cfg", "tokio_unstable"
]

View File

@@ -2,13 +2,13 @@
members = ["log", "netfetch", "daqingest"]
[profile.release]
opt-level = 2
debug = 2
opt-level = 3
debug = 0
overflow-checks = false
debug-assertions = false
lto = "thin"
codegen-units = 32
incremental = true
#codegen-units = 32
incremental = false
[patch.crates-io]
#tokio = { git = "https://github.com/dominikwerder/tokio", rev = "995221d8" }

View File

@@ -4,7 +4,7 @@ pub mod store;
use self::conn::FindIocStream;
use self::store::DataStore;
use crate::store::{CommonInsertItemQueue, CommonInsertQueue};
use crate::store::CommonInsertItemQueue;
use conn::CaConn;
use err::Error;
use futures_util::StreamExt;
@@ -12,15 +12,17 @@ use log::*;
use netpod::Database;
use scylla::batch::Consistency;
use serde::{Deserialize, Serialize};
use stats::{CaConnStatsAgg, CaConnStatsAggDiff};
use stats::{CaConnStats, CaConnStatsAgg, CaConnStatsAggDiff};
use std::collections::BTreeMap;
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
use std::path::PathBuf;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex, Once};
use std::time::{Duration, Instant};
use tokio::fs::OpenOptions;
use tokio::io::AsyncReadExt;
use tokio::net::TcpStream;
use tokio_postgres::Client as PgClient;
static mut METRICS: Option<Mutex<Option<CaConnStatsAgg>>> = None;
static METRICS_ONCE: Once = Once::new();
@@ -53,6 +55,7 @@ struct ChannelConfig {
insert_scylla_sessions: Option<usize>,
insert_queue_max: Option<usize>,
insert_item_queue_cap: Option<usize>,
api_bind: Option<String>,
}
pub struct ListenFromFileOpts {
@@ -86,15 +89,15 @@ pub async fn parse_config(config: PathBuf) -> Result<CaConnectOpts, Error> {
search: conf.search,
addr_bind: conf.addr_bind,
addr_conn: conf.addr_conn,
max_simul: conf.max_simul.unwrap_or(113),
timeout: conf.timeout.unwrap_or(2000),
abort_after_search: conf.abort_after_search,
pg_pass: conf.pg_pass,
array_truncate: conf.array_truncate.unwrap_or(512),
insert_worker_count: conf.insert_worker_count.unwrap_or(1),
insert_worker_count: conf.insert_worker_count.unwrap_or(8),
insert_scylla_sessions: conf.insert_scylla_sessions.unwrap_or(1),
insert_queue_max: conf.insert_queue_max.unwrap_or(16),
insert_item_queue_cap: conf.insert_item_queue_cap.unwrap_or(256),
insert_queue_max: conf.insert_queue_max.unwrap_or(32),
insert_item_queue_cap: conf.insert_item_queue_cap.unwrap_or(380000),
api_bind: conf.api_bind.unwrap_or_else(|| "0.0.0.0:3011".into()),
})
}
@@ -103,7 +106,6 @@ pub struct CaConnectOpts {
pub search: Vec<String>,
pub addr_bind: Ipv4Addr,
pub addr_conn: Ipv4Addr,
pub max_simul: usize,
pub timeout: u64,
pub abort_after_search: u32,
pub pg_pass: String,
@@ -112,6 +114,7 @@ pub struct CaConnectOpts {
pub insert_scylla_sessions: usize,
pub insert_queue_max: usize,
pub insert_item_queue_cap: usize,
pub api_bind: String,
}
async fn resolve_address(addr_str: &str) -> Result<SocketAddrV4, Error> {
@@ -242,14 +245,75 @@ pub async fn ca_search(opts: ListenFromFileOpts) -> Result<(), Error> {
Ok(())
}
async fn spawn_scylla_insert_workers(
insert_scylla_sessions: usize,
insert_worker_count: usize,
insert_item_queue: &CommonInsertItemQueue,
insert_frac: Arc<AtomicU64>,
pg_client: Arc<PgClient>,
store_stats: Arc<stats::CaConnStats>,
) -> Result<(), Error> {
let mut data_stores = vec![];
for _ in 0..insert_scylla_sessions {
let scy = scylla::SessionBuilder::new()
.known_node("sf-nube-14:19042")
.default_consistency(Consistency::One)
.use_keyspace("ks1", true)
.build()
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
let scy = Arc::new(scy);
let data_store = Arc::new(DataStore::new(pg_client.clone(), scy.clone()).await?);
data_stores.push(data_store);
}
for i1 in 0..insert_worker_count {
let data_store = data_stores[i1 * data_stores.len() / insert_worker_count].clone();
let stats = store_stats.clone();
let recv = insert_item_queue.receiver();
let insert_frac = insert_frac.clone();
let fut = async move {
let mut i1 = 0;
while let Ok(item) = recv.recv().await {
stats.store_worker_item_recv_inc();
let insert_frac = insert_frac.load(Ordering::Acquire);
if i1 % 1000 < insert_frac {
match crate::store::insert_item(item, &data_store, &stats).await {
Ok(_) => {
stats.store_worker_item_insert_inc();
}
Err(e) => {
stats.store_worker_item_error_inc();
// TODO introduce more structured error variants.
if e.msg().contains("WriteTimeout") {
tokio::time::sleep(Duration::from_millis(100)).await;
} else {
// TODO back off but continue.
error!("insert worker sees error: {e:?}");
break;
}
}
}
} else {
stats.store_worker_item_drop_inc();
}
i1 += 1;
}
};
tokio::spawn(fut);
}
Ok(())
}
pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> {
tokio::spawn(start_metrics_service());
// TODO maybe this should hold the resources needed by the futures?
let ciq = CommonInsertQueue::new();
let facility = "scylla";
let insert_frac = Arc::new(AtomicU64::new(1000));
let insert_ivl_min = Arc::new(AtomicU64::new(8800));
let opts = parse_config(opts.config).await?;
tokio::spawn(start_metrics_service(
opts.api_bind.clone(),
insert_frac.clone(),
insert_ivl_min.clone(),
));
let d = Database {
name: "daqbuffer".into(),
host: "sf-nube-11".into(),
@@ -275,6 +339,10 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> {
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
let scy = Arc::new(scy);
// TODO use new struct:
let local_stats = Arc::new(CaConnStats::new());
// TODO factor the find loop into a separate Stream.
info!("FIND IOCS");
let qu_find_addr = pg_client
.prepare("select t2.channel, t2.addr from ioc_by_channel t1, ioc_by_channel t2 where t2.facility = t1.facility and t2.channel = t1.channel and t1.facility = $1 and t1.channel in ($2, $3, $4, $5, $6, $7, $8, $9)")
@@ -309,7 +377,10 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> {
// TODO the address was searched before but could not be found.
} else {
let addr: SocketAddrV4 = match addr.parse() {
Ok(k) => k,
Ok(k) => {
local_stats.ioc_lookup_inc();
k
}
Err(e) => {
error!("can not parse {addr:?} for channel {ch:?} {e:?}");
continue;
@@ -331,60 +402,45 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> {
if opts.abort_after_search == 1 {
return Ok(());
}
let data_store = Arc::new(DataStore::new(pg_client.clone(), scy.clone(), ciq.sender()).await?);
let data_store = Arc::new(DataStore::new(pg_client.clone(), scy.clone()).await?);
let insert_item_queue = CommonInsertItemQueue::new(opts.insert_item_queue_cap);
// TODO use a new stats struct
let store_stats = Arc::new(stats::CaConnStats::new());
let store_stats = Arc::new(CaConnStats::new());
spawn_scylla_insert_workers(
opts.insert_scylla_sessions,
opts.insert_worker_count,
&insert_item_queue,
insert_frac.clone(),
pg_client.clone(),
store_stats.clone(),
)
.await?;
let mut data_stores = vec![];
for _ in 0..opts.insert_scylla_sessions {
let scy = scylla::SessionBuilder::new()
.known_node("sf-nube-14:19042")
.default_consistency(Consistency::One)
.use_keyspace("ks1", true)
.build()
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
let scy = Arc::new(scy);
let data_store = Arc::new(DataStore::new(pg_client.clone(), scy.clone(), ciq.sender()).await?);
data_stores.push(data_store);
}
for i1 in 0..opts.insert_worker_count {
let data_store = data_stores[i1 * data_stores.len() / opts.insert_worker_count].clone();
let stats = store_stats.clone();
let recv = insert_item_queue.receiver();
let fut = async move {
while let Ok(item) = recv.recv().await {
stats.store_worker_item_recv_inc();
match crate::store::insert_item(item, &data_store, &stats).await {
Ok(_) => {}
Err(e) => {
// TODO back off but continue.
error!("insert worker sees error: {e:?}");
break;
}
}
}
};
tokio::spawn(fut);
}
let mut conn_jhs = vec![];
let mut conn_stats = vec![];
info!("channels_by_host len {}", channels_by_host.len());
for (host, channels) in channels_by_host {
if false && host.ip() != &"172.26.24.76".parse::<Ipv4Addr>().unwrap() {
continue;
}
let data_store = data_store.clone();
debug!("Create TCP connection to {:?}", (host.ip(), host.port()));
//debug!("Create TCP connection to {:?}", (host.ip(), host.port()));
let addr = SocketAddrV4::new(host.ip().clone(), host.port());
let tcp = match TcpStream::connect(addr).await {
Ok(k) => k,
// TODO establish the connection in the future SM.
let tcp = match tokio::time::timeout(Duration::from_millis(500), TcpStream::connect(addr)).await {
Ok(Ok(k)) => k,
Ok(Err(e)) => {
error!("Can not connect to {addr:?} {e:?}");
continue;
}
Err(e) => {
error!("Can not connect to {addr:?} {e:?}");
continue;
}
};
local_stats.tcp_connected_inc();
let mut conn = CaConn::new(
tcp,
addr,
@@ -392,15 +448,18 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> {
insert_item_queue.sender(),
opts.array_truncate,
opts.insert_queue_max,
insert_ivl_min.clone(),
);
conn_stats.push(conn.stats());
for c in channels {
conn.channel_add(c);
}
let stats2 = conn.stats();
let conn_block = async move {
while let Some(item) = conn.next().await {
match item {
Ok(_) => {
stats2.conn_item_count_inc();
// TODO test if performance can be noticed:
//trace!("CaConn gives item: {k:?}");
}
@@ -417,8 +476,9 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> {
}
let mut agg_last = CaConnStatsAgg::new();
loop {
tokio::time::sleep(Duration::from_millis(500)).await;
tokio::time::sleep(Duration::from_millis(671)).await;
let agg = CaConnStatsAgg::new();
agg.push(&local_stats);
agg.push(&store_stats);
for g in &conn_stats {
agg.push(&g);
@@ -429,6 +489,7 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> {
let diff = CaConnStatsAggDiff::diff_from(&agg_last, &agg);
info!("{}", diff.display());
}
for _s1 in &conn_stats {}
agg_last = agg;
if false {
break;
@@ -450,24 +511,37 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> {
Ok(())
}
async fn start_metrics_service() {
let app = axum::Router::new().route(
"/metrics",
axum::routing::get(|| async {
let stats = get_metrics();
match stats {
Some(s) => {
trace!("Metrics");
s.prometheus()
async fn start_metrics_service(bind_to: String, insert_frac: Arc<AtomicU64>, insert_ivl_min: Arc<AtomicU64>) {
let app = axum::Router::new()
.route(
"/metrics",
axum::routing::get(|| async {
let stats = get_metrics();
match stats {
Some(s) => {
trace!("Metrics");
s.prometheus()
}
None => {
trace!("Metrics empty");
String::new()
}
}
None => {
trace!("Metrics empty");
String::new()
}
}
}),
);
axum::Server::bind(&"0.0.0.0:3011".parse().unwrap())
}),
)
.route(
"/insert_frac",
axum::routing::put(|v: axum::extract::Json<u64>| async move {
insert_frac.store(v.0, Ordering::Release);
}),
)
.route(
"/insert_ivl_min",
axum::routing::put(|v: axum::extract::Json<u64>| async move {
insert_ivl_min.store(v.0, Ordering::Release);
}),
);
axum::Server::bind(&bind_to.parse().unwrap())
.serve(app.into_make_service())
.await
.unwrap()

View File

@@ -15,7 +15,7 @@ use stats::{CaConnStats, IntervalEma};
use std::collections::{BTreeMap, VecDeque};
use std::net::{Ipv4Addr, SocketAddrV4};
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::{Duration, Instant, SystemTime};
@@ -47,18 +47,24 @@ enum MonitoringState {
Muted,
}
#[allow(unused)]
#[derive(Debug)]
struct CreatedState {
#[allow(unused)]
cid: u32,
#[allow(unused)]
sid: u32,
scalar_type: ScalarType,
shape: Shape,
#[allow(unused)]
ts_created: Instant,
state: MonitoringState,
ts_msp_last: u64,
ts_msp_grid_last: u32,
inserted_in_ts_msp: u64,
ivl_ema: IntervalEma,
insert_item_ivl_ema: IntervalEma,
insert_next_earliest: Instant,
#[allow(unused)]
fast_warn_count: u32,
}
#[allow(unused)]
@@ -116,6 +122,7 @@ pub struct CaConn {
remote_addr_dbg: SocketAddrV4,
stats: Arc<CaConnStats>,
insert_queue_max: usize,
insert_ivl_min: Arc<AtomicU64>,
}
impl CaConn {
@@ -126,6 +133,7 @@ impl CaConn {
insert_item_sender: CommonInsertItemQueueSender,
array_truncate: usize,
insert_queue_max: usize,
insert_ivl_min: Arc<AtomicU64>,
) -> Self {
Self {
state: CaConnState::Init,
@@ -148,6 +156,7 @@ impl CaConn {
remote_addr_dbg,
stats: Arc::new(CaConnStats::new()),
insert_queue_max,
insert_ivl_min,
}
}
@@ -191,13 +200,28 @@ impl CaConn {
self.insert_item_send_fut = None;
}
Ready(Err(_)) => break Ready(Err(Error::with_msg_no_trace(format!("can not send the item")))),
Pending => break Pending,
Pending => {
if false {
// Drop the item and continue.
// TODO this causes performance degradation in the channel.
self.stats.inserts_queue_drop_inc();
self.insert_item_send_fut = None;
} else {
// Wait until global queue is ready (peer will see network pressure)
break Pending;
}
}
},
None => {}
}
if let Some(item) = self.insert_item_queue.pop_front() {
self.stats.inserts_queue_pop_for_global_inc();
let sender = unsafe { &*(&self.insert_item_sender as *const CommonInsertItemQueueSender) };
self.insert_item_send_fut = Some(sender.send(item));
if sender.is_full() {
self.stats.inserts_queue_drop_inc();
} else {
self.insert_item_send_fut = Some(sender.send(item));
}
} else {
break Ready(Ok(()));
}
@@ -209,7 +233,7 @@ impl CaConn {
while self.fut_get_series.len() > 0 {
match self.fut_get_series.poll_next_unpin(cx) {
Ready(Some(Ok(k))) => {
//info!("Have SeriesId {k:?}");
self.stats.get_series_id_ok_inc();
let cid = k.0;
let sid = k.1;
let data_type = k.2;
@@ -244,8 +268,11 @@ impl CaConn {
ts_created: Instant::now(),
state: MonitoringState::AddingEvent(series),
ts_msp_last: 0,
ts_msp_grid_last: 0,
inserted_in_ts_msp: u64::MAX,
ivl_ema: IntervalEma::new(),
insert_item_ivl_ema: IntervalEma::new(),
insert_next_earliest: Instant::now(),
fast_warn_count: 0,
});
let scalar_type = ScalarType::from_ca_id(data_type)?;
let shape = Shape::from_ca_count(data_count)?;
@@ -274,17 +301,15 @@ impl CaConn {
series: SeriesId,
scalar_type: ScalarType,
shape: Shape,
ts: u64,
ev: proto::EventAddRes,
cid: u32,
ts_msp_last: u64,
inserted_in_ts_msp: u64,
ts_msp_grid: Option<u32>,
) -> Result<(), Error> {
// TODO where to actually get the timestamp of the event from?
let ts = SystemTime::now();
let epoch = ts.duration_since(std::time::UNIX_EPOCH).unwrap();
// TODO decide on better msp/lsp: random offset!
// As long as one writer is active, the msp is arbitrary.
let ts = epoch.as_secs() * SEC + epoch.subsec_nanos() as u64;
let ts_msp = if inserted_in_ts_msp > 2000 {
let ts_msp = ts / (60 * SEC) * (60 * SEC);
if let ChannelState::Created(st) = self.channels.get_mut(&cid).unwrap() {
@@ -324,6 +349,7 @@ impl CaConn {
scalar_type,
shape,
val: ev.value,
ts_msp_grid,
};
item_queue.push_back(item);
self.stats.insert_item_create_inc();
@@ -372,9 +398,44 @@ impl CaConn {
return Err(format!("no series id on insert").into());
}
};
let ts_msp_last = st.ts_msp_last;
let inserted_in_ts_msp = st.inserted_in_ts_msp;
self.event_add_insert(series, scalar_type, shape, ev, cid, ts_msp_last, inserted_in_ts_msp)?;
let tsnow = Instant::now();
if tsnow >= st.insert_next_earliest {
st.insert_item_ivl_ema.tick(tsnow);
let em = st.insert_item_ivl_ema.ema();
let ema = em.ema();
let mm = self.insert_ivl_min.load(Ordering::Acquire);
let mm = (mm as f32) * 1e-6;
let dt = (mm - ema) / em.k();
st.insert_next_earliest = tsnow
.checked_add(Duration::from_micros((dt * 1e6) as u64))
.ok_or_else(|| Error::with_msg_no_trace("time overflow in next insert"))?;
let ts_msp_last = st.ts_msp_last;
let inserted_in_ts_msp = st.inserted_in_ts_msp;
// TODO get event timestamp from channel access field
let ts = SystemTime::now();
let epoch = ts.duration_since(std::time::UNIX_EPOCH).unwrap();
let ts = epoch.as_secs() * SEC + epoch.subsec_nanos() as u64;
let ts_msp_grid = (ts / (SEC * 10 * 6 * 2)) as u32 * (6 * 2);
let ts_msp_grid = if st.ts_msp_grid_last != ts_msp_grid {
st.ts_msp_grid_last = ts_msp_grid;
Some(ts_msp_grid)
} else {
None
};
self.event_add_insert(
series,
scalar_type,
shape,
ts,
ev,
cid,
ts_msp_last,
inserted_in_ts_msp,
ts_msp_grid,
)?;
} else {
self.stats.channel_fast_item_drop_inc();
}
}
_ => {
error!("unexpected state: EventAddRes while having {ch_s:?}");
@@ -522,8 +583,11 @@ impl CaConn {
ts_created: Instant::now(),
state: MonitoringState::FetchSeriesId,
ts_msp_last: 0,
ts_msp_grid_last: 0,
inserted_in_ts_msp: u64::MAX,
ivl_ema: IntervalEma::new(),
insert_item_ivl_ema: IntervalEma::new(),
insert_next_earliest: Instant::now(),
fast_warn_count: 0,
});
// TODO handle error in different way. Should most likely not abort.
let cd = ChannelDescDecoded {
@@ -673,6 +737,10 @@ impl Stream for CaConn {
let ts_outer_2 = Instant::now();
self.stats.poll_time_all_dur(ts_outer_2.duration_since(ts_outer_1));
// TODO currently, this will never stop by itself
match &ret {
Ready(_) => self.stats.conn_stream_ready_inc(),
Pending => self.stats.conn_stream_pending_inc(),
}
ret
}
}

View File

@@ -1,6 +1,5 @@
use crate::bsread::ChannelDescDecoded;
use crate::series::{Existence, SeriesId};
use crate::store::CommonInsertQueueSender;
use async_channel::{Receiver, Sender};
use err::Error;
use scylla::prepared_statement::PreparedStatement;
@@ -25,15 +24,13 @@ pub struct RegisterChannel {
rx: Receiver<RegisterJob>,
}
#[allow(unused)]
pub struct ChannelRegistry {
scy: Arc<ScySession>,
pg_client: Arc<PgClient>,
}
impl ChannelRegistry {
pub fn new(pg_client: Arc<PgClient>, scy: Arc<ScySession>) -> Self {
Self { pg_client, scy }
pub fn new(pg_client: Arc<PgClient>) -> Self {
Self { pg_client }
}
pub async fn get_series_id(&self, cd: ChannelDescDecoded) -> Result<Existence<SeriesId>, Error> {
@@ -43,8 +40,8 @@ impl ChannelRegistry {
pub struct DataStore {
pub scy: Arc<ScySession>,
pub qu_insert_series: Arc<PreparedStatement>,
pub qu_insert_ts_msp: Arc<PreparedStatement>,
pub qu_insert_series_by_ts_msp: Arc<PreparedStatement>,
pub qu_insert_scalar_i8: Arc<PreparedStatement>,
pub qu_insert_scalar_i16: Arc<PreparedStatement>,
pub qu_insert_scalar_i32: Arc<PreparedStatement>,
@@ -57,25 +54,21 @@ pub struct DataStore {
pub qu_insert_array_f32: Arc<PreparedStatement>,
pub qu_insert_array_f64: Arc<PreparedStatement>,
pub chan_reg: Arc<ChannelRegistry>,
pub ciqs: CommonInsertQueueSender,
}
impl DataStore {
pub async fn new(
pg_client: Arc<PgClient>,
scy: Arc<ScySession>,
ciqs: CommonInsertQueueSender,
) -> Result<Self, Error> {
let q = scy
.prepare("insert into series (part, series, ts_msp, scalar_type, shape_dims) values (?, ?, ?, ?, ?)")
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
let qu_insert_series = Arc::new(q);
pub async fn new(pg_client: Arc<PgClient>, scy: Arc<ScySession>) -> Result<Self, Error> {
let q = scy
.prepare("insert into ts_msp (series, ts_msp) values (?, ?)")
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
let qu_insert_ts_msp = Arc::new(q);
let q = scy
.prepare("insert into series_by_ts_msp (ts_msp, shape_kind, scalar_type, series) values (?, ?, ?, ?)")
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
let qu_insert_series_by_ts_msp = Arc::new(q);
// scalar:
let q = scy
.prepare("insert into events_scalar_i8 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)")
.await
@@ -133,10 +126,10 @@ impl DataStore {
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
let qu_insert_array_f64 = Arc::new(q);
let ret = Self {
chan_reg: Arc::new(ChannelRegistry::new(pg_client, scy.clone())),
chan_reg: Arc::new(ChannelRegistry::new(pg_client)),
scy,
qu_insert_series,
qu_insert_ts_msp,
qu_insert_series_by_ts_msp,
qu_insert_scalar_i8,
qu_insert_scalar_i16,
qu_insert_scalar_i32,
@@ -148,7 +141,6 @@ impl DataStore {
qu_insert_array_i32,
qu_insert_array_f32,
qu_insert_array_f64,
ciqs,
};
Ok(ret)
}

View File

@@ -3,7 +3,6 @@ use crate::errconv::ErrConv;
use err::Error;
#[allow(unused)]
use log::*;
use scylla::Session as ScySession;
use std::time::Duration;
use tokio_postgres::Client as PgClient;
@@ -22,76 +21,6 @@ impl SeriesId {
}
}
// TODO don't need byte_order or compression from ChannelDescDecoded for channel registration.
pub async fn get_series_id_scylla(scy: &ScySession, cd: &ChannelDescDecoded) -> Result<Existence<SeriesId>, Error> {
err::todo();
// TODO do not use, LWT in Scylla is currently buggy.
let facility = "scylla";
let channel_name = &cd.name;
let scalar_type = cd.scalar_type.to_scylla_i32();
let shape = cd.shape.to_scylla_vec();
info!("get_series_id {facility:?} {channel_name:?} {scalar_type:?} {shape:?}");
let res = scy
.query(
"select series, agg_kind from series_by_channel where facility = ? and channel_name = ? and scalar_type = ? and shape_dims = ?",
(facility, channel_name, &scalar_type, &shape),
)
.await
.err_conv()?;
let mut all = vec![];
for row in res.rows_typed_or_empty::<(i64, Option<String>)>() {
match row {
Ok(k) => {
if k.1.is_none() {
all.push(k.0);
}
}
Err(e) => return Err(Error::with_msg_no_trace(format!("{e:?}"))),
}
}
info!("all: {all:?}");
let rn = all.len();
if rn == 0 {
use md5::Digest;
let mut h = md5::Md5::new();
h.update(facility.as_bytes());
h.update(channel_name.as_bytes());
h.update(format!("{:?} {:?}", scalar_type, shape).as_bytes());
let f = h.finalize();
let mut series = u64::from_le_bytes(f.as_slice()[0..8].try_into().unwrap());
// TODO technically we could/should assert that we run on 2-complement machine.
const SMASK: u64 = 0x7fffffffffffffff;
series = series & SMASK;
for _ in 0..2000 {
let res = scy
.query(
concat!(
"insert into series_by_channel",
" (facility, channel_name, scalar_type, shape_dims, agg_kind, series)",
" values (?, ?, ?, ?, null, ?) if not exists"
),
(facility, channel_name, &scalar_type, &shape, series as i64),
)
.await
.err_conv()?;
let row = res.first_row().err_conv()?;
if row.columns[0].as_ref().unwrap().as_boolean().unwrap() {
return Ok(Existence::Created(SeriesId(series)));
} else {
error!("tried to insert but series exists...");
}
tokio::time::sleep(Duration::from_millis(20)).await;
series += 1;
series = series & SMASK;
}
Err(Error::with_msg_no_trace(format!("can not create and insert series id")))
} else {
let series = all[0] as u64;
info!("series: {:?}", series);
Ok(Existence::Existing(SeriesId(series)))
}
}
// TODO don't need byte_order or compression from ChannelDescDecoded for channel registration.
pub async fn get_series_id(pg_client: &PgClient, cd: &ChannelDescDecoded) -> Result<Existence<SeriesId>, Error> {
let facility = "scylla";

View File

@@ -2,8 +2,7 @@ use crate::ca::proto::{CaDataArrayValue, CaDataScalarValue, CaDataValue};
use crate::ca::store::DataStore;
use crate::errconv::ErrConv;
use err::Error;
use futures_util::stream::FuturesOrdered;
use futures_util::{Future, FutureExt, Stream, StreamExt};
use futures_util::{Future, FutureExt};
use log::*;
use netpod::{ScalarType, Shape};
use scylla::frame::value::ValueList;
@@ -16,10 +15,6 @@ use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Instant;
const CHANNEL_CAP: usize = 128;
const POLLING_CAP: usize = 32;
const TABLE_SERIES_MOD: u32 = 128;
pub struct ScyInsertFut {
#[allow(unused)]
scy: Arc<ScySession>,
@@ -94,6 +89,7 @@ pub struct InsertItem {
pub ts_msp: u64,
pub ts_lsp: u64,
pub msp_bump: bool,
pub ts_msp_grid: Option<u32>,
pub pulse: u64,
pub scalar_type: ScalarType,
pub shape: Shape,
@@ -109,6 +105,11 @@ impl CommonInsertItemQueueSender {
pub fn send(&self, k: InsertItem) -> async_channel::Send<InsertItem> {
self.sender.send(k)
}
#[inline(always)]
pub fn is_full(&self) -> bool {
self.sender.is_full()
}
}
pub struct CommonInsertItemQueue {
@@ -185,18 +186,6 @@ where
pub async fn insert_item(item: InsertItem, data_store: &DataStore, stats: &CaConnStats) -> Result<(), Error> {
if item.msp_bump {
let params = (
(item.series as u32 % TABLE_SERIES_MOD) as i32,
item.series as i64,
item.ts_msp as i64,
item.scalar_type.to_scylla_i32(),
item.shape.to_scylla_vec(),
);
data_store
.scy
.execute(&data_store.qu_insert_series, params)
.await
.err_conv()?;
let params = (item.series as i64, item.ts_msp as i64);
data_store
.scy
@@ -205,6 +194,20 @@ pub async fn insert_item(item: InsertItem, data_store: &DataStore, stats: &CaCon
.err_conv()?;
stats.inserts_msp_inc()
}
if let Some(ts_msp_grid) = item.ts_msp_grid {
let params = (
ts_msp_grid as i32,
if item.shape.to_scylla_vec().is_empty() { 0 } else { 1 } as i32,
item.scalar_type.to_scylla_i32(),
item.series as i64,
);
data_store
.scy
.execute(&data_store.qu_insert_series_by_ts_msp, params)
.await
.err_conv()?;
stats.inserts_msp_grid_inc()
}
let par = InsParCom {
series: item.series,
ts_msp: item.ts_msp,
@@ -239,82 +242,3 @@ pub async fn insert_item(item: InsertItem, data_store: &DataStore, stats: &CaCon
stats.inserts_val_inc();
Ok(())
}
type FutTy = Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>;
pub struct CommonInsertQueueSender {
sender: async_channel::Sender<FutTy>,
}
impl CommonInsertQueueSender {
pub async fn send(&self, k: FutTy) -> Result<(), Error> {
self.sender
.send(k)
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))
}
}
pub struct CommonInsertQueue {
sender: async_channel::Sender<FutTy>,
recv: async_channel::Receiver<FutTy>,
futs: FuturesOrdered<FutTy>,
inp_done: bool,
}
impl CommonInsertQueue {
pub fn new() -> Self {
let (tx, rx) = async_channel::bounded(CHANNEL_CAP);
Self {
sender: tx.clone(),
recv: rx,
futs: FuturesOrdered::new(),
inp_done: false,
}
}
pub fn sender(&self) -> CommonInsertQueueSender {
CommonInsertQueueSender {
sender: self.sender.clone(),
}
}
}
impl Stream for CommonInsertQueue {
type Item = Result<(), Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
loop {
let _res_inp = if self.futs.len() < POLLING_CAP && !self.inp_done {
match self.recv.poll_next_unpin(cx) {
Ready(Some(k)) => {
self.futs.push(k);
continue;
}
Ready(None) => {
self.inp_done = true;
Ready(None)
}
Pending => Pending,
}
} else {
Ready(Some(()))
};
let res_qu = match self.futs.poll_next_unpin(cx) {
Ready(Some(Ok(_k))) => Ready(Some(Ok(()))),
Ready(Some(Err(e))) => Ready(Some(Err(e))),
Ready(None) => {
if self.inp_done {
Ready(None)
} else {
Pending
}
}
Pending => Pending,
};
// TODO monitor queue length and queue pushes per poll of this.
break res_qu;
}
}
}

View File

@@ -6,7 +6,7 @@ const US: u64 = 1000;
const MS: u64 = US * 1000;
const SEC: u64 = MS * 1000;
#[derive(Debug)]
#[derive(Clone, Debug)]
pub struct EMA {
ema: f32,
emv: f32,
@@ -56,6 +56,10 @@ impl EMA {
pub fn emv(&self) -> f32 {
self.emv
}
pub fn k(&self) -> f32 {
self.k
}
}
pub struct CheckEvery {
@@ -83,7 +87,7 @@ impl CheckEvery {
}
}
#[derive(Debug)]
#[derive(Clone, Debug)]
pub struct IntervalEma {
tslast: Option<Instant>,
ema: EMA,
@@ -109,6 +113,22 @@ impl IntervalEma {
}
}
}
pub fn ema_preview(&self, tsnow: Instant) -> Option<f32> {
match self.tslast {
Some(tslast) => {
let dt = tsnow.duration_since(tslast);
let v = dt.as_secs_f32();
let dv = v - self.ema.ema;
Some(self.ema.ema + self.ema.k * dv)
}
None => None,
}
}
pub fn ema(&self) -> &EMA {
&self.ema
}
}
stats_proc::stats_struct!((
@@ -118,9 +138,15 @@ stats_proc::stats_struct!((
insert_item_create,
inserts_val,
inserts_msp,
inserts_msp_grid,
inserts_queue_pop_for_global,
inserts_queue_push,
inserts_queue_pop,
inserts_queue_drop,
channel_fast_item_drop,
store_worker_item_recv,
store_worker_item_insert,
store_worker_item_drop,
store_worker_item_error,
poll_time_all,
poll_time_handle_insert_futs,
poll_time_get_series_futs,
@@ -128,6 +154,12 @@ stats_proc::stats_struct!((
time_handle_peer_ready,
time_check_channels_state_init,
time_handle_event_add_res,
ioc_lookup,
tcp_connected,
get_series_id_ok,
conn_item_count,
conn_stream_ready,
conn_stream_pending,
),
),
agg(name(CaConnStatsAgg), parent(CaConnStats)),