From 3792010724fe992044b04522479028d5b654a5d9 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Tue, 19 Sep 2023 15:29:37 +0200 Subject: [PATCH] WIP --- daqingest/Cargo.toml | 2 +- scywr/Cargo.toml | 2 + scywr/src/insertworker.rs | 169 ++++++++++++++++++++++++++------ scywr/src/iteminsertqueue.rs | 183 +++++++++++++++++++++++++++++++++-- scywr/src/session.rs | 18 ++-- scywr/src/store.rs | 17 +--- scywr/src/tools.rs | 32 ++---- 7 files changed, 340 insertions(+), 83 deletions(-) diff --git a/daqingest/Cargo.toml b/daqingest/Cargo.toml index da6e0d1..066fd38 100644 --- a/daqingest/Cargo.toml +++ b/daqingest/Cargo.toml @@ -10,7 +10,7 @@ default = [] bsread = [] [dependencies] -clap = { version = "4.4.2", features = ["derive", "cargo"] } +clap = { version = "4.4.4", features = ["derive", "cargo"] } tracing = "0.1" serde = { version = "1.0", features = ["derive"] } tokio-postgres = "0.7.10" diff --git a/scywr/Cargo.toml b/scywr/Cargo.toml index a17b3fa..32d40d8 100644 --- a/scywr/Cargo.toml +++ b/scywr/Cargo.toml @@ -8,6 +8,8 @@ edition = "2021" futures-util = "0.3" async-channel = "1.9.0" scylla = "0.9.0" +smallvec = "1.11" +pin-project = "1.1.3" log = { path = "../log" } stats = { path = "../stats" } series = { path = "../series" } diff --git a/scywr/src/insertworker.rs b/scywr/src/insertworker.rs index 08581b2..e62adce 100644 --- a/scywr/src/insertworker.rs +++ b/scywr/src/insertworker.rs @@ -1,16 +1,22 @@ use crate::iteminsertqueue::insert_channel_status; use crate::iteminsertqueue::insert_connection_status; use crate::iteminsertqueue::insert_item; +use crate::iteminsertqueue::insert_item_fut; +use crate::iteminsertqueue::insert_msp_fut; +use crate::iteminsertqueue::InsertFut; use crate::iteminsertqueue::QueryItem; use crate::store::DataStore; use async_channel::Receiver; use async_channel::Sender; use err::Error; +use futures_util::Future; +use futures_util::TryFutureExt; use log::*; use netpod::timeunits::MS; use netpod::timeunits::SEC; use netpod::ScyllaConfig; use stats::InsertWorkerStats; +use std::pin::Pin; use std::sync::atomic; use std::sync::atomic::AtomicU64; use std::sync::atomic::Ordering; @@ -141,6 +147,51 @@ fn rate_limiter( rx } +pub async fn spawn_scylla_insert_workers( + scyconf: ScyllaConfig, + insert_scylla_sessions: usize, + insert_worker_count: usize, + item_inp: Receiver, + insert_worker_opts: Arc, + store_stats: Arc, + use_rate_limit_queue: bool, + ttls: Ttls, +) -> Result>>, Error> { + let item_inp = if use_rate_limit_queue { + rate_limiter(item_inp, insert_worker_opts.clone(), store_stats.clone()) + } else { + item_inp + }; + let mut jhs = Vec::new(); + let mut data_stores = Vec::new(); + for _ in 0..insert_scylla_sessions { + let data_store = Arc::new(DataStore::new(&scyconf).await.map_err(|e| Error::from(e.to_string()))?); + data_stores.push(data_store); + } + for worker_ix in 0..insert_worker_count { + let data_store = data_stores[worker_ix * data_stores.len() / insert_worker_count].clone(); + let jh = tokio::spawn(worker( + worker_ix, + item_inp.clone(), + ttls.clone(), + insert_worker_opts.clone(), + data_store, + store_stats.clone(), + )); + // let jh = tokio::spawn(worker_streamed( + // worker_ix, + // insert_worker_count * 3, + // item_inp.clone(), + // ttls.clone(), + // insert_worker_opts.clone(), + // data_store, + // store_stats.clone(), + // )); + jhs.push(jh); + } + Ok(jhs) +} + async fn worker( worker_ix: usize, item_inp: Receiver, @@ -219,7 +270,7 @@ async fn worker( } let insert_frac = insert_worker_opts.insert_frac.load(Ordering::Acquire); let do_insert = i1 % 1000 < insert_frac; - match insert_item(item, ttls.index, ttls.d0, ttls.d1, &data_store, &stats, do_insert).await { + match insert_item(item, &ttls, &data_store, &stats, do_insert).await { Ok(_) => { stats.inserted_values().inc(); let tsnow = { @@ -354,38 +405,92 @@ async fn worker( Ok(()) } -pub async fn spawn_scylla_insert_workers( - scyconf: ScyllaConfig, - insert_scylla_sessions: usize, - insert_worker_count: usize, +async fn worker_streamed( + worker_ix: usize, + concurrency: usize, item_inp: Receiver, - insert_worker_opts: Arc, - store_stats: Arc, - use_rate_limit_queue: bool, ttls: Ttls, -) -> Result>>, Error> { - let item_inp = if use_rate_limit_queue { - rate_limiter(item_inp, insert_worker_opts.clone(), store_stats.clone()) - } else { - item_inp - }; - let mut jhs = Vec::new(); - let mut data_stores = Vec::new(); - for _ in 0..insert_scylla_sessions { - let data_store = Arc::new(DataStore::new(&scyconf).await.map_err(|e| Error::from(e.to_string()))?); - data_stores.push(data_store); + insert_worker_opts: Arc, + data_store: Arc, + stats: Arc, +) -> Result<(), Error> { + use futures_util::StreamExt; + stats.worker_start().inc(); + insert_worker_opts + .insert_workers_running + .fetch_add(1, atomic::Ordering::AcqRel); + let mut stream = item_inp + .map(|item| match item { + QueryItem::Insert(item) => { + stats.item_recv.inc(); + // let mut futs: smallvec::SmallVec< + // [Pin> + Send>>; 4], + // > = smallvec::smallvec![]; + let mut futs: Vec> + Send>>> = + Vec::new(); + if item.msp_bump { + stats.inserts_msp().inc(); + let fut = insert_msp_fut( + item.series.clone(), + item.ts_msp, + &ttls, + &data_store.scy, + &data_store.qu_insert_ts_msp, + ); + // futs.push(Box::pin(fut)); + } + #[cfg(DISABLED)] + if let Some(ts_msp_grid) = item.ts_msp_grid { + let params = ( + (item.series.id() as i32) & 0xff, + ts_msp_grid as i32, + if item.shape.to_scylla_vec().is_empty() { 0 } else { 1 } as i32, + item.scalar_type.to_scylla_i32(), + item.series.id() as i64, + ttls.index.as_secs() as i32, + ); + data_store + .scy + .execute(&data_store.qu_insert_series_by_ts_msp, params) + .await?; + stats.inserts_msp_grid().inc(); + } + let do_insert = true; + // TODO prepare db future and pass-through. + stats.inserts_value().inc(); + let fut = insert_item_fut(item, &ttls, &data_store, do_insert); + // .map_err(|e| Error::with_msg_no_trace(e.to_string())) + let fut = tokio::task::unconstrained(fut); + // futs.push(Box::pin(fut)); + futs + } + _ => { + // TODO + Vec::new() + } + }) + .map(|mut x| async move { x.pop().unwrap().await }) + // .map(|x| futures_util::stream::iter(x)) + // .flatten_unordered(None) + .buffer_unordered(concurrency) + .map(|x| x); + + while let Some(item) = stream.next().await { + match item { + Ok(()) => { + stats.inserted_values().inc(); + // TODO compute the insert latency bin and count. + } + Err(e) => { + stats_inc_for_err(&stats, &e); + } + } } - for worker_ix in 0..insert_worker_count { - let data_store = data_stores[worker_ix * data_stores.len() / insert_worker_count].clone(); - let jh = tokio::spawn(worker( - worker_ix, - item_inp.clone(), - ttls.clone(), - insert_worker_opts.clone(), - data_store, - store_stats.clone(), - )); - jhs.push(jh); - } - Ok(jhs) + + stats.worker_finish().inc(); + insert_worker_opts + .insert_workers_running + .fetch_sub(1, atomic::Ordering::AcqRel); + trace2!("insert worker {worker_ix} done"); + Ok(()) } diff --git a/scywr/src/iteminsertqueue.rs b/scywr/src/iteminsertqueue.rs index e995bbb..4a7357c 100644 --- a/scywr/src/iteminsertqueue.rs +++ b/scywr/src/iteminsertqueue.rs @@ -1,8 +1,13 @@ pub use netpod::CONNECTION_STATUS_DIV; +use crate::insertworker::Ttls; +use crate::session::ScySession; use crate::store::DataStore; use err::thiserror; use err::ThisError; +use futures_util::Future; +use futures_util::FutureExt; +use futures_util::TryFutureExt; use netpod::ScalarType; use netpod::Shape; use scylla::prepared_statement::PreparedStatement; @@ -11,6 +16,10 @@ use scylla::transport::errors::QueryError; use series::SeriesId; use stats::InsertWorkerStats; use std::net::SocketAddrV4; +use std::pin::Pin; +use std::sync::Arc; +use std::task::Context; +use std::task::Poll; use std::time::Duration; use std::time::SystemTime; @@ -245,6 +254,74 @@ struct InsParCom { do_insert: bool, } +fn insert_scalar_gen_fut( + par: InsParCom, + val: ST, + qu: &Arc, + scy: &Arc, +) -> Pin> + Send>> +where + ST: scylla::frame::value::Value + Send + 'static, +{ + Box::pin(insert_scalar_gen_fut_inner(par, val, qu.clone(), scy.clone())) +} + +async fn insert_scalar_gen_fut_inner( + par: InsParCom, + val: ST, + qu: Arc, + scy: Arc, +) -> Result<(), Error> +where + ST: scylla::frame::value::Value, +{ + let params = ( + par.series as i64, + par.ts_msp as i64, + par.ts_lsp as i64, + par.pulse as i64, + val, + par.ttl as i32, + ); + scy.execute(&qu, params) + .map(|item| { + match item { + Ok(_) => Ok(()), + Err(e) => match e { + QueryError::TimeoutError => Err(Error::DbTimeout), + // TODO use `msg` + QueryError::DbError(e, _msg) => match e { + DbError::Overloaded => Err(Error::DbOverload), + _ => Err(e.into()), + }, + _ => Err(e.into()), + }, + } + }) + .await +} + +#[pin_project::pin_project] +pub struct InsertFut { + scy: Arc, + qu: Arc, + fut: F, +} + +impl InsertFut { + pub fn new(scy: Arc, qu: Arc, fut: F) -> Self { + Self { scy, qu, fut } + } +} + +impl Future for InsertFut { + type Output = Result<(), Error>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + todo!() + } +} + async fn insert_scalar_gen( par: InsParCom, val: ST, @@ -319,15 +396,13 @@ where pub async fn insert_item( item: InsertItem, - ttl_index: Duration, - ttl_0d: Duration, - ttl_1d: Duration, + ttls: &Ttls, data_store: &DataStore, stats: &InsertWorkerStats, do_insert: bool, ) -> Result<(), Error> { if item.msp_bump { - let params = (item.series.id() as i64, item.ts_msp as i64, ttl_index.as_secs() as i32); + let params = (item.series.id() as i64, item.ts_msp as i64, ttls.index.as_secs() as i32); data_store.scy.execute(&data_store.qu_insert_ts_msp, params).await?; stats.inserts_msp().inc(); } @@ -338,7 +413,7 @@ pub async fn insert_item( if item.shape.to_scylla_vec().is_empty() { 0 } else { 1 } as i32, item.scalar_type.to_scylla_i32(), item.series.id() as i64, - ttl_index.as_secs() as i32, + ttls.index.as_secs() as i32, ); data_store .scy @@ -354,7 +429,7 @@ pub async fn insert_item( ts_msp: item.ts_msp, ts_lsp: item.ts_lsp, pulse: item.pulse, - ttl: ttl_0d.as_secs() as _, + ttl: ttls.d0.as_secs() as _, do_insert, }; use ScalarValue::*; @@ -375,7 +450,7 @@ pub async fn insert_item( ts_msp: item.ts_msp, ts_lsp: item.ts_lsp, pulse: item.pulse, - ttl: ttl_1d.as_secs() as _, + ttl: ttls.d1.as_secs() as _, do_insert, }; use ArrayValue::*; @@ -393,6 +468,100 @@ pub async fn insert_item( Ok(()) } +pub async fn insert_msp_fut( + series: SeriesId, + ts_msp: u64, + ttls: &Ttls, + scy: &ScySession, + qu: &PreparedStatement, +) -> Result<(), Error> { + let params = (series.id() as i64, ts_msp as i64, ttls.index.as_secs() as i32); + scy.execute(qu, params) + .map(|item| { + match item { + Ok(_) => Ok(()), + Err(e) => match e { + QueryError::TimeoutError => Err(Error::DbTimeout), + // TODO use `msg` + QueryError::DbError(e, _msg) => match e { + DbError::Overloaded => Err(Error::DbOverload), + _ => Err(e.into()), + }, + _ => Err(e.into()), + }, + } + }) + .await +} + +pub fn insert_item_fut( + item: InsertItem, + ttls: &Ttls, + data_store: &DataStore, + do_insert: bool, +) -> Pin> + Send>> { + let par = InsParCom { + series: item.series.id(), + ts_msp: item.ts_msp, + ts_lsp: item.ts_lsp, + pulse: item.pulse, + ttl: ttls.d0.as_secs() as _, + do_insert, + }; + let scy = &data_store.scy; + use DataValue::*; + match item.val { + Scalar(val) => { + let par = InsParCom { + series: item.series.id(), + ts_msp: item.ts_msp, + ts_lsp: item.ts_lsp, + pulse: item.pulse, + ttl: ttls.d0.as_secs() as _, + do_insert, + }; + use ScalarValue::*; + match val { + I8(val) => insert_scalar_gen_fut(par, val, &data_store.qu_insert_scalar_i8, scy), + I16(val) => insert_scalar_gen_fut(par, val, &data_store.qu_insert_scalar_i16, scy), + Enum(val) => insert_scalar_gen_fut(par, val, &data_store.qu_insert_scalar_i16, scy), + I32(val) => insert_scalar_gen_fut(par, val, &data_store.qu_insert_scalar_i32, scy), + F32(val) => insert_scalar_gen_fut(par, val, &data_store.qu_insert_scalar_f32, scy), + F64(val) => insert_scalar_gen_fut(par, val, &data_store.qu_insert_scalar_f64, scy), + String(val) => insert_scalar_gen_fut(par, val, &data_store.qu_insert_scalar_string, scy), + Bool(val) => insert_scalar_gen_fut(par, val, &data_store.qu_insert_scalar_bool, scy), + } + } + Array(val) => { + let par = InsParCom { + series: item.series.id(), + ts_msp: item.ts_msp, + ts_lsp: item.ts_lsp, + pulse: item.pulse, + ttl: ttls.d1.as_secs() as _, + do_insert, + }; + use ArrayValue::*; + match val { + // I8(val) => insert_array_gen(par, val, &data_store.qu_insert_array_i8, &data_store).await?, + // I16(val) => insert_array_gen(par, val, &data_store.qu_insert_array_i16, &data_store).await?, + // I32(val) => insert_array_gen(par, val, &data_store.qu_insert_array_i32, &data_store).await?, + // F32(val) => insert_array_gen(par, val, &data_store.qu_insert_array_f32, &data_store).await?, + // F64(val) => insert_array_gen(par, val, &data_store.qu_insert_array_f64, &data_store).await?, + // Bool(val) => insert_array_gen(par, val, &data_store.qu_insert_array_bool, &data_store).await?, + _ => Box::pin(futures_util::future::ready(Ok(()))), + } + } + } + // let val: i32 = 4242; + // insert_scalar_gen_fut( + // par, + // val, + // data_store.qu_insert_scalar_i32.clone(), + // data_store.scy.clone(), + // ) +} + pub async fn insert_connection_status( item: ConnectionStatusItem, ttl: Duration, diff --git a/scywr/src/session.rs b/scywr/src/session.rs index f84893e..f60e28e 100644 --- a/scywr/src/session.rs +++ b/scywr/src/session.rs @@ -7,6 +7,7 @@ use err::ThisError; use scylla::execution_profile::ExecutionProfileBuilder; use scylla::statement::Consistency; use scylla::transport::errors::NewSessionError; +use std::num::NonZeroUsize; use std::sync::Arc; #[derive(Debug, ThisError)] @@ -21,14 +22,17 @@ impl From for Error { } pub async fn create_session_no_ks(scyconf: &ScyllaConfig) -> Result, Error> { - let scy = scylla::SessionBuilder::new() + let profile = ExecutionProfileBuilder::default() + .consistency(Consistency::LocalOne) + .build() + .into_handle(); + let scy = scylla::transport::session_builder::GenericSessionBuilder::new() + .pool_size(scylla::transport::session::PoolSize::PerShard( + NonZeroUsize::new(1).unwrap(), + )) .known_nodes(&scyconf.hosts) - .default_execution_profile_handle( - ExecutionProfileBuilder::default() - .consistency(Consistency::LocalOne) - .build() - .into_handle(), - ) + .default_execution_profile_handle(profile) + .write_coalescing(true) .build() .await?; let scy = Arc::new(scy); diff --git a/scywr/src/store.rs b/scywr/src/store.rs index d34c220..7899e2f 100644 --- a/scywr/src/store.rs +++ b/scywr/src/store.rs @@ -1,9 +1,8 @@ +use crate::session::create_session; use err::thiserror; use err::ThisError; use netpod::ScyllaConfig; -use scylla::execution_profile::ExecutionProfileBuilder; use scylla::prepared_statement::PreparedStatement; -use scylla::statement::Consistency; use scylla::transport::errors::NewSessionError; use scylla::transport::errors::QueryError; use scylla::Session as ScySession; @@ -13,6 +12,7 @@ use std::sync::Arc; pub enum Error { NewSessionError(#[from] NewSessionError), QueryError(#[from] QueryError), + NewSession, } pub struct DataStore { @@ -45,18 +45,7 @@ pub struct DataStore { impl DataStore { pub async fn new(scyconf: &ScyllaConfig) -> Result { - let scy = scylla::SessionBuilder::new() - .known_nodes(&scyconf.hosts) - .use_keyspace(&scyconf.keyspace, true) - .default_execution_profile_handle( - ExecutionProfileBuilder::default() - .consistency(Consistency::LocalOne) - .build() - .into_handle(), - ) - .build() - .await?; - let scy = Arc::new(scy); + let scy = create_session(scyconf).await.map_err(|_| Error::NewSession)?; let q = scy .prepare("insert into ts_msp (series, ts_msp) values (?, ?) using ttl ?") diff --git a/scywr/src/tools.rs b/scywr/src/tools.rs index a383d7a..7064dc4 100644 --- a/scywr/src/tools.rs +++ b/scywr/src/tools.rs @@ -1,11 +1,8 @@ +use crate::session::create_session; use log::*; use netpod::ScyllaConfig; -use scylla::execution_profile::ExecutionProfileBuilder; -use scylla::statement::Consistency; use scylla::transport::errors::NewSessionError; use scylla::transport::errors::QueryError; -use scylla::Session; -use scylla::SessionBuilder; pub struct Error(err::Error); @@ -27,23 +24,10 @@ impl From for Error { } } -async fn make_scy_session(conf: &ScyllaConfig) -> Result { - let scy = SessionBuilder::new() - .known_nodes(&conf.hosts) - .use_keyspace(&conf.keyspace, true) - .default_execution_profile_handle( - ExecutionProfileBuilder::default() - .consistency(Consistency::LocalOne) - .build() - .into_handle(), - ) - .build() - .await?; - Ok(scy) -} - pub async fn list_pkey(scylla_conf: &ScyllaConfig) -> Result<(), Error> { - let scy = make_scy_session(scylla_conf).await?; + let scy = create_session(scylla_conf) + .await + .map_err(|e| Error(err::Error::with_msg_no_trace(e.to_string())))?; let query = scy .prepare("select distinct token(pulse_a), pulse_a from pulse where token(pulse_a) >= ? and token(pulse_a) <= ?") .await?; @@ -79,7 +63,9 @@ pub async fn list_pkey(scylla_conf: &ScyllaConfig) -> Result<(), Error> { } pub async fn list_pulses(scylla_conf: &ScyllaConfig) -> Result<(), Error> { - let scy = make_scy_session(scylla_conf).await?; + let scy = create_session(scylla_conf) + .await + .map_err(|e| Error(err::Error::with_msg_no_trace(e.to_string())))?; let query = scy .prepare("select token(tsa) as tsatok, tsa, tsb, pulse from pulse where token(tsa) >= ? and token(tsa) <= ?") .await?; @@ -116,7 +102,9 @@ pub async fn list_pulses(scylla_conf: &ScyllaConfig) -> Result<(), Error> { pub async fn fetch_events(backend: &str, channel: &str, scylla_conf: &ScyllaConfig) -> Result<(), Error> { // TODO use the keyspace from commandline. err::todo(); - let scy = make_scy_session(scylla_conf).await?; + let scy = create_session(scylla_conf) + .await + .map_err(|e| Error(err::Error::with_msg_no_trace(e.to_string())))?; let qu_series = scy .prepare( "select series, scalar_type, shape_dims from series_by_channel where facility = ? and channel_name = ?",