This commit is contained in:
Dominik Werder
2023-09-19 15:29:37 +02:00
parent 70b0ee0c69
commit 3792010724
7 changed files with 340 additions and 83 deletions
+1 -1
View File
@@ -10,7 +10,7 @@ default = []
bsread = []
[dependencies]
clap = { version = "4.4.2", features = ["derive", "cargo"] }
clap = { version = "4.4.4", features = ["derive", "cargo"] }
tracing = "0.1"
serde = { version = "1.0", features = ["derive"] }
tokio-postgres = "0.7.10"
+2
View File
@@ -8,6 +8,8 @@ edition = "2021"
futures-util = "0.3"
async-channel = "1.9.0"
scylla = "0.9.0"
smallvec = "1.11"
pin-project = "1.1.3"
log = { path = "../log" }
stats = { path = "../stats" }
series = { path = "../series" }
+137 -32
View File
@@ -1,16 +1,22 @@
use crate::iteminsertqueue::insert_channel_status;
use crate::iteminsertqueue::insert_connection_status;
use crate::iteminsertqueue::insert_item;
use crate::iteminsertqueue::insert_item_fut;
use crate::iteminsertqueue::insert_msp_fut;
use crate::iteminsertqueue::InsertFut;
use crate::iteminsertqueue::QueryItem;
use crate::store::DataStore;
use async_channel::Receiver;
use async_channel::Sender;
use err::Error;
use futures_util::Future;
use futures_util::TryFutureExt;
use log::*;
use netpod::timeunits::MS;
use netpod::timeunits::SEC;
use netpod::ScyllaConfig;
use stats::InsertWorkerStats;
use std::pin::Pin;
use std::sync::atomic;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
@@ -141,6 +147,51 @@ fn rate_limiter(
rx
}
/// Spawn the configured number of Scylla insert worker tasks.
///
/// Creates `insert_scylla_sessions` independent `DataStore` sessions and
/// distributes them evenly over `insert_worker_count` workers. When
/// `use_rate_limit_queue` is set, the input channel is first wrapped by
/// `rate_limiter`, so workers consume from the throttled channel instead.
///
/// Returns the `JoinHandle`s of all spawned workers; the caller is expected
/// to await/supervise them.
pub async fn spawn_scylla_insert_workers(
    scyconf: ScyllaConfig,
    insert_scylla_sessions: usize,
    insert_worker_count: usize,
    item_inp: Receiver<QueryItem>,
    insert_worker_opts: Arc<InsertWorkerOpts>,
    store_stats: Arc<stats::InsertWorkerStats>,
    use_rate_limit_queue: bool,
    ttls: Ttls,
) -> Result<Vec<JoinHandle<Result<(), Error>>>, Error> {
    // Optionally interpose the rate limiter between producers and workers.
    let item_inp = if use_rate_limit_queue {
        rate_limiter(item_inp, insert_worker_opts.clone(), store_stats.clone())
    } else {
        item_inp
    };
    let mut jhs = Vec::new();
    let mut data_stores = Vec::new();
    // Sessions are created up front so a connection failure aborts the spawn
    // before any worker task exists.
    for _ in 0..insert_scylla_sessions {
        let data_store = Arc::new(DataStore::new(&scyconf).await.map_err(|e| Error::from(e.to_string()))?);
        data_stores.push(data_store);
    }
    for worker_ix in 0..insert_worker_count {
        // Map the worker index onto the session list so sessions are shared
        // evenly; the index stays in range because worker_ix < insert_worker_count.
        // NOTE(review): panics if insert_scylla_sessions == 0 while
        // insert_worker_count > 0 — confirm callers guarantee at least one session.
        let data_store = data_stores[worker_ix * data_stores.len() / insert_worker_count].clone();
        let jh = tokio::spawn(worker(
            worker_ix,
            item_inp.clone(),
            ttls.clone(),
            insert_worker_opts.clone(),
            data_store,
            store_stats.clone(),
        ));
        // Alternative streamed worker, kept for reference while the
        // buffered/concurrent insert path is being evaluated.
        // let jh = tokio::spawn(worker_streamed(
        //     worker_ix,
        //     insert_worker_count * 3,
        //     item_inp.clone(),
        //     ttls.clone(),
        //     insert_worker_opts.clone(),
        //     data_store,
        //     store_stats.clone(),
        // ));
        jhs.push(jh);
    }
    Ok(jhs)
}
async fn worker(
worker_ix: usize,
item_inp: Receiver<QueryItem>,
@@ -219,7 +270,7 @@ async fn worker(
}
let insert_frac = insert_worker_opts.insert_frac.load(Ordering::Acquire);
let do_insert = i1 % 1000 < insert_frac;
match insert_item(item, ttls.index, ttls.d0, ttls.d1, &data_store, &stats, do_insert).await {
match insert_item(item, &ttls, &data_store, &stats, do_insert).await {
Ok(_) => {
stats.inserted_values().inc();
let tsnow = {
@@ -354,38 +405,92 @@ async fn worker(
Ok(())
}
pub async fn spawn_scylla_insert_workers(
scyconf: ScyllaConfig,
insert_scylla_sessions: usize,
insert_worker_count: usize,
async fn worker_streamed(
worker_ix: usize,
concurrency: usize,
item_inp: Receiver<QueryItem>,
insert_worker_opts: Arc<InsertWorkerOpts>,
store_stats: Arc<stats::InsertWorkerStats>,
use_rate_limit_queue: bool,
ttls: Ttls,
) -> Result<Vec<JoinHandle<Result<(), Error>>>, Error> {
let item_inp = if use_rate_limit_queue {
rate_limiter(item_inp, insert_worker_opts.clone(), store_stats.clone())
} else {
item_inp
};
let mut jhs = Vec::new();
let mut data_stores = Vec::new();
for _ in 0..insert_scylla_sessions {
let data_store = Arc::new(DataStore::new(&scyconf).await.map_err(|e| Error::from(e.to_string()))?);
data_stores.push(data_store);
insert_worker_opts: Arc<InsertWorkerOpts>,
data_store: Arc<DataStore>,
stats: Arc<InsertWorkerStats>,
) -> Result<(), Error> {
use futures_util::StreamExt;
stats.worker_start().inc();
insert_worker_opts
.insert_workers_running
.fetch_add(1, atomic::Ordering::AcqRel);
let mut stream = item_inp
.map(|item| match item {
QueryItem::Insert(item) => {
stats.item_recv.inc();
// let mut futs: smallvec::SmallVec<
// [Pin<Box<dyn Future<Output = Result<(), crate::iteminsertqueue::Error>> + Send>>; 4],
// > = smallvec::smallvec![];
let mut futs: Vec<Pin<Box<dyn Future<Output = Result<(), crate::iteminsertqueue::Error>> + Send>>> =
Vec::new();
if item.msp_bump {
stats.inserts_msp().inc();
let fut = insert_msp_fut(
item.series.clone(),
item.ts_msp,
&ttls,
&data_store.scy,
&data_store.qu_insert_ts_msp,
);
// futs.push(Box::pin(fut));
}
#[cfg(DISABLED)]
if let Some(ts_msp_grid) = item.ts_msp_grid {
let params = (
(item.series.id() as i32) & 0xff,
ts_msp_grid as i32,
if item.shape.to_scylla_vec().is_empty() { 0 } else { 1 } as i32,
item.scalar_type.to_scylla_i32(),
item.series.id() as i64,
ttls.index.as_secs() as i32,
);
data_store
.scy
.execute(&data_store.qu_insert_series_by_ts_msp, params)
.await?;
stats.inserts_msp_grid().inc();
}
let do_insert = true;
// TODO prepare db future and pass-through.
stats.inserts_value().inc();
let fut = insert_item_fut(item, &ttls, &data_store, do_insert);
// .map_err(|e| Error::with_msg_no_trace(e.to_string()))
let fut = tokio::task::unconstrained(fut);
// futs.push(Box::pin(fut));
futs
}
_ => {
// TODO
Vec::new()
}
})
.map(|mut x| async move { x.pop().unwrap().await })
// .map(|x| futures_util::stream::iter(x))
// .flatten_unordered(None)
.buffer_unordered(concurrency)
.map(|x| x);
while let Some(item) = stream.next().await {
match item {
Ok(()) => {
stats.inserted_values().inc();
// TODO compute the insert latency bin and count.
}
Err(e) => {
stats_inc_for_err(&stats, &e);
}
}
}
for worker_ix in 0..insert_worker_count {
let data_store = data_stores[worker_ix * data_stores.len() / insert_worker_count].clone();
let jh = tokio::spawn(worker(
worker_ix,
item_inp.clone(),
ttls.clone(),
insert_worker_opts.clone(),
data_store,
store_stats.clone(),
));
jhs.push(jh);
}
Ok(jhs)
stats.worker_finish().inc();
insert_worker_opts
.insert_workers_running
.fetch_sub(1, atomic::Ordering::AcqRel);
trace2!("insert worker {worker_ix} done");
Ok(())
}
+176 -7
View File
@@ -1,8 +1,13 @@
pub use netpod::CONNECTION_STATUS_DIV;
use crate::insertworker::Ttls;
use crate::session::ScySession;
use crate::store::DataStore;
use err::thiserror;
use err::ThisError;
use futures_util::Future;
use futures_util::FutureExt;
use futures_util::TryFutureExt;
use netpod::ScalarType;
use netpod::Shape;
use scylla::prepared_statement::PreparedStatement;
@@ -11,6 +16,10 @@ use scylla::transport::errors::QueryError;
use series::SeriesId;
use stats::InsertWorkerStats;
use std::net::SocketAddrV4;
use std::pin::Pin;
use std::sync::Arc;
use std::task::Context;
use std::task::Poll;
use std::time::Duration;
use std::time::SystemTime;
@@ -245,6 +254,74 @@ struct InsParCom {
do_insert: bool,
}
/// Build a boxed, owned insert future for one scalar value.
///
/// Clones the prepared-statement and session `Arc`s so the returned future
/// owns everything it needs ('static + Send) and can be queued or spawned
/// independently of the caller's borrows.
fn insert_scalar_gen_fut<ST>(
    par: InsParCom,
    val: ST,
    qu: &Arc<PreparedStatement>,
    scy: &Arc<ScySession>,
) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>
where
    ST: scylla::frame::value::Value + Send + 'static,
{
    Box::pin(insert_scalar_gen_fut_inner(par, val, qu.clone(), scy.clone()))
}
/// Execute the prepared scalar-insert statement and normalize driver errors.
///
/// Timeouts map to `Error::DbTimeout` and `DbError::Overloaded` maps to
/// `Error::DbOverload` so callers can count/retry those cases distinctly;
/// all other query errors are converted via `From`.
///
/// NOTE(review): unlike `insert_item`, this path never consults
/// `par.do_insert` — the statement is always executed. Confirm whether the
/// insert-fraction throttle should apply here too.
async fn insert_scalar_gen_fut_inner<ST>(
    par: InsParCom,
    val: ST,
    qu: Arc<PreparedStatement>,
    scy: Arc<ScySession>,
) -> Result<(), Error>
where
    ST: scylla::frame::value::Value,
{
    // Bind order must match the prepared statement:
    // (series, ts_msp, ts_lsp, pulse, value, ttl).
    let params = (
        par.series as i64,
        par.ts_msp as i64,
        par.ts_lsp as i64,
        par.pulse as i64,
        val,
        par.ttl as i32,
    );
    scy.execute(&qu, params)
        .map(|item| {
            match item {
                Ok(_) => Ok(()),
                Err(e) => match e {
                    QueryError::TimeoutError => Err(Error::DbTimeout),
                    // TODO use `msg`
                    QueryError::DbError(e, _msg) => match e {
                        DbError::Overloaded => Err(Error::DbOverload),
                        _ => Err(e.into()),
                    },
                    _ => Err(e.into()),
                },
            }
        })
        .await
}
#[pin_project::pin_project]
pub struct InsertFut<F> {
scy: Arc<ScySession>,
qu: Arc<PreparedStatement>,
fut: F,
}
impl<F> InsertFut<F> {
pub fn new(scy: Arc<ScySession>, qu: Arc<PreparedStatement>, fut: F) -> Self {
Self { scy, qu, fut }
}
}
impl<F> Future for InsertFut<F> {
type Output = Result<(), Error>;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
todo!()
}
}
async fn insert_scalar_gen<ST>(
par: InsParCom,
val: ST,
@@ -319,15 +396,13 @@ where
pub async fn insert_item(
item: InsertItem,
ttl_index: Duration,
ttl_0d: Duration,
ttl_1d: Duration,
ttls: &Ttls,
data_store: &DataStore,
stats: &InsertWorkerStats,
do_insert: bool,
) -> Result<(), Error> {
if item.msp_bump {
let params = (item.series.id() as i64, item.ts_msp as i64, ttl_index.as_secs() as i32);
let params = (item.series.id() as i64, item.ts_msp as i64, ttls.index.as_secs() as i32);
data_store.scy.execute(&data_store.qu_insert_ts_msp, params).await?;
stats.inserts_msp().inc();
}
@@ -338,7 +413,7 @@ pub async fn insert_item(
if item.shape.to_scylla_vec().is_empty() { 0 } else { 1 } as i32,
item.scalar_type.to_scylla_i32(),
item.series.id() as i64,
ttl_index.as_secs() as i32,
ttls.index.as_secs() as i32,
);
data_store
.scy
@@ -354,7 +429,7 @@ pub async fn insert_item(
ts_msp: item.ts_msp,
ts_lsp: item.ts_lsp,
pulse: item.pulse,
ttl: ttl_0d.as_secs() as _,
ttl: ttls.d0.as_secs() as _,
do_insert,
};
use ScalarValue::*;
@@ -375,7 +450,7 @@ pub async fn insert_item(
ts_msp: item.ts_msp,
ts_lsp: item.ts_lsp,
pulse: item.pulse,
ttl: ttl_1d.as_secs() as _,
ttl: ttls.d1.as_secs() as _,
do_insert,
};
use ArrayValue::*;
@@ -393,6 +468,100 @@ pub async fn insert_item(
Ok(())
}
/// Insert one `ts_msp` row for the given series, applying the index TTL.
///
/// Error normalization mirrors the scalar insert path: timeouts become
/// `Error::DbTimeout`, `DbError::Overloaded` becomes `Error::DbOverload`,
/// everything else is converted via `From`.
pub async fn insert_msp_fut(
    series: SeriesId,
    ts_msp: u64,
    ttls: &Ttls,
    scy: &ScySession,
    qu: &PreparedStatement,
) -> Result<(), Error> {
    // Bind order: (series, ts_msp, ttl).
    let params = (series.id() as i64, ts_msp as i64, ttls.index.as_secs() as i32);
    scy.execute(qu, params)
        .map(|item| {
            match item {
                Ok(_) => Ok(()),
                Err(e) => match e {
                    QueryError::TimeoutError => Err(Error::DbTimeout),
                    // TODO use `msg`
                    QueryError::DbError(e, _msg) => match e {
                        DbError::Overloaded => Err(Error::DbOverload),
                        _ => Err(e.into()),
                    },
                    _ => Err(e.into()),
                },
            }
        })
        .await
}
/// Build (without awaiting) the insert future for one `InsertItem`.
///
/// Scalar values dispatch to the matching prepared statement with the
/// zero-dim TTL (`ttls.d0`). Array values would use the one-dim TTL
/// (`ttls.d1`) but are currently a no-op on this path — every array arm is
/// disabled — so they resolve immediately with `Ok(())`.
///
/// NOTE(review): `do_insert` is carried in `InsParCom` but the fut-based
/// execution path does not consult it; confirm whether the insert-fraction
/// throttle should apply here.
pub fn insert_item_fut(
    item: InsertItem,
    ttls: &Ttls,
    data_store: &DataStore,
    do_insert: bool,
) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send>> {
    // Removed a dead `InsParCom` binding that was constructed here and then
    // immediately shadowed in both match arms below.
    let scy = &data_store.scy;
    use DataValue::*;
    match item.val {
        Scalar(val) => {
            let par = InsParCom {
                series: item.series.id(),
                ts_msp: item.ts_msp,
                ts_lsp: item.ts_lsp,
                pulse: item.pulse,
                ttl: ttls.d0.as_secs() as _,
                do_insert,
            };
            use ScalarValue::*;
            match val {
                I8(val) => insert_scalar_gen_fut(par, val, &data_store.qu_insert_scalar_i8, scy),
                I16(val) => insert_scalar_gen_fut(par, val, &data_store.qu_insert_scalar_i16, scy),
                // Enums are stored through the i16 statement.
                Enum(val) => insert_scalar_gen_fut(par, val, &data_store.qu_insert_scalar_i16, scy),
                I32(val) => insert_scalar_gen_fut(par, val, &data_store.qu_insert_scalar_i32, scy),
                F32(val) => insert_scalar_gen_fut(par, val, &data_store.qu_insert_scalar_f32, scy),
                F64(val) => insert_scalar_gen_fut(par, val, &data_store.qu_insert_scalar_f64, scy),
                String(val) => insert_scalar_gen_fut(par, val, &data_store.qu_insert_scalar_string, scy),
                Bool(val) => insert_scalar_gen_fut(par, val, &data_store.qu_insert_scalar_bool, scy),
            }
        }
        Array(val) => {
            // Currently unused while all array arms below are disabled; kept
            // so re-enabling an arm only needs the line uncommented.
            let par = InsParCom {
                series: item.series.id(),
                ts_msp: item.ts_msp,
                ts_lsp: item.ts_lsp,
                pulse: item.pulse,
                ttl: ttls.d1.as_secs() as _,
                do_insert,
            };
            use ArrayValue::*;
            match val {
                // I8(val) => insert_array_gen(par, val, &data_store.qu_insert_array_i8, &data_store).await?,
                // I16(val) => insert_array_gen(par, val, &data_store.qu_insert_array_i16, &data_store).await?,
                // I32(val) => insert_array_gen(par, val, &data_store.qu_insert_array_i32, &data_store).await?,
                // F32(val) => insert_array_gen(par, val, &data_store.qu_insert_array_f32, &data_store).await?,
                // F64(val) => insert_array_gen(par, val, &data_store.qu_insert_array_f64, &data_store).await?,
                // Bool(val) => insert_array_gen(par, val, &data_store.qu_insert_array_bool, &data_store).await?,
                _ => Box::pin(futures_util::future::ready(Ok(()))),
            }
        }
    }
}
pub async fn insert_connection_status(
item: ConnectionStatusItem,
ttl: Duration,
+11 -7
View File
@@ -7,6 +7,7 @@ use err::ThisError;
use scylla::execution_profile::ExecutionProfileBuilder;
use scylla::statement::Consistency;
use scylla::transport::errors::NewSessionError;
use std::num::NonZeroUsize;
use std::sync::Arc;
#[derive(Debug, ThisError)]
@@ -21,14 +22,17 @@ impl From<NewSessionError> for Error {
}
pub async fn create_session_no_ks(scyconf: &ScyllaConfig) -> Result<Arc<Session>, Error> {
let scy = scylla::SessionBuilder::new()
let profile = ExecutionProfileBuilder::default()
.consistency(Consistency::LocalOne)
.build()
.into_handle();
let scy = scylla::transport::session_builder::GenericSessionBuilder::new()
.pool_size(scylla::transport::session::PoolSize::PerShard(
NonZeroUsize::new(1).unwrap(),
))
.known_nodes(&scyconf.hosts)
.default_execution_profile_handle(
ExecutionProfileBuilder::default()
.consistency(Consistency::LocalOne)
.build()
.into_handle(),
)
.default_execution_profile_handle(profile)
.write_coalescing(true)
.build()
.await?;
let scy = Arc::new(scy);
+3 -14
View File
@@ -1,9 +1,8 @@
use crate::session::create_session;
use err::thiserror;
use err::ThisError;
use netpod::ScyllaConfig;
use scylla::execution_profile::ExecutionProfileBuilder;
use scylla::prepared_statement::PreparedStatement;
use scylla::statement::Consistency;
use scylla::transport::errors::NewSessionError;
use scylla::transport::errors::QueryError;
use scylla::Session as ScySession;
@@ -13,6 +12,7 @@ use std::sync::Arc;
pub enum Error {
NewSessionError(#[from] NewSessionError),
QueryError(#[from] QueryError),
NewSession,
}
pub struct DataStore {
@@ -45,18 +45,7 @@ pub struct DataStore {
impl DataStore {
pub async fn new(scyconf: &ScyllaConfig) -> Result<Self, Error> {
let scy = scylla::SessionBuilder::new()
.known_nodes(&scyconf.hosts)
.use_keyspace(&scyconf.keyspace, true)
.default_execution_profile_handle(
ExecutionProfileBuilder::default()
.consistency(Consistency::LocalOne)
.build()
.into_handle(),
)
.build()
.await?;
let scy = Arc::new(scy);
let scy = create_session(scyconf).await.map_err(|_| Error::NewSession)?;
let q = scy
.prepare("insert into ts_msp (series, ts_msp) values (?, ?) using ttl ?")
+10 -22
View File
@@ -1,11 +1,8 @@
use crate::session::create_session;
use log::*;
use netpod::ScyllaConfig;
use scylla::execution_profile::ExecutionProfileBuilder;
use scylla::statement::Consistency;
use scylla::transport::errors::NewSessionError;
use scylla::transport::errors::QueryError;
use scylla::Session;
use scylla::SessionBuilder;
pub struct Error(err::Error);
@@ -27,23 +24,10 @@ impl From<QueryError> for Error {
}
}
async fn make_scy_session(conf: &ScyllaConfig) -> Result<Session, Error> {
let scy = SessionBuilder::new()
.known_nodes(&conf.hosts)
.use_keyspace(&conf.keyspace, true)
.default_execution_profile_handle(
ExecutionProfileBuilder::default()
.consistency(Consistency::LocalOne)
.build()
.into_handle(),
)
.build()
.await?;
Ok(scy)
}
pub async fn list_pkey(scylla_conf: &ScyllaConfig) -> Result<(), Error> {
let scy = make_scy_session(scylla_conf).await?;
let scy = create_session(scylla_conf)
.await
.map_err(|e| Error(err::Error::with_msg_no_trace(e.to_string())))?;
let query = scy
.prepare("select distinct token(pulse_a), pulse_a from pulse where token(pulse_a) >= ? and token(pulse_a) <= ?")
.await?;
@@ -79,7 +63,9 @@ pub async fn list_pkey(scylla_conf: &ScyllaConfig) -> Result<(), Error> {
}
pub async fn list_pulses(scylla_conf: &ScyllaConfig) -> Result<(), Error> {
let scy = make_scy_session(scylla_conf).await?;
let scy = create_session(scylla_conf)
.await
.map_err(|e| Error(err::Error::with_msg_no_trace(e.to_string())))?;
let query = scy
.prepare("select token(tsa) as tsatok, tsa, tsb, pulse from pulse where token(tsa) >= ? and token(tsa) <= ?")
.await?;
@@ -116,7 +102,9 @@ pub async fn list_pulses(scylla_conf: &ScyllaConfig) -> Result<(), Error> {
pub async fn fetch_events(backend: &str, channel: &str, scylla_conf: &ScyllaConfig) -> Result<(), Error> {
// TODO use the keyspace from commandline.
err::todo();
let scy = make_scy_session(scylla_conf).await?;
let scy = create_session(scylla_conf)
.await
.map_err(|e| Error(err::Error::with_msg_no_trace(e.to_string())))?;
let qu_series = scy
.prepare(
"select series, scalar_type, shape_dims from series_by_channel where facility = ? and channel_name = ?",