Refactoring

This commit is contained in:
Dominik Werder
2024-04-15 12:24:27 +01:00
parent cf0ed57e7e
commit 9f04c7616c
13 changed files with 336 additions and 176 deletions
+1 -1
View File
@@ -57,7 +57,7 @@ async fn main_run_inner(opts: DaqIngestOpts) -> Result<(), Error> {
pass: k.pg_pass,
name: k.pg_name,
};
let scyconf = ScyllaIngestConfig::new([k.scylla_host], k.scylla_keyspace);
let scyconf = ScyllaIngestConfig::new([k.scylla_host], k.scylla_keyspace, "DUMMY");
// scywr::schema::migrate_scylla_data_schema(&scyconf, netpod::ttl::RetentionTime::Short)
// .await
// .map_err(Error::from_string)?;
+77 -34
View File
@@ -3,6 +3,7 @@ pub mod inserthook;
use async_channel::Receiver;
use async_channel::Sender;
use async_channel::WeakSender;
use dbpg::seriesbychannel::ChannelInfoQuery;
use err::Error;
use log::*;
use netfetch::ca::connset::CaConnSet;
@@ -75,6 +76,10 @@ pub struct Daemon {
insert_workers_running: AtomicU64,
query_item_tx_weak: WeakSender<VecDeque<QueryItem>>,
connset_health_lat_ema: f32,
metrics_shutdown_tx: Sender<u32>,
metrics_shutdown_rx: Receiver<u32>,
metrics_jh: Option<JoinHandle<Result<(), Error>>>,
channel_info_query_tx: Sender<ChannelInfoQuery>,
}
impl Daemon {
@@ -109,7 +114,7 @@ impl Daemon {
ingest_opts.backend().into(),
local_epics_hostname,
query_item_tx,
channel_info_query_tx,
channel_info_query_tx.clone(),
ingest_opts.clone(),
writer_establis_tx,
);
@@ -170,6 +175,7 @@ impl Daemon {
let rett = RetentionTime::Short;
#[cfg(DISABLED)]
let insert_workers_jh = scywr::insertworker::spawn_scylla_insert_workers(
rett,
opts.scyconf.clone(),
@@ -182,6 +188,14 @@ impl Daemon {
ingest_opts.use_rate_limit_queue(),
)
.await?;
let insert_workers_jh = scywr::insertworker::spawn_scylla_insert_workers_dummy(
ingest_opts.insert_worker_count(),
ingest_opts.insert_worker_concurrency(),
query_item_rx,
insert_worker_opts,
insert_worker_stats.clone(),
)
.await?;
let stats = Arc::new(DaemonStats::new());
stats.insert_worker_spawned().add(insert_workers_jh.len() as _);
@@ -218,6 +232,8 @@ impl Daemon {
//jh.await.map_err(|e| e.to_string()).map_err(Error::from)??;
}
let (metrics_shutdown_tx, metrics_shutdown_rx) = async_channel::bounded(8);
let ret = Self {
opts,
ingest_opts,
@@ -241,6 +257,10 @@ impl Daemon {
insert_workers_running: AtomicU64::new(0),
query_item_tx_weak,
connset_health_lat_ema: 0.,
metrics_shutdown_tx,
metrics_shutdown_rx,
metrics_jh: None,
channel_info_query_tx,
};
Ok(ret)
}
@@ -499,7 +519,47 @@ impl Daemon {
taskrun::spawn(ticker);
}
/// Spawns the metrics service as a background task and stores its join handle.
///
/// The service is bound to `ingest_opts.api_bind()` and receives clones of the
/// daemon, connection-set, CA connection, CA protocol, insert-worker,
/// series-by-channel and IOC-finder statistics, plus the insert fraction.
/// A clone of `metrics_shutdown_rx` is passed as its shutdown signal.
///
/// The resulting join handle is written to `self.metrics_jh`, replacing whatever
/// was stored there before. This function itself contains no await point and
/// always returns `Ok(())`; a failure of the service only becomes visible when
/// the stored join handle is awaited.
pub async fn spawn_metrics(&mut self) -> Result<(), Error> {
    // A single clone of the daemon sender is sufficient: it is handed over by value.
    let dcom = Arc::new(netfetch::metrics::DaemonComm::new(self.tx.clone()));
    let stats_set = StatsSet::new(
        self.stats().clone(),
        self.connset_ctrl.stats().clone(),
        self.connset_ctrl.ca_conn_stats().clone(),
        self.connset_ctrl.ca_proto_stats().clone(),
        self.insert_worker_stats.clone(),
        self.series_by_channel_stats.clone(),
        self.connset_ctrl.ioc_finder_stats().clone(),
        self.opts.insert_frac.clone(),
    );
    let fut = netfetch::metrics::metrics_service(
        self.ingest_opts.api_bind(),
        dcom,
        self.connset_ctrl.sender().clone(),
        stats_set,
        self.metrics_shutdown_rx.clone(),
    );
    self.metrics_jh = Some(tokio::task::spawn(fut));
    Ok(())
}
pub async fn daemon(mut self) -> Result<(), Error> {
{
let backend = String::new();
let (item_tx, item_rx) = async_channel::bounded(256);
let info_worker_tx = self.channel_info_query_tx.clone();
let iiq_tx = self.query_item_tx_weak.upgrade().unwrap();
let worker_fut =
netfetch::metrics::postingest::process_api_query_items(backend, item_rx, info_worker_tx, iiq_tx);
let worker_jh = taskrun::spawn(worker_fut);
}
Self::spawn_ticker(self.tx.clone(), self.stats.clone());
loop {
if self.shutting_down {
@@ -537,7 +597,12 @@ impl Daemon {
}
}
}
info!("daemon done");
info!("Wait for metrics handler");
self.metrics_shutdown_tx.send(1).await?;
if let Some(jh) = self.metrics_jh.take() {
jh.await??;
}
info!("Joined metrics handler");
Ok(())
}
}
@@ -572,10 +637,16 @@ pub async fn run(opts: CaIngestOpts, channels_config: Option<ChannelsConfig>) ->
drop(pg);
jh.await?.map_err(Error::from_string)?;
scywr::schema::migrate_scylla_data_schema(opts.scylla_config(), 1, true, RetentionTime::Short)
scywr::schema::migrate_scylla_data_schema(opts.scylla_config(), RetentionTime::Short)
.await
.map_err(Error::from_string)?;
if let Some(scyconf) = opts.scylla_config_lt() {
scywr::schema::migrate_scylla_data_schema(scyconf, RetentionTime::Long)
.await
.map_err(Error::from_string)?;
}
info!("database check done");
// TODO use a new stats type:
@@ -600,39 +671,14 @@ pub async fn run(opts: CaIngestOpts, channels_config: Option<ChannelsConfig>) ->
store_workers_rate,
};
let daemon = Daemon::new(opts2, opts.clone()).await?;
let tx = daemon.tx.clone();
let daemon_stats = daemon.stats().clone();
let connset_cmd_tx = daemon.connset_ctrl.sender().clone();
let ca_conn_stats = daemon.connset_ctrl.ca_conn_stats().clone();
let (metrics_shutdown_tx, metrics_shutdown_rx) = async_channel::bounded(8);
let dcom = Arc::new(netfetch::metrics::DaemonComm::new(tx.clone()));
let metrics_jh = {
let conn_set_stats = daemon.connset_ctrl.stats().clone();
let stats_set = StatsSet::new(
daemon_stats,
conn_set_stats,
ca_conn_stats,
daemon.connset_ctrl.ca_proto_stats().clone(),
daemon.insert_worker_stats.clone(),
daemon.series_by_channel_stats.clone(),
daemon.connset_ctrl.ioc_finder_stats().clone(),
insert_frac,
);
let fut =
netfetch::metrics::metrics_service(opts.api_bind(), dcom, connset_cmd_tx, stats_set, metrics_shutdown_rx);
tokio::task::spawn(fut)
};
let daemon_tx = daemon.tx.clone();
let daemon_jh = taskrun::spawn(daemon.daemon());
if let Some(channels_config) = channels_config {
debug!("will configure {} channels", channels_config.len());
let mut thr_msg = ThrottleTrace::new(Duration::from_millis(1000));
let mut i = 0;
for ch_cfg in channels_config.channels() {
match tx
match daemon_tx
.send(DaemonEvent::ChannelAdd(ch_cfg.clone(), async_channel::bounded(1).0))
.await
{
@@ -648,9 +694,6 @@ pub async fn run(opts: CaIngestOpts, channels_config: Option<ChannelsConfig>) ->
debug!("{} configured channels applied", channels_config.len());
}
daemon_jh.await.map_err(|e| Error::with_msg_no_trace(e.to_string()))??;
info!("Daemon joined.");
metrics_shutdown_tx.send(1).await?;
metrics_jh.await.unwrap();
info!("Metrics joined.");
info!("Joined daemon");
Ok(())
}
View File
+15
View File
@@ -0,0 +1,15 @@
# Ingest configuration that points every service at 127.0.0.1.
# NOTE(review): nesting is shown flattened here; presumably the keys following
# `postgresql:` and `scylla:` belong to those sections — confirm the indentation.
# Presumably the address the metrics service binds to — confirm.
api_bind: 0.0.0.0:2380
backend: test
# Presumably the path handed to the channel config loader — confirm against parse_config.
channels: channels
search:
- 127.0.0.1
postgresql:
host: 127.0.0.1
port: 5432
user: daqbuffer
pass: daqbuffer
name: daqbuffer
scylla:
hosts:
- 127.0.0.1:19042
keyspace: ks_dummy_st
+28 -21
View File
@@ -1,12 +1,13 @@
use hashbrown::HashMap;
use log::*;
use std::hash::Hash;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::sync::OnceLock;
use std::sync::RwLock;
use std::time::Instant;
// use std::sync::atomic::AtomicU64;
// use std::sync::atomic::Ordering;
// use std::sync::RwLock;
fn tsref() -> Instant {
static C: OnceLock<Instant> = OnceLock::new();
let c = C.get_or_init(Instant::now);
@@ -23,23 +24,25 @@ fn dts_now() -> u64 {
pub struct MuCache<K, V> {
cap: usize,
map: RwLock<HashMap<K, (AtomicU64, V)>>,
map: HashMap<K, (u64, V)>,
}
impl<K: Clone + Eq + Hash, V: Clone> MuCache<K, V> {
impl<K: Eq + Hash, V> MuCache<K, V> {
pub fn new(cap: usize) -> Self {
Self {
cap,
map: RwLock::new(HashMap::with_capacity(cap)),
map: HashMap::with_capacity(cap),
}
}
pub fn insert(&self, k: K, v: V) -> Result<(), ()> {
let ts = AtomicU64::new(dts_now());
let mut map = self.map.write().unwrap();
pub fn insert(&mut self, k: K, v: V) -> Result<(), ()> {
let ts = dts_now();
// let ts = AtomicU64::new(ts);
// let mut map = self.map.write().unwrap();
let map = &mut self.map;
let nmax = self.cap * 5 / 4;
if map.len() >= nmax {
Self::remove_unused(&mut map, self.cap);
Self::remove_unused(map, self.cap);
}
if map.len() >= nmax {
warn!("no space in MuCache");
@@ -50,30 +53,34 @@ impl<K: Clone + Eq + Hash, V: Clone> MuCache<K, V> {
}
}
pub fn get(&self, k: &K) -> Option<V> {
let map = self.map.read().unwrap();
match map.get(k) {
pub fn get(&mut self, k: &K) -> Option<&V> {
// let map = self.map.read().unwrap();
let map = &mut self.map;
match map.get_mut(k) {
Some((lu, v)) => {
lu.store(dts_now(), Ordering::Release);
Some(v.clone())
let ts = dts_now();
// lu.store(ts, Ordering::Release);
*lu = ts;
Some(v)
}
None => None,
}
}
fn remove_unused(map: &mut HashMap<K, (AtomicU64, V)>, cap: usize) {
fn remove_unused(map: &mut HashMap<K, (u64, V)>, cap: usize) {
let map1 = core::mem::replace(map, HashMap::new());
let mut items: Vec<_> = map1
.into_iter()
.map(|x| (x.1 .0.load(Ordering::Acquire), x.1 .1, x.0))
.collect();
let mut items: Vec<_> = map1.into_iter().map(|x| (x.1 .0, x.1 .1, x.0)).collect();
items.sort_unstable_by_key(|x| x.0);
let ts_cut = items[items.len() - cap].0;
let map2 = items
.into_iter()
.filter(|x| x.0 > ts_cut)
.map(|x| (x.2, (AtomicU64::new(x.0), x.1)))
.map(|x| (x.2, (x.0, x.1)))
.collect();
*map = map2;
}
/// Collects a mutable reference to every value currently held in the cache.
///
/// The stored last-used timestamps are left untouched. The references are
/// returned in the iteration order of the underlying map.
pub fn all_ref_mut(&mut self) -> Vec<&mut V> {
    self.map.values_mut().map(|(_, value)| value).collect()
}
}
+5 -3
View File
@@ -74,7 +74,7 @@ const DO_RATE_CHECK: bool = false;
#[allow(unused)]
macro_rules! trace2 {
($($arg:tt)*) => {
if false {
if true {
trace!($($arg)*);
}
};
@@ -83,7 +83,7 @@ macro_rules! trace2 {
#[allow(unused)]
macro_rules! trace3 {
($($arg:tt)*) => {
if false {
if true {
trace!($($arg)*);
}
};
@@ -92,7 +92,7 @@ macro_rules! trace3 {
#[allow(unused)]
macro_rules! trace4 {
($($arg:tt)*) => {
if false {
if true {
trace!($($arg)*);
}
};
@@ -2277,6 +2277,7 @@ impl CaConn {
FS: Fn(&Q),
{
use Poll::*;
trace3!("attempt_flush_queue id {} len {}", id, qu.len());
let mut have_progress = false;
let mut i = 0;
loop {
@@ -2298,6 +2299,7 @@ impl CaConn {
if sp.is_sending() {
match sp.poll_unpin(cx) {
Ready(Ok(())) => {
trace3!("attempt_flush_queue id {} send done", id);
have_progress = true;
}
Ready(Err(e)) => {
+11 -10
View File
@@ -174,19 +174,19 @@ pub async fn parse_config(config: PathBuf) -> Result<(CaIngestOpts, Option<Chann
let re_p = regex::Regex::new(&conf.whitelist.clone().unwrap_or("--nothing-ur9nc23ur98c--".into()))?;
let re_n = regex::Regex::new(&conf.blacklist.clone().unwrap_or("--nothing-ksm2u98rcm28--".into()))?;
let channels = if let Some(fname) = conf.channels.as_ref() {
if fname.ends_with(".txt") {
Some(parse_channel_config_txt(fname, re_p, re_n).await?)
} else if fname.ends_with(".yml") {
let e = Error::with_msg_no_trace("unsupported channe config file");
return Err(e);
} else {
let meta = tokio::fs::metadata(fname).await?;
if meta.is_dir() {
Some(parse_config_dir(&fname).await?)
let meta = tokio::fs::metadata(fname).await?;
if meta.is_file() {
if fname.ends_with(".txt") {
Some(parse_channel_config_txt(fname, re_p, re_n).await?)
} else {
let e = Error::with_msg_no_trace("unsupported channe config file");
let e = Error::with_msg_no_trace(format!("unsupported channel config file {:?}", fname));
return Err(e);
}
} else if meta.is_dir() {
Some(parse_config_dir(&fname).await?)
} else {
let e = Error::with_msg_no_trace(format!("unsupported channel config input {:?}", fname));
return Err(e);
}
} else {
None
@@ -210,6 +210,7 @@ async fn parse_config_dir(dir: &Path) -> Result<ChannelsConfig, Error> {
let buf = tokio::fs::read(e.path()).await?;
let conf: BTreeMap<String, ChannelConfigParse> =
serde_yaml::from_slice(&buf).map_err(Error::from_string)?;
info!("parsed {} channels from {}", conf.len(), fns);
ret.push_from_parsed(&conf);
} else {
debug!("ignore channel config file {:?}", e.path());
+4 -3
View File
@@ -395,8 +395,8 @@ pub async fn metrics_service(
connset_cmd_tx: Sender<CaConnSetEvent>,
stats_set: StatsSet,
shutdown_signal: Receiver<u32>,
) {
let addr = bind_to.parse().unwrap();
) -> Result<(), Error> {
let addr = bind_to.parse().map_err(Error::from_string)?;
let router = make_routes(dcom, connset_cmd_tx, stats_set).into_make_service();
axum::Server::bind(&addr)
.serve(router)
@@ -404,7 +404,8 @@ pub async fn metrics_service(
let _ = shutdown_signal.recv().await;
})
.await
.unwrap()
.map_err(Error::from_string)?;
Ok(())
}
pub async fn metrics_agg_task(local_stats: Arc<CaConnStats>, store_stats: Arc<CaConnStats>) -> Result<(), Error> {
+26 -17
View File
@@ -3,6 +3,7 @@ use async_channel::Sender;
use dbpg::seriesbychannel::ChannelInfoQuery;
use err::thiserror;
use err::ThisError;
use mrucache::mucache::MuCache;
use netpod::ScalarType;
use netpod::Shape;
use netpod::TsNano;
@@ -22,8 +23,8 @@ pub enum Error {
SendError,
}
impl From<async_channel::SendError<QueryItem>> for Error {
fn from(value: async_channel::SendError<QueryItem>) -> Self {
impl From<async_channel::SendError<VecDeque<QueryItem>>> for Error {
fn from(value: async_channel::SendError<VecDeque<QueryItem>>) -> Self {
Error::SendError
}
}
@@ -35,12 +36,18 @@ pub struct EventValueItem {
val: DataValue,
}
async fn process_api_query_items(
/// Wrapper around a single [`SeriesWriter`].
// NOTE(review): no use of this type is visible in this part of the file —
// presumably a placeholder for further per-writer state; confirm or remove.
struct SeriesWriterIngredients {
writer: SeriesWriter,
}
pub async fn process_api_query_items(
backend: String,
item_rx: Receiver<EventValueItem>,
info_worker_tx: Sender<ChannelInfoQuery>,
iiq_tx: Sender<QueryItem>,
iiq_tx: Sender<VecDeque<QueryItem>>,
) -> Result<(), Error> {
// TODO so far arbitrary upper limit on the number of ad-hoc channels:
let mut mucache: MuCache<String, SeriesWriter> = MuCache::new(2000);
let mut item_qu = VecDeque::new();
let mut sw_tick_last = Instant::now();
@@ -49,7 +56,7 @@ async fn process_api_query_items(
let tsnow = Instant::now();
if tsnow.saturating_duration_since(sw_tick_last) >= Duration::from_millis(5000) {
sw_tick_last = tsnow;
tick_writers(&mut item_qu)?;
tick_writers(mucache.all_ref_mut(), &mut item_qu)?;
}
let item = match item {
Ok(Ok(item)) => item,
@@ -77,21 +84,23 @@ async fn process_api_query_items(
let sw = &mut sw;
sw.write(item.ts, item.ts, item.val, &mut item_qu)?;
for e in item_qu.drain(..).into_iter() {
iiq_tx.send(e).await?;
}
let item = core::mem::replace(&mut item_qu, VecDeque::new());
iiq_tx.send(item).await?;
}
// let scalar_type = ScalarType::F32;
// let shape = Shape::Scalar;
// TODO SeriesWriter need to get ticked.
finish_writers(mucache.all_ref_mut(), &mut item_qu)?;
Ok(())
}
fn tick_writers(iiq: &mut VecDeque<QueryItem>) -> Result<(), Error> {
let sw: &mut SeriesWriter = err::todoval();
sw.tick(iiq)?;
fn tick_writers(sws: Vec<&mut SeriesWriter>, iiq: &mut VecDeque<QueryItem>) -> Result<(), Error> {
for sw in sws {
sw.tick(iiq)?;
}
Ok(())
}
fn finish_writers(sws: Vec<&mut SeriesWriter>, iiq: &mut VecDeque<QueryItem>) -> Result<(), Error> {
for sw in sws {
sw.tick(iiq)?;
}
Ok(())
}
+10 -3
View File
@@ -4,18 +4,21 @@ use serde::Deserialize;
pub struct ScyllaIngestConfig {
hosts: Vec<String>,
keyspace: String,
keyspace_rf1: Option<String>,
}
impl ScyllaIngestConfig {
pub fn new<I, H, K>(hosts: I, ks: K) -> Self
pub fn new<I, H, K1, K2>(hosts: I, ks_rf3: K1, ks_rf1: K2) -> Self
where
I: IntoIterator<Item = H>,
H: Into<String>,
K: Into<String>,
K1: Into<String>,
K2: Into<String>,
{
Self {
hosts: hosts.into_iter().map(Into::into).collect(),
keyspace: ks.into(),
keyspace: ks_rf3.into(),
keyspace_rf1: Some(ks_rf1.into()),
}
}
@@ -26,4 +29,8 @@ impl ScyllaIngestConfig {
pub fn keyspace(&self) -> &String {
&self.keyspace
}
/// Returns the name stored in `keyspace_rf1`, or `None` when it is not set.
// NOTE(review): "rf1" presumably stands for replication factor 1 — confirm.
pub fn keyspace_rf1(&self) -> Option<&String> {
self.keyspace_rf1.as_ref()
}
}
+134 -76
View File
@@ -16,6 +16,9 @@ use async_channel::Receiver;
use atomic::AtomicU64;
use atomic::Ordering;
use err::Error;
use futures_util::Future;
use futures_util::Stream;
use futures_util::StreamExt;
use log::*;
use netpod::ttl::RetentionTime;
use netpod::TsMs;
@@ -125,6 +128,29 @@ pub async fn spawn_scylla_insert_workers(
data_store,
store_stats.clone(),
));
let jh = tokio::spawn(worker_streamed(
worker_ix,
insert_worker_concurrency,
item_inp.clone(),
insert_worker_opts.clone(),
Some(data_store),
store_stats.clone(),
));
jhs.push(jh);
}
Ok(jhs)
}
pub async fn spawn_scylla_insert_workers_dummy(
insert_worker_count: usize,
insert_worker_concurrency: usize,
item_inp: Receiver<VecDeque<QueryItem>>,
insert_worker_opts: Arc<InsertWorkerOpts>,
store_stats: Arc<stats::InsertWorkerStats>,
) -> Result<Vec<JoinHandle<Result<(), Error>>>, Error> {
let mut jhs = Vec::new();
for worker_ix in 0..insert_worker_count {
let data_store = None;
let jh = tokio::spawn(worker_streamed(
worker_ix,
insert_worker_concurrency,
@@ -223,92 +249,54 @@ async fn worker_streamed(
concurrency: usize,
item_inp: Receiver<VecDeque<QueryItem>>,
insert_worker_opts: Arc<InsertWorkerOpts>,
data_store: Arc<DataStore>,
data_store: Option<Arc<DataStore>>,
stats: Arc<InsertWorkerStats>,
) -> Result<(), Error> {
use futures_util::StreamExt;
trace!("worker_streamed begin");
stats.worker_start().inc();
insert_worker_opts
.insert_workers_running
.fetch_add(1, atomic::Ordering::AcqRel);
// TODO possible without box?
let item_inp = Box::pin(item_inp);
let mut stream = item_inp
.map(|batch| {
stats.item_recv.inc();
let tsnow = TsMs::from_system_time(SystemTime::now());
let mut res = Vec::with_capacity(32);
for item in batch {
if false {
match &item {
QueryItem::ConnectionStatus(_) => {
debug!("execute ConnectionStatus");
}
QueryItem::ChannelStatus(_) => {
debug!("execute ChannelStatus");
}
QueryItem::Insert(item) => {
debug!(
"execute Insert {:?} {:?} {:?}",
item.series,
item.ts_msp,
item.val.shape()
);
}
QueryItem::TimeBinSimpleF32(_) => {
debug!("execute TimeBinSimpleF32");
}
QueryItem::Accounting(_) => {
debug!("execute Accounting");
}
}
let stream = item_inp;
let stream = inspect_items(stream);
if let Some(data_store) = data_store {
let stream = transform_to_db_futures(stream, data_store, stats.clone());
let stream = stream
.map(|x| futures_util::stream::iter(x))
.flatten_unordered(Some(1))
// .map(|x| async move {
// drop(x);
// Ok(())
// })
.buffer_unordered(concurrency);
let mut stream = Box::pin(stream);
while let Some(item) = stream.next().await {
match item {
Ok(_) => {
stats.inserted_values().inc();
// TODO compute the insert latency bin and count.
}
let futs = match item {
QueryItem::Insert(item) => prepare_query_insert_futs(item, &data_store, &stats, tsnow),
QueryItem::ConnectionStatus(item) => {
stats.inserted_connection_status().inc();
let fut = insert_connection_status_fut(item, &data_store, stats.clone());
smallvec![fut]
}
QueryItem::ChannelStatus(item) => {
stats.inserted_channel_status().inc();
insert_channel_status_fut(item, &data_store, stats.clone())
}
QueryItem::TimeBinSimpleF32(item) => prepare_timebin_insert_futs(item, &data_store, &stats, tsnow),
QueryItem::Accounting(item) => prepare_accounting_insert_futs(item, &data_store, &stats, tsnow),
};
res.extend(futs.into_iter());
}
res
})
.map(|x| futures_util::stream::iter(x))
.flatten_unordered(Some(1))
// .map(|x| async move {
// drop(x);
// Ok(())
// })
.buffer_unordered(concurrency);
while let Some(item) = stream.next().await {
match item {
Ok(_) => {
stats.inserted_values().inc();
// TODO compute the insert latency bin and count.
}
Err(e) => {
use scylla::transport::errors::QueryError;
let e = match e {
QueryError::TimeoutError => crate::iteminsertqueue::Error::DbTimeout,
// TODO use `msg`
QueryError::DbError(e, _msg) => match e {
scylla::transport::errors::DbError::Overloaded => crate::iteminsertqueue::Error::DbOverload,
Err(e) => {
use scylla::transport::errors::QueryError;
let e = match e {
QueryError::TimeoutError => crate::iteminsertqueue::Error::DbTimeout,
// TODO use `msg`
QueryError::DbError(e, _msg) => match e {
scylla::transport::errors::DbError::Overloaded => crate::iteminsertqueue::Error::DbOverload,
_ => e.into(),
},
_ => e.into(),
},
_ => e.into(),
};
stats_inc_for_err(&stats, &e);
};
stats_inc_for_err(&stats, &e);
}
}
}
}
} else {
let mut stream = Box::pin(stream);
while let Some(item) = stream.next().await {
drop(item);
}
};
stats.worker_finish().inc();
insert_worker_opts
.insert_workers_running
@@ -317,6 +305,76 @@ async fn worker_streamed(
Ok(())
}
/// Maps each incoming batch of query items to the insert futures that store it.
///
/// For every batch the `item_recv` counter is incremented and one timestamp is
/// taken, which is shared by all items of that batch. Each item is turned into
/// zero or more futures by the helper matching its variant; connection-status
/// and channel-status items additionally bump their respective counters. The
/// futures of one batch are yielded together as a single `Vec`, in item order.
fn transform_to_db_futures<S>(
    item_inp: S,
    data_store: Arc<DataStore>,
    stats: Arc<InsertWorkerStats>,
) -> impl Stream<Item = Vec<InsertFut>>
where
    S: Stream<Item = VecDeque<QueryItem>>,
{
    trace!("transform_to_db_futures begin");
    item_inp.map(move |batch| {
        stats.item_recv.inc();
        trace!("transform_to_db_futures have batch len {}", batch.len());
        let tsnow = TsMs::from_system_time(SystemTime::now());
        batch
            .into_iter()
            .fold(Vec::with_capacity(32), |mut prepared, query_item| {
                let futs = match query_item {
                    QueryItem::Insert(x) => prepare_query_insert_futs(x, &data_store, &stats, tsnow),
                    QueryItem::ConnectionStatus(x) => {
                        stats.inserted_connection_status().inc();
                        let fut = insert_connection_status_fut(x, &data_store, stats.clone());
                        smallvec![fut]
                    }
                    QueryItem::ChannelStatus(x) => {
                        stats.inserted_channel_status().inc();
                        insert_channel_status_fut(x, &data_store, stats.clone())
                    }
                    QueryItem::TimeBinSimpleF32(x) => prepare_timebin_insert_futs(x, &data_store, &stats, tsnow),
                    QueryItem::Accounting(x) => prepare_accounting_insert_futs(x, &data_store, &stats, tsnow),
                };
                trace!("prepared futs len {}", futs.len());
                prepared.extend(futs);
                prepared
            })
    })
}
/// Wraps the batch receiver in a stream that debug-logs every item passing through.
///
/// Batches are forwarded unchanged. For each item one `debug!` line naming its
/// variant is emitted; insert items additionally log series, `ts_msp` and the
/// shape of the value.
fn inspect_items(item_inp: Receiver<VecDeque<QueryItem>>) -> impl Stream<Item = VecDeque<QueryItem>> {
    // The begin marker names this function, not `transform_to_db_futures`.
    trace!("inspect_items begin");
    item_inp.inspect(|batch| {
        for item in batch {
            match item {
                QueryItem::ConnectionStatus(_) => {
                    debug!("execute ConnectionStatus");
                }
                QueryItem::ChannelStatus(_) => {
                    debug!("execute ChannelStatus");
                }
                QueryItem::Insert(ins) => {
                    debug!(
                        "execute Insert {:?} {:?} {:?}",
                        ins.series,
                        ins.ts_msp,
                        ins.val.shape()
                    );
                }
                QueryItem::TimeBinSimpleF32(_) => {
                    debug!("execute TimeBinSimpleF32");
                }
                QueryItem::Accounting(_) => {
                    debug!("execute Accounting");
                }
            }
        }
    })
}
fn prepare_query_insert_futs(
item: InsertItem,
data_store: &Arc<DataStore>,
+23 -6
View File
@@ -447,16 +447,14 @@ async fn check_event_tables(keyspace: &str, rett: RetentionTime, scy: &ScySessio
Ok(())
}
pub async fn migrate_scylla_data_schema(
scyconf: &ScyllaIngestConfig,
replication: u32,
durable: bool,
rett: RetentionTime,
) -> Result<(), Error> {
pub async fn migrate_scylla_data_schema(scyconf: &ScyllaIngestConfig, rett: RetentionTime) -> Result<(), Error> {
let scy2 = create_session_no_ks(scyconf).await?;
let scy = &scy2;
let durable = true;
if !has_keyspace(scyconf.keyspace(), scy).await? {
// TODO
let replication = 3;
let cql = format!(
concat!(
"create keyspace {}",
@@ -472,6 +470,25 @@ pub async fn migrate_scylla_data_schema(
info!("keyspace created");
}
// Create the optional second keyspace with replication factor 1 if it is
// configured and does not exist yet.
if let Some(ks) = scyconf.keyspace_rf1() {
    if !has_keyspace(ks, scy).await? {
        let replication = 1;
        let cql = format!(
            concat!(
                "create keyspace {}",
                " with replication = {{ 'class': 'SimpleStrategy', 'replication_factor': {} }}",
                " and durable_writes = {};"
            ),
            // Must be the rf1 keyspace name that was just checked, not the
            // primary keyspace from `scyconf.keyspace()`.
            ks,
            replication,
            durable
        );
        info!("scylla create keyspace {cql}");
        scy.query_iter(cql, ()).await?;
        info!("keyspace created");
    }
}
let ks = scyconf.keyspace();
scy.use_keyspace(ks, true).await?;
+2 -2
View File
@@ -329,10 +329,10 @@ fn write_00() {
user: "daqbuffer".into(),
pass: "daqbuffer".into(),
};
let scyconf = &ScyllaIngestConfig::new(["127.0.0.1:19042"], "daqingest_test_00");
let scyconf = &ScyllaIngestConfig::new(["127.0.0.1:19042"], "daqingest_test_00_rf3", "daqingest_test_00_rf1");
let (pgc, pg_jh) = dbpg::conn::make_pg_client(dbconf).await?;
dbpg::schema::schema_check(&pgc).await?;
scywr::schema::migrate_scylla_data_schema(scyconf, 1, true, netpod::ttl::RetentionTime::Short).await?;
scywr::schema::migrate_scylla_data_schema(scyconf, netpod::ttl::RetentionTime::Short).await?;
let scy = scywr::session::create_session(scyconf).await?;
let stats = SeriesByChannelStats::new();
let stats = Arc::new(stats);