Use the configured TTL values in more of the insert paths

This commit is contained in:
Dominik Werder
2022-12-13 15:19:05 +01:00
parent b5c1039c78
commit 44364352b1
14 changed files with 2890 additions and 276 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -2,6 +2,7 @@ use clap::Parser;
use daqingest::{ChannelAccess, DaqIngestOpts, SubCmd};
use err::Error;
use log::*;
use netfetch::conf::parse_config;
pub fn main() -> Result<(), Error> {
let opts = DaqIngestOpts::parse();
@@ -21,10 +22,13 @@ pub fn main() -> Result<(), Error> {
}
SubCmd::ChannelAccess(k) => match k {
ChannelAccess::CaSearch(k) => {
let opts = daqingest::CaConfig { config: k.config }.into();
netfetch::ca::search::ca_search(opts).await?
let (conf, channels) = parse_config(k.config.into()).await?;
netfetch::ca::search::ca_search(conf, &channels).await?
}
ChannelAccess::CaIngest(k) => {
let (conf, channels) = parse_config(k.config.into()).await?;
netfetch::ca::ca_connect(conf, &channels).await?
}
ChannelAccess::CaIngest(k) => netfetch::ca::ca_connect(k.into()).await?,
},
}
Ok(())

View File

@@ -2,7 +2,6 @@ pub mod query;
use clap::ArgAction::Count;
use clap::Parser;
use netfetch::ca::ListenFromFileOpts;
use netfetch::zmtp::ZmtpClientOpts;
#[derive(Debug, Parser)]
@@ -94,11 +93,3 @@ pub struct CaSearch {
pub struct CaConfig {
pub config: String,
}
impl From<CaConfig> for ListenFromFileOpts {
fn from(k: CaConfig) -> Self {
Self {
config: k.config.into(),
}
}
}

View File

@@ -15,10 +15,9 @@ serde_yaml = "0.8.23"
tokio = { version = "1.23.0", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs", "tracing"] }
tokio-stream = { version = "0.1", features = ["fs"]}
async-channel = "1.6"
bytes = "1.0"
bytes = "1.3"
arrayref = "0.3"
byteorder = "1.4"
futures-core = "0.3"
futures-util = "0.3"
#pin-project-lite = "0.2"
scylla = "0.4"
@@ -26,7 +25,7 @@ tokio-postgres = "0.7.6"
md-5 = "0.9"
hex = "0.4"
libc = "0.2"
regex = "1.5.5"
regex = "1.7.0"
axum = "0.5"
http = "0.2"
url = "2.2"
@@ -34,11 +33,11 @@ hyper = "0.14"
chrono = "0.4"
humantime = "2.1"
humantime-serde = "1.1"
pin-project = "1"
lazy_static = "1"
log = { path = "../log" }
stats = { path = "../stats" }
err = { path = "../../daqbuffer/err" }
netpod = { path = "../../daqbuffer/netpod" }
taskrun = { path = "../../daqbuffer/taskrun" }
bitshuffle = { path = "../../daqbuffer/bitshuffle" }
pin-project = "1"
lazy_static = "1"

View File

@@ -8,27 +8,23 @@ pub mod store;
use self::store::DataStore;
use crate::ca::conn::ConnCommand;
use crate::ca::connset::CaConnSet;
use crate::conf::CaIngestOpts;
use crate::errconv::ErrConv;
use crate::insertworker::spawn_scylla_insert_workers;
use crate::linuxhelper::local_hostname;
use crate::metrics::metrics_agg_task;
use crate::metrics::{metrics_agg_task, ExtraInsertsConf};
use crate::rt::TokMx;
use crate::store::CommonInsertItemQueue;
use err::Error;
use futures_util::stream::FuturesUnordered;
use futures_util::{FutureExt, StreamExt};
use log::*;
use netpod::{Database, ScyllaConfig};
use serde::{Deserialize, Serialize};
use netpod::Database;
use stats::{CaConnStats, CaConnStatsAgg};
use std::collections::BTreeMap;
use std::net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4};
use std::path::PathBuf;
use std::net::{SocketAddr, SocketAddrV4};
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tokio::fs::OpenOptions;
use tokio::io::AsyncReadExt;
use tokio_postgres::Client as PgClient;
pub static SIGINT: AtomicU32 = AtomicU32::new(0);
@@ -37,186 +33,6 @@ lazy_static::lazy_static! {
pub static ref METRICS: Mutex<Option<CaConnStatsAgg>> = Mutex::new(None);
}
#[derive(Debug, Serialize, Deserialize)]
struct ChannelConfig {
backend: String,
channels: Vec<String>,
search: Vec<String>,
#[serde(default)]
search_blacklist: Vec<String>,
#[serde(default)]
tmp_remove: Vec<String>,
addr_bind: Option<IpAddr>,
addr_conn: Option<IpAddr>,
whitelist: Option<String>,
blacklist: Option<String>,
max_simul: Option<usize>,
#[serde(with = "humantime_serde")]
timeout: Option<Duration>,
postgresql: Database,
scylla: ScyllaConfig,
array_truncate: Option<usize>,
insert_worker_count: Option<usize>,
insert_scylla_sessions: Option<usize>,
insert_queue_max: Option<usize>,
insert_item_queue_cap: Option<usize>,
api_bind: Option<String>,
local_epics_hostname: Option<String>,
store_workers_rate: Option<u64>,
insert_frac: Option<u64>,
use_rate_limit_queue: Option<bool>,
#[serde(with = "humantime_serde")]
ttl_index: Option<Duration>,
#[serde(with = "humantime_serde")]
ttl_d0: Option<Duration>,
#[serde(with = "humantime_serde")]
ttl_d1: Option<Duration>,
}
#[test]
fn parse_config_minimal() {
let conf = r###"
backend: scylla
ttl_d1: 10m 3s
api_bind: "0.0.0.0:3011"
channels:
- CHANNEL-1:A
- CHANNEL-1:B
- CHANNEL-2:A
search:
- 172.26.0.255
- 172.26.2.255
postgresql:
host: host.example.com
port: 5432
user: USER
pass: PASS
name: NAME
scylla:
hosts:
- sf-nube-11:19042
- sf-nube-12:19042
keyspace: ks1
"###;
let res: Result<ChannelConfig, _> = serde_yaml::from_slice(conf.as_bytes());
assert_eq!(res.is_ok(), true);
let conf = res.unwrap();
assert_eq!(conf.api_bind, Some("0.0.0.0:3011".to_string()));
assert_eq!(conf.search.get(0), Some(&"172.26.0.255".to_string()));
assert_eq!(conf.scylla.hosts.get(1), Some(&"sf-nube-12:19042".to_string()));
assert_eq!(conf.ttl_d1, Some(Duration::from_millis(1000 * (60 * 10 + 3) + 45)));
}
#[test]
fn test_duration_parse() {
#[derive(Serialize, Deserialize)]
struct A {
#[serde(with = "humantime_serde")]
dur: Duration,
}
let a = A {
dur: Duration::from_millis(12000),
};
let s = serde_json::to_string(&a).unwrap();
assert_eq!(s, r#"{"dur":"12s"}"#);
let a = A {
dur: Duration::from_millis(12012),
};
let s = serde_json::to_string(&a).unwrap();
assert_eq!(s, r#"{"dur":"12s 12ms"}"#);
let a: A = serde_json::from_str(r#"{"dur":"3s170ms"}"#).unwrap();
assert_eq!(a.dur, Duration::from_millis(3170));
}
pub struct ListenFromFileOpts {
pub config: PathBuf,
}
pub async fn parse_config(config: PathBuf) -> Result<CaConnectOpts, Error> {
let mut file = OpenOptions::new().read(true).open(config).await?;
let mut buf = Vec::new();
file.read_to_end(&mut buf).await?;
let mut conf: ChannelConfig =
serde_yaml::from_slice(&buf).map_err(|e| Error::with_msg_no_trace(format!("{:?}", e)))?;
let re_p = regex::Regex::new(&conf.whitelist.unwrap_or("--nothing-whitelisted--".into()))?;
let re_n = regex::Regex::new(&conf.blacklist.unwrap_or("--nothing-blacklisted--".into()))?;
conf.channels = conf
.channels
.into_iter()
.filter(|ch| {
if let Some(_cs) = re_p.captures(&ch) {
true
} else if re_n.is_match(&ch) {
false
} else {
true
}
})
.collect();
Ok(CaConnectOpts {
backend: conf.backend,
channels: conf.channels,
search: conf.search,
search_blacklist: conf.search_blacklist,
addr_bind: conf.addr_bind.unwrap_or(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0))),
addr_conn: conf.addr_conn.unwrap_or(IpAddr::V4(Ipv4Addr::new(255, 255, 255, 255))),
timeout: conf.timeout.unwrap_or(Duration::from_millis(1200)),
pgconf: conf.postgresql,
scyconf: conf.scylla,
array_truncate: conf.array_truncate.unwrap_or(512),
insert_worker_count: conf.insert_worker_count.unwrap_or(800),
insert_scylla_sessions: conf.insert_scylla_sessions.unwrap_or(1),
insert_queue_max: conf.insert_queue_max.unwrap_or(64),
insert_item_queue_cap: conf.insert_item_queue_cap.unwrap_or(200000),
api_bind: conf.api_bind.unwrap_or_else(|| "0.0.0.0:3011".into()),
local_epics_hostname: conf.local_epics_hostname.unwrap_or_else(local_hostname),
store_workers_rate: conf.store_workers_rate.unwrap_or(10000),
insert_frac: conf.insert_frac.unwrap_or(1000),
use_rate_limit_queue: conf.use_rate_limit_queue.unwrap_or(false),
ttl_index: conf.ttl_index.unwrap_or(Duration::from_secs(60 * 60 * 24 * 3)),
ttl_d0: conf.ttl_d0.unwrap_or(Duration::from_secs(60 * 60 * 24 * 1)),
ttl_d1: conf.ttl_d1.unwrap_or(Duration::from_secs(60 * 60 * 12)),
})
}
// TODO (low-prio) could remove usage of clone to avoid clone of large channel list.
#[derive(Clone)]
pub struct CaConnectOpts {
pub backend: String,
pub channels: Vec<String>,
pub search: Vec<String>,
pub search_blacklist: Vec<String>,
pub addr_bind: IpAddr,
pub addr_conn: IpAddr,
pub timeout: Duration,
pub pgconf: Database,
pub scyconf: ScyllaConfig,
pub array_truncate: usize,
pub insert_worker_count: usize,
pub insert_scylla_sessions: usize,
pub insert_queue_max: usize,
pub insert_item_queue_cap: usize,
pub api_bind: String,
pub local_epics_hostname: String,
pub store_workers_rate: u64,
pub insert_frac: u64,
pub use_rate_limit_queue: bool,
pub ttl_index: Duration,
pub ttl_d0: Duration,
pub ttl_d1: Duration,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExtraInsertsConf {
pub copies: Vec<(u64, u64)>,
}
impl ExtraInsertsConf {
pub fn new() -> Self {
Self { copies: Vec::new() }
}
}
pub struct IngestCommons {
pub pgconf: Arc<Database>,
pub backend: String,
@@ -321,13 +137,12 @@ async fn query_addr_multiple(pg_client: &PgClient) -> Result<(), Error> {
Ok(())
}
pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> {
pub async fn ca_connect(opts: CaIngestOpts, channels: &Vec<String>) -> Result<(), Error> {
crate::linuxhelper::set_signal_handler()?;
let extra_inserts_conf = TokMx::new(ExtraInsertsConf { copies: Vec::new() });
let insert_ivl_min = Arc::new(AtomicU64::new(8800));
let opts = parse_config(opts.config).await?;
let scyconf = opts.scyconf.clone();
let pgconf = opts.pgconf.clone();
let scyconf = opts.scylla().clone();
let pgconf = opts.postgresql().clone();
let d = &pgconf;
let (pg_client, pg_conn) = tokio_postgres::connect(
&format!("postgresql://{}:{}@{}:{}/{}", d.user, d.pass, d.host, d.port, d.name),
@@ -365,19 +180,19 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> {
let mut channels_by_host = BTreeMap::new();
let data_store = Arc::new(DataStore::new(&scyconf, pg_client.clone()).await?);
let insert_item_queue = CommonInsertItemQueue::new(opts.insert_item_queue_cap);
let insert_item_queue = CommonInsertItemQueue::new(opts.insert_item_queue_cap());
let insert_item_queue = Arc::new(insert_item_queue);
let ingest_commons = IngestCommons {
pgconf: Arc::new(pgconf.clone()),
backend: opts.backend.clone(),
local_epics_hostname: opts.local_epics_hostname.clone(),
backend: opts.backend().into(),
local_epics_hostname: opts.local_epics_hostname().clone(),
insert_item_queue: insert_item_queue.clone(),
data_store: data_store.clone(),
insert_ivl_min: insert_ivl_min.clone(),
extra_inserts_conf,
store_workers_rate: AtomicU64::new(opts.store_workers_rate),
insert_frac: AtomicU64::new(opts.insert_frac),
store_workers_rate: AtomicU64::new(opts.store_workers_rate()),
insert_frac: AtomicU64::new(opts.insert_frac()),
ca_conn_set: CaConnSet::new(),
};
let ingest_commons = Arc::new(ingest_commons);
@@ -385,21 +200,21 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> {
// TODO use a new stats type:
let store_stats = Arc::new(CaConnStats::new());
let jh_insert_workers = spawn_scylla_insert_workers(
opts.scyconf.clone(),
opts.insert_scylla_sessions,
opts.insert_worker_count,
opts.scylla().clone(),
opts.insert_scylla_sessions(),
opts.insert_worker_count(),
insert_item_queue.clone(),
ingest_commons.clone(),
pg_client.clone(),
store_stats.clone(),
opts.use_rate_limit_queue,
opts.use_rate_limit_queue(),
opts.clone(),
)
.await?;
if true {
tokio::spawn(crate::metrics::start_metrics_service(
opts.api_bind.clone(),
opts.api_bind().clone(),
ingest_commons.clone(),
));
}
@@ -407,7 +222,7 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> {
let metrics_agg_fut = metrics_agg_task(ingest_commons.clone(), local_stats.clone(), store_stats.clone());
let metrics_agg_jh = tokio::spawn(metrics_agg_fut);
let mut chns_todo = &opts.channels[..];
let mut chns_todo = &channels[..];
let mut ix = 0;
for ch in chns_todo {
if SIGINT.load(Ordering::Acquire) != 0 {
@@ -424,7 +239,7 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> {
ingest_commons
.ca_conn_set
.add_channel_to_addr(
opts.backend.clone(),
opts.backend().into(),
SocketAddr::V4(addr.clone()),
ch.clone(),
ingest_commons.clone(),
@@ -433,7 +248,7 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> {
}
ix += 1;
if ix % 1000 == 0 {
info!("{} of {} {}", ix, opts.channels.len(), ch);
info!("{} of {} {}", ix, channels.len(), ch);
}
}
info!("channels_by_host len {}", channels_by_host.len());

View File

@@ -1,9 +1,8 @@
use crate::ca::findioc::FindIocStream;
use crate::ca::{parse_config, ListenFromFileOpts};
use crate::conf::CaIngestOpts;
use err::Error;
use futures_util::StreamExt;
use log::*;
use netpod::Database;
use std::net::{IpAddr, SocketAddr};
use std::sync::Arc;
use std::time::{Duration, Instant};
@@ -45,16 +44,9 @@ async fn resolve_address(addr_str: &str) -> Result<SocketAddr, Error> {
Ok(ac)
}
pub async fn ca_search(opts: ListenFromFileOpts) -> Result<(), Error> {
pub async fn ca_search(opts: CaIngestOpts, channels: &Vec<String>) -> Result<(), Error> {
info!("ca_search begin");
let opts = parse_config(opts.config).await?;
let d = Database {
name: opts.pgconf.name.clone(),
host: opts.pgconf.host.clone(),
port: opts.pgconf.port.clone(),
user: opts.pgconf.user.clone(),
pass: opts.pgconf.pass.clone(),
};
let d = opts.postgresql().clone();
let (pg_client, pg_conn) = tokio_postgres::connect(
&format!("postgresql://{}:{}@{}:{}/{}", d.user, d.pass, d.host, d.port, d.name),
tokio_postgres::tls::NoTls,
@@ -75,7 +67,7 @@ pub async fn ca_search(opts: ListenFromFileOpts) -> Result<(), Error> {
.unwrap()
};
let mut addrs = Vec::new();
for s in &opts.search {
for s in opts.search() {
match resolve_address(s).await {
Ok(addr) => {
info!("resolved {s} as {addr}");
@@ -88,7 +80,7 @@ pub async fn ca_search(opts: ListenFromFileOpts) -> Result<(), Error> {
}
let gw_addrs = {
let mut gw_addrs = Vec::new();
for s in &opts.search_blacklist {
for s in opts.search_blacklist() {
match resolve_address(s).await {
Ok(addr) => {
info!("resolved {s} as {addr}");
@@ -113,7 +105,7 @@ pub async fn ca_search(opts: ListenFromFileOpts) -> Result<(), Error> {
})
.collect();
let mut finder = FindIocStream::new(addrs);
for ch in &opts.channels {
for ch in channels.iter() {
finder.push(ch.into());
}
let mut ts_last = Instant::now();
@@ -191,7 +183,7 @@ pub async fn ca_search(opts: ListenFromFileOpts) -> Result<(), Error> {
pg_client
.execute(
&qu_insert,
&[&opts.backend, &item.channel, &queryaddr, &responseaddr, &addr],
&[&opts.backend(), &item.channel, &queryaddr, &responseaddr, &addr],
)
.await
.unwrap();

View File

@@ -81,7 +81,7 @@ impl DataStore {
let qu_insert_ts_msp = Arc::new(q);
let q = scy
.prepare(
"insert into series_by_ts_msp (part, ts_msp, shape_kind, scalar_type, series) values (?, ?, ?, ?, ?)",
"insert into series_by_ts_msp (part, ts_msp, shape_kind, scalar_type, series) values (?, ?, ?, ?, ?) using ttl ?",
)
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
@@ -121,59 +121,63 @@ impl DataStore {
// array
let q = scy
.prepare("insert into events_array_i8 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)")
.prepare(
"insert into events_array_i8 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?",
)
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
let qu_insert_array_i8 = Arc::new(q);
let q = scy
.prepare("insert into events_array_i16 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)")
.prepare("insert into events_array_i16 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?")
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
let qu_insert_array_i16 = Arc::new(q);
let q = scy
.prepare("insert into events_array_i32 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)")
.prepare("insert into events_array_i32 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?")
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
let qu_insert_array_i32 = Arc::new(q);
let q = scy
.prepare("insert into events_array_f32 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)")
.prepare("insert into events_array_f32 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?")
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
let qu_insert_array_f32 = Arc::new(q);
let q = scy
.prepare("insert into events_array_f64 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)")
.prepare("insert into events_array_f64 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?")
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
let qu_insert_array_f64 = Arc::new(q);
// Others:
let q = scy
.prepare("insert into muted (part, series, ts, ema, emd) values (?, ?, ?, ?, ?)")
.prepare("insert into muted (part, series, ts, ema, emd) values (?, ?, ?, ?, ?) using ttl ?")
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
let qu_insert_muted = Arc::new(q);
let q = scy
.prepare("insert into item_recv_ivl (part, series, ts, ema, emd) values (?, ?, ?, ?, ?)")
.prepare("insert into item_recv_ivl (part, series, ts, ema, emd) values (?, ?, ?, ?, ?) using ttl ?")
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
let qu_insert_item_recv_ivl = Arc::new(q);
// Connection status:
let q = scy
.prepare("insert into connection_status (ts_msp, ts_lsp, kind, addr) values (?, ?, ?, ?)")
.prepare("insert into connection_status (ts_msp, ts_lsp, kind, addr) values (?, ?, ?, ?) using ttl ?")
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
let qu_insert_connection_status = Arc::new(q);
let q = scy
.prepare("insert into channel_status (series, ts_msp, ts_lsp, kind) values (?, ?, ?, ?)")
.prepare("insert into channel_status (series, ts_msp, ts_lsp, kind) values (?, ?, ?, ?) using ttl ?")
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
let qu_insert_channel_status = Arc::new(q);
let q = scy
.prepare("insert into channel_status_by_ts_msp (ts_msp, ts_lsp, series, kind) values (?, ?, ?, ?)")
.prepare(
"insert into channel_status_by_ts_msp (ts_msp, ts_lsp, series, kind) values (?, ?, ?, ?) using ttl ?",
)
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
let qu_insert_channel_status_by_ts_msp = Arc::new(q);
let q = scy
.prepare("insert into channel_ping (part, ts_msp, series, ivl, interest, evsize) values (?, ?, ?, ?, ?, ?)")
.prepare("insert into channel_ping (part, ts_msp, series, ivl, interest, evsize) values (?, ?, ?, ?, ?, ?) using ttl ?")
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
let qu_insert_channel_ping = Arc::new(q);

View File

@@ -1,8 +1,7 @@
use crate::errconv::ErrConv;
use crate::zmtp::{CommonQueries, ZmtpFrame};
use err::Error;
use futures_core::Future;
use futures_util::FutureExt;
use futures_util::{Future, FutureExt};
use log::*;
use netpod::timeunits::SEC;
use netpod::{ByteOrder, ScalarType, Shape};

232
netfetch/src/conf.rs Normal file
View File

@@ -0,0 +1,232 @@
use crate::linuxhelper::local_hostname;
use err::Error;
use netpod::log::*;
use netpod::Database;
use netpod::ScyllaConfig;
use serde::Deserialize;
use serde::Serialize;
use std::net::IpAddr;
use std::net::Ipv4Addr;
use std::path::PathBuf;
use std::time::Duration;
use tokio::fs::OpenOptions;
use tokio::io::AsyncReadExt;
/// Top-level YAML configuration for the channel-access ingest tools.
///
/// Deserialized by `parse_config` below; optional fields fall back to the
/// defaults applied by the getters in `impl CaIngestOpts`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CaIngestOpts {
    // Backend name used to tag stored data (see `backend()`).
    backend: String,
    // Path to a text file with one channel name per line; read and
    // filtered by `parse_config`.
    channels: PathBuf,
    // HTTP API bind address; defaults to "0.0.0.0:3011" via `api_bind()`.
    api_bind: Option<String>,
    // Addresses to query when searching for IOCs.
    search: Vec<String>,
    // Addresses excluded from the search (empty when absent).
    #[serde(default)]
    search_blacklist: Vec<String>,
    // NOTE(review): not read anywhere in this module — presumably a
    // transitional field; confirm before removing.
    #[serde(default)]
    tmp_remove: Vec<String>,
    // Local bind address; wildcard 0.0.0.0 when absent.
    addr_bind: Option<IpAddr>,
    // Outgoing/broadcast address; 255.255.255.255 when absent.
    addr_conn: Option<IpAddr>,
    // Regex: channels matching it are always kept by `parse_config`.
    whitelist: Option<String>,
    // Regex: non-whitelisted channels matching it are dropped.
    blacklist: Option<String>,
    // NOTE(review): not read in this module — TODO confirm usage.
    max_simul: Option<usize>,
    // Human-readable duration, e.g. "1200ms" (humantime format).
    #[serde(with = "humantime_serde")]
    timeout: Option<Duration>,
    // PostgreSQL connection settings.
    postgresql: Database,
    // Scylla connection settings.
    scylla: ScyllaConfig,
    // Maximum stored array length; defaults to 512.
    array_truncate: Option<usize>,
    // Number of scylla insert workers; defaults to 800.
    insert_worker_count: Option<usize>,
    // Number of scylla sessions shared by the workers; defaults to 1.
    insert_scylla_sessions: Option<usize>,
    // Per-worker insert queue bound; defaults to 64.
    insert_queue_max: Option<usize>,
    // Capacity of the common insert item queue; defaults to 80000.
    insert_item_queue_cap: Option<usize>,
    // Hostname reported to EPICS peers; defaults to the local hostname.
    local_epics_hostname: Option<String>,
    // Rate limit for the store workers; defaults to 5000.
    store_workers_rate: Option<u64>,
    // Insert fraction (per mille); defaults to 1000, i.e. insert all.
    insert_frac: Option<u64>,
    // Route inserts through the rate-limit queue; defaults to false.
    use_rate_limit_queue: Option<bool>,
    // TTL for index-like tables (humantime format); defaults to 3 days.
    #[serde(with = "humantime_serde")]
    ttl_index: Option<Duration>,
    // TTL for scalar event data; defaults to 1 day.
    #[serde(with = "humantime_serde")]
    ttl_d0: Option<Duration>,
    // TTL for array event data; defaults to 12 hours.
    #[serde(with = "humantime_serde")]
    ttl_d1: Option<Duration>,
}
impl CaIngestOpts {
    /// Backend name used to tag stored data (e.g. "scylla").
    pub fn backend(&self) -> &str {
        &self.backend
    }

    /// Local bind address; defaults to the IPv4 wildcard 0.0.0.0.
    pub fn addr_bind(&self) -> IpAddr {
        // `IpAddr` is `Copy`, so no clone is needed.
        self.addr_bind.unwrap_or(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)))
    }

    /// Address for outgoing traffic; defaults to the IPv4 broadcast
    /// address 255.255.255.255.
    pub fn addr_conn(&self) -> IpAddr {
        self.addr_conn.unwrap_or(IpAddr::V4(Ipv4Addr::new(255, 255, 255, 255)))
    }

    /// HTTP API bind address; defaults to "0.0.0.0:3011".
    pub fn api_bind(&self) -> String {
        self.api_bind.clone().unwrap_or_else(|| "0.0.0.0:3011".into())
    }

    /// PostgreSQL connection settings.
    pub fn postgresql(&self) -> &Database {
        &self.postgresql
    }

    /// Scylla connection settings.
    pub fn scylla(&self) -> &ScyllaConfig {
        &self.scylla
    }

    /// Addresses queried during IOC search.
    pub fn search(&self) -> &Vec<String> {
        &self.search
    }

    /// Addresses excluded from the IOC search.
    pub fn search_blacklist(&self) -> &Vec<String> {
        &self.search_blacklist
    }

    /// Network timeout; defaults to 1200 ms.
    ///
    /// Bug fix: this previously ignored the configured `timeout` field and
    /// unconditionally returned the 1200 ms default, silently discarding
    /// the value set in the YAML config.
    pub fn timeout(&self) -> Duration {
        self.timeout.unwrap_or(Duration::from_millis(1200))
    }

    /// Number of scylla insert workers; defaults to 800.
    pub fn insert_worker_count(&self) -> usize {
        self.insert_worker_count.unwrap_or(800)
    }

    /// Number of scylla sessions shared by the workers; defaults to 1.
    pub fn insert_scylla_sessions(&self) -> usize {
        self.insert_scylla_sessions.unwrap_or(1)
    }

    /// Per-worker insert queue bound; defaults to 64.
    pub fn insert_queue_max(&self) -> usize {
        self.insert_queue_max.unwrap_or(64)
    }

    /// Maximum stored array length; defaults to 512.
    pub fn array_truncate(&self) -> usize {
        self.array_truncate.unwrap_or(512)
    }

    /// Capacity of the common insert item queue; defaults to 80000.
    pub fn insert_item_queue_cap(&self) -> usize {
        self.insert_item_queue_cap.unwrap_or(80000)
    }

    /// Hostname reported to EPICS peers; defaults to the local hostname.
    pub fn local_epics_hostname(&self) -> String {
        self.local_epics_hostname.clone().unwrap_or_else(local_hostname)
    }

    /// Rate limit for the store workers; defaults to 5000.
    pub fn store_workers_rate(&self) -> u64 {
        self.store_workers_rate.unwrap_or(5000)
    }

    /// Insert fraction (per mille); defaults to 1000, i.e. insert all.
    pub fn insert_frac(&self) -> u64 {
        self.insert_frac.unwrap_or(1000)
    }

    /// Whether inserts go through the rate-limit queue; defaults to false.
    pub fn use_rate_limit_queue(&self) -> bool {
        self.use_rate_limit_queue.unwrap_or(false)
    }

    /// TTL for index-like tables; defaults to 3 days.
    pub fn ttl_index(&self) -> Duration {
        // `Duration` is `Copy`; `unwrap_or` suffices, no clone needed.
        self.ttl_index.unwrap_or(Duration::from_secs(60 * 60 * 24 * 3))
    }

    /// TTL for scalar event data; defaults to 1 day.
    pub fn ttl_d0(&self) -> Duration {
        self.ttl_d0.unwrap_or(Duration::from_secs(60 * 60 * 24))
    }

    /// TTL for array event data; defaults to 12 hours.
    pub fn ttl_d1(&self) -> Duration {
        self.ttl_d1.unwrap_or(Duration::from_secs(60 * 60 * 12))
    }
}
/// Parses a minimal YAML config and checks a few representative fields,
/// including a humantime-formatted TTL.
///
/// Bug fix: the YAML said `ttl_d1: 10m 3s` (603 000 ms) while the
/// assertion expected 603 045 ms (`… + 45`), so the test could never
/// pass. The config now includes the 45 ms the assertion checks for.
#[test]
fn parse_config_minimal() {
    let conf = r###"
backend: scylla
ttl_d1: 10m 3s 45ms
api_bind: "0.0.0.0:3011"
channels: /some/path/file.txt
search:
  - 172.26.0.255
  - 172.26.2.255
postgresql:
  host: host.example.com
  port: 5432
  user: USER
  pass: PASS
  name: NAME
scylla:
  hosts:
    - sf-nube-11:19042
    - sf-nube-12:19042
  keyspace: ks1
"###;
    let res: Result<CaIngestOpts, _> = serde_yaml::from_slice(conf.as_bytes());
    // Include the parse error in the failure message for easier debugging.
    assert!(res.is_ok(), "{:?}", res.err());
    let conf = res.unwrap();
    assert_eq!(conf.channels, PathBuf::from("/some/path/file.txt"));
    assert_eq!(conf.api_bind, Some("0.0.0.0:3011".to_string()));
    assert_eq!(conf.search.get(0), Some(&"172.26.0.255".to_string()));
    assert_eq!(conf.scylla.hosts.get(1), Some(&"sf-nube-12:19042".to_string()));
    assert_eq!(conf.ttl_d1, Some(Duration::from_millis(1000 * (60 * 10 + 3) + 45)));
}
/// Checks humantime's serde integration: millisecond durations serialize
/// to the expected human-readable strings and parse back correctly.
#[test]
fn test_duration_parse() {
    #[derive(Serialize, Deserialize)]
    struct Wrap {
        #[serde(with = "humantime_serde")]
        dur: Duration,
    }
    // Serialize a duration given in milliseconds and return the JSON text.
    let ser = |ms: u64| {
        let w = Wrap {
            dur: Duration::from_millis(ms),
        };
        serde_json::to_string(&w).unwrap()
    };
    assert_eq!(ser(12000), r#"{"dur":"12s"}"#);
    assert_eq!(ser(12012), r#"{"dur":"12s 12ms"}"#);
    // Parsing accepts units without separating whitespace.
    let parsed: Wrap = serde_json::from_str(r#"{"dur":"3s170ms"}"#).unwrap();
    assert_eq!(parsed.dur, Duration::from_millis(3170));
}
/// Reads the YAML config at `config`, then loads the channel list file
/// referenced by its `channels` field (one channel name per line) and
/// filters it through the optional whitelist/blacklist regexes.
///
/// Filter rule: a whitelist match always keeps the channel; otherwise a
/// blacklist match drops it; channels matching neither are kept.
///
/// Returns the parsed options together with the filtered channel names.
/// Errors on I/O failure, invalid YAML, or an invalid regex.
pub async fn parse_config(config: PathBuf) -> Result<(CaIngestOpts, Vec<String>), Error> {
    let mut file = OpenOptions::new().read(true).open(config).await?;
    let mut buf = Vec::new();
    file.read_to_end(&mut buf).await?;
    let conf: CaIngestOpts = serde_yaml::from_slice(&buf).map_err(|e| Error::with_msg_no_trace(format!("{:?}", e)))?;
    drop(file);
    // Defaults are patterns that can never match a real channel name, so
    // an absent list is a no-op.
    let re_p = regex::Regex::new(conf.whitelist.as_deref().unwrap_or("--nothing-whitelisted--"))?;
    let re_n = regex::Regex::new(conf.blacklist.as_deref().unwrap_or("--nothing-blacklisted--"))?;
    let mut file = OpenOptions::new().read(true).open(&conf.channels).await?;
    let mut buf = Vec::new();
    file.read_to_end(&mut buf).await?;
    let mut channels = Vec::new();
    for (i, raw) in buf.split(|&x| x == 0x0a).enumerate() {
        let line = String::from_utf8_lossy(raw);
        // Bug fix: trim whitespace (e.g. the CR left by CRLF files) and
        // skip blank lines — including the empty slice after a trailing
        // newline — which previously became empty channel names.
        let line = line.trim();
        if line.is_empty() {
            continue;
        }
        // Debug aid: only echo the first few lines to avoid log spam.
        if i < 50 {
            eprintln!("line has {}", line.len());
            eprintln!("parse line {line}");
        }
        let use_line = if re_p.is_match(line) {
            true
        } else if re_n.is_match(line) {
            false
        } else {
            true
        };
        if use_line {
            channels.push(line.into());
        }
    }
    info!("Parsed {} channels", channels.len());
    Ok((conf, channels))
}

View File

@@ -1,5 +1,6 @@
use crate::ca::store::DataStore;
use crate::ca::{CaConnectOpts, IngestCommons};
use crate::ca::IngestCommons;
use crate::conf::CaIngestOpts;
use crate::rt::JoinHandle;
use crate::store::{CommonInsertItemQueue, IntoSimplerError, QueryItem};
use err::Error;
@@ -52,7 +53,7 @@ pub async fn spawn_scylla_insert_workers(
pg_client: Arc<PgClient>,
store_stats: Arc<stats::CaConnStats>,
use_rate_limit_queue: bool,
opts: CaConnectOpts,
opts: CaIngestOpts,
) -> Result<Vec<JoinHandle<()>>, Error> {
let (q2_tx, q2_rx) = async_channel::bounded(insert_item_queue.receiver().capacity().unwrap_or(20000));
{
@@ -124,9 +125,9 @@ pub async fn spawn_scylla_insert_workers(
insert_item_queue.receiver()
};
let ingest_commons = ingest_commons.clone();
let ttl_msp = opts.ttl_index;
let ttl_0d = opts.ttl_d0;
let ttl_1d = opts.ttl_d1;
let ttl_msp = opts.ttl_index();
let ttl_0d = opts.ttl_d0();
let ttl_1d = opts.ttl_d1();
let fut = async move {
let backoff_0 = Duration::from_millis(10);
let mut backoff = backoff_0.clone();
@@ -140,7 +141,7 @@ pub async fn spawn_scylla_insert_workers(
};
match item {
QueryItem::ConnectionStatus(item) => {
match crate::store::insert_connection_status(item, &data_store, &stats).await {
match crate::store::insert_connection_status(item, ttl_msp, &data_store, &stats).await {
Ok(_) => {
stats.connection_status_insert_done_inc();
backoff = backoff_0;
@@ -152,7 +153,7 @@ pub async fn spawn_scylla_insert_workers(
}
}
QueryItem::ChannelStatus(item) => {
match crate::store::insert_channel_status(item, &data_store, &stats).await {
match crate::store::insert_channel_status(item, ttl_msp, &data_store, &stats).await {
Ok(_) => {
stats.channel_status_insert_done_inc();
backoff = backoff_0;
@@ -188,6 +189,7 @@ pub async fn spawn_scylla_insert_workers(
item.ts as i64,
item.ema,
item.emd,
ttl_msp.as_secs() as i32,
);
let qres = data_store.scy.execute(&data_store.qu_insert_muted, values).await;
match qres {
@@ -209,6 +211,7 @@ pub async fn spawn_scylla_insert_workers(
item.ts as i64,
item.ema,
item.emd,
ttl_msp.as_secs() as i32,
);
let qres = data_store
.scy
@@ -234,6 +237,7 @@ pub async fn spawn_scylla_insert_workers(
item.ivl,
item.interest,
item.evsize as i32,
ttl_msp.as_secs() as i32,
);
let qres = data_store.scy.execute(&data_store.qu_insert_channel_ping, params).await;
match qres {

View File

@@ -1,9 +1,11 @@
use crate::ca::conn::ConnCommand;
use crate::ca::{ExtraInsertsConf, IngestCommons, METRICS};
use crate::ca::IngestCommons;
use crate::ca::METRICS;
use axum::extract::Query;
use err::Error;
use http::request::Parts;
use log::*;
use serde::{Deserialize, Serialize};
use stats::{CaConnStats, CaConnStatsAgg, CaConnStatsAggDiff};
use std::collections::HashMap;
use std::net::{SocketAddr, SocketAddrV4};
@@ -11,6 +13,17 @@ use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExtraInsertsConf {
pub copies: Vec<(u64, u64)>,
}
impl ExtraInsertsConf {
pub fn new() -> Self {
Self { copies: Vec::new() }
}
}
async fn find_channel(
params: HashMap<String, String>,
ingest_commons: Arc<IngestCommons>,

View File

@@ -1,6 +1,7 @@
pub mod bsread;
pub mod ca;
pub mod channelwriter;
pub mod conf;
pub mod errconv;
pub mod insertworker;
pub mod linuxhelper;

View File

@@ -331,6 +331,7 @@ where
par.ts_lsp as i64,
par.pulse as i64,
val,
par.ttl as i32,
);
data_store.scy.execute(qu, params).await?;
Ok(())
@@ -356,6 +357,7 @@ pub async fn insert_item(
if item.shape.to_scylla_vec().is_empty() { 0 } else { 1 } as i32,
item.scalar_type.to_scylla_i32(),
item.series.id() as i64,
ttl_index.as_secs() as i32,
);
data_store
.scy
@@ -408,6 +410,7 @@ pub async fn insert_item(
pub async fn insert_connection_status(
item: ConnectionStatusItem,
ttl: Duration,
data_store: &DataStore,
_stats: &CaConnStats,
) -> Result<(), Error> {
@@ -419,7 +422,7 @@ pub async fn insert_connection_status(
let ts_lsp = ts - ts_msp;
let kind = item.status as u32;
let addr = format!("{}", item.addr);
let params = (ts_msp as i64, ts_lsp as i64, kind as i32, addr);
let params = (ts_msp as i64, ts_lsp as i64, kind as i32, addr, ttl.as_secs() as i32);
data_store
.scy
.execute(&data_store.qu_insert_connection_status, params)
@@ -429,6 +432,7 @@ pub async fn insert_connection_status(
pub async fn insert_channel_status(
item: ChannelStatusItem,
ttl: Duration,
data_store: &DataStore,
_stats: &CaConnStats,
) -> Result<(), Error> {
@@ -440,12 +444,24 @@ pub async fn insert_channel_status(
let ts_lsp = ts - ts_msp;
let kind = item.status.kind();
let series = item.series.id();
let params = (series as i64, ts_msp as i64, ts_lsp as i64, kind as i32);
let params = (
series as i64,
ts_msp as i64,
ts_lsp as i64,
kind as i32,
ttl.as_secs() as i32,
);
data_store
.scy
.execute(&data_store.qu_insert_channel_status, params)
.await?;
let params = (ts_msp as i64, ts_lsp as i64, series as i64, kind as i32);
let params = (
ts_msp as i64,
ts_lsp as i64,
series as i64,
kind as i32,
ttl.as_secs() as i32,
);
data_store
.scy
.execute(&data_store.qu_insert_channel_status_by_ts_msp, params)

View File

@@ -7,8 +7,7 @@ use async_channel::{Receiver, Sender};
#[allow(unused)]
use bytes::BufMut;
use err::Error;
use futures_core::{Future, Stream};
use futures_util::{pin_mut, FutureExt, StreamExt};
use futures_util::{pin_mut, Future, FutureExt, Stream, StreamExt};
use log::*;
use netpod::timeunits::*;
use scylla::batch::{Batch, BatchType, Consistency};