Take ttl and backend from parameters

Author: Dominik Werder
Date: 2022-12-12 17:24:38 +01:00
parent b3bd344f5d
commit b5c1039c78
14 changed files with 135 additions and 2606 deletions

File diff suppressed because it is too large.


@@ -31,6 +31,8 @@ pub enum SubCmd {
 #[derive(Debug, Parser)]
 pub struct Bsread {
+    #[arg(long)]
+    pub backend: String,
     #[arg(long)]
     pub scylla: Vec<String>,
     #[arg(long)]
@@ -50,6 +52,7 @@ pub struct Bsread {
 impl From<Bsread> for ZmtpClientOpts {
     fn from(k: Bsread) -> Self {
         Self {
+            backend: k.backend,
            scylla: k.scylla,
            sources: k.source,
            rcvbuf: k.rcvbuf,
@@ -67,6 +70,8 @@ pub struct FetchEvents {
     pub scylla: Vec<String>,
     #[arg(long)]
     pub channel: String,
+    #[arg(long)]
+    pub backend: String,
 }

 #[derive(Debug, Parser)]
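
As an aside on the mechanics: clap's derive turns each `#[arg(long)]` field into a long command-line flag. A minimal, self-contained sketch of the pattern used by these structs (only the backend and scylla fields come from the diff; the main function is illustrative):

use clap::Parser;

#[derive(Debug, Parser)]
pub struct Bsread {
    // Becomes a required `--backend <BACKEND>` flag; clap rejects the command line if it is missing.
    #[arg(long)]
    pub backend: String,
    // Repeatable flag: `--scylla host1:19042 --scylla host2:19042` collects into a Vec.
    #[arg(long)]
    pub scylla: Vec<String>,
}

fn main() {
    let opts = Bsread::parse();
    println!("backend={} scylla={:?}", opts.backend, opts.scylla);
}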


@@ -119,7 +119,7 @@ pub async fn fetch_events(opts: FetchEvents) -> Result<(), Error> {
         "select series, scalar_type, shape_dims from series_by_channel where facility = ? and channel_name = ?",
     )
     .await?;
-    let qres = scy.execute(&qu_series, ("scylla", &opts.channel)).await?;
+    let qres = scy.execute(&qu_series, (&opts.backend, &opts.channel)).await?;
     if let Some(rows) = qres.rows {
         info!("Found {} matching series", rows.len());
         for r in &rows {
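
With the backend now bound as a statement value rather than hard-coded, the lookup follows the usual scylla prepare/execute flow. A sketch of that flow under the scylla crate API of this era (session construction omitted; the statement text is taken from the hunk above):

use scylla::Session;

async fn find_series(scy: &Session, backend: &str, channel: &str) -> Result<(), Box<dyn std::error::Error>> {
    // Prepare once, then bind (facility, channel_name) per execution.
    let qu_series = scy
        .prepare("select series, scalar_type, shape_dims from series_by_channel where facility = ? and channel_name = ?")
        .await?;
    let qres = scy.execute(&qu_series, (backend, channel)).await?;
    if let Some(rows) = qres.rows {
        println!("Found {} matching series", rows.len());
    }
    Ok(())
}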


@@ -12,7 +12,7 @@ serde = { version = "1.0", features = ["derive"] }
 serde_json = "1.0"
 serde_cbor = "0.11"
 serde_yaml = "0.8.23"
-tokio = { version = "1.18.1", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs", "tracing"] }
+tokio = { version = "1.23.0", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs", "tracing"] }
 tokio-stream = { version = "0.1", features = ["fs"]}
 async-channel = "1.6"
 bytes = "1.0"
@@ -32,6 +32,8 @@ http = "0.2"
 url = "2.2"
 hyper = "0.14"
 chrono = "0.4"
+humantime = "2.1"
+humantime-serde = "1.1"
 log = { path = "../log" }
 stats = { path = "../stats" }
 err = { path = "../../daqbuffer/err" }


@@ -51,7 +51,8 @@ struct ChannelConfig {
     whitelist: Option<String>,
     blacklist: Option<String>,
     max_simul: Option<usize>,
-    timeout: Option<u64>,
+    #[serde(with = "humantime_serde")]
+    timeout: Option<Duration>,
     postgresql: Database,
     scylla: ScyllaConfig,
     array_truncate: Option<usize>,
@@ -64,31 +65,38 @@ struct ChannelConfig {
     store_workers_rate: Option<u64>,
     insert_frac: Option<u64>,
     use_rate_limit_queue: Option<bool>,
+    #[serde(with = "humantime_serde")]
+    ttl_index: Option<Duration>,
+    #[serde(with = "humantime_serde")]
+    ttl_d0: Option<Duration>,
+    #[serde(with = "humantime_serde")]
+    ttl_d1: Option<Duration>,
 }

 #[test]
 fn parse_config_minimal() {
     let conf = r###"
 backend: scylla
-api_bind: 0.0.0.0:3011
+ttl_d1: 10m 3s 45ms
+api_bind: "0.0.0.0:3011"
 channels:
   - CHANNEL-1:A
   - CHANNEL-1:B
   - CHANNEL-2:A
 search:
   - 172.26.0.255
   - 172.26.2.255
 postgresql:
   host: host.example.com
   port: 5432
   user: USER
   pass: PASS
   name: NAME
 scylla:
   hosts:
     - sf-nube-11:19042
     - sf-nube-12:19042
   keyspace: ks1
 "###;
     let res: Result<ChannelConfig, _> = serde_yaml::from_slice(conf.as_bytes());
     assert_eq!(res.is_ok(), true);
@@ -96,6 +104,28 @@ scylla:
     assert_eq!(conf.api_bind, Some("0.0.0.0:3011".to_string()));
     assert_eq!(conf.search.get(0), Some(&"172.26.0.255".to_string()));
     assert_eq!(conf.scylla.hosts.get(1), Some(&"sf-nube-12:19042".to_string()));
+    assert_eq!(conf.ttl_d1, Some(Duration::from_millis(1000 * (60 * 10 + 3) + 45)));
 }

+#[test]
+fn test_duration_parse() {
+    #[derive(Serialize, Deserialize)]
+    struct A {
+        #[serde(with = "humantime_serde")]
+        dur: Duration,
+    }
+    let a = A {
+        dur: Duration::from_millis(12000),
+    };
+    let s = serde_json::to_string(&a).unwrap();
+    assert_eq!(s, r#"{"dur":"12s"}"#);
+    let a = A {
+        dur: Duration::from_millis(12012),
+    };
+    let s = serde_json::to_string(&a).unwrap();
+    assert_eq!(s, r#"{"dur":"12s 12ms"}"#);
+    let a: A = serde_json::from_str(r#"{"dur":"3s170ms"}"#).unwrap();
+    assert_eq!(a.dur, Duration::from_millis(3170));
+}

 pub struct ListenFromFileOpts {
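
The new test exercises plain Duration fields; the config struct itself stores Option<Duration>, which humantime_serde handles through the same `with` attribute. A small sketch of that variant (the struct name is illustrative, and `default` is added so an absent key becomes None):

use serde::Deserialize;
use std::time::Duration;

#[derive(Debug, Deserialize)]
struct Ttls {
    #[serde(with = "humantime_serde", default)]
    ttl_d1: Option<Duration>,
}

fn main() {
    let t: Ttls = serde_yaml::from_str("ttl_d1: 10m 3s").unwrap();
    assert_eq!(t.ttl_d1, Some(Duration::from_secs(603)));
    // A missing key falls back to Default, i.e. None.
    let t: Ttls = serde_yaml::from_str("{}").unwrap();
    assert_eq!(t.ttl_d1, None);
}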
@@ -130,7 +160,7 @@ pub async fn parse_config(config: PathBuf) -> Result<CaConnectOpts, Error> {
         search_blacklist: conf.search_blacklist,
         addr_bind: conf.addr_bind.unwrap_or(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0))),
         addr_conn: conf.addr_conn.unwrap_or(IpAddr::V4(Ipv4Addr::new(255, 255, 255, 255))),
-        timeout: conf.timeout.unwrap_or(1200),
+        timeout: conf.timeout.unwrap_or(Duration::from_millis(1200)),
         pgconf: conf.postgresql,
         scyconf: conf.scylla,
         array_truncate: conf.array_truncate.unwrap_or(512),
@@ -143,9 +173,14 @@ pub async fn parse_config(config: PathBuf) -> Result<CaConnectOpts, Error> {
         store_workers_rate: conf.store_workers_rate.unwrap_or(10000),
         insert_frac: conf.insert_frac.unwrap_or(1000),
         use_rate_limit_queue: conf.use_rate_limit_queue.unwrap_or(false),
+        ttl_index: conf.ttl_index.unwrap_or(Duration::from_secs(60 * 60 * 24 * 3)),
+        ttl_d0: conf.ttl_d0.unwrap_or(Duration::from_secs(60 * 60 * 24 * 1)),
+        ttl_d1: conf.ttl_d1.unwrap_or(Duration::from_secs(60 * 60 * 12)),
     })
 }

+// TODO (low-prio) could remove usage of clone to avoid clone of large channel list.
+#[derive(Clone)]
 pub struct CaConnectOpts {
     pub backend: String,
     pub channels: Vec<String>,
@@ -153,7 +188,7 @@ pub struct CaConnectOpts {
     pub search_blacklist: Vec<String>,
     pub addr_bind: IpAddr,
     pub addr_conn: IpAddr,
-    pub timeout: u64,
+    pub timeout: Duration,
     pub pgconf: Database,
     pub scyconf: ScyllaConfig,
     pub array_truncate: usize,
@@ -166,6 +201,9 @@ pub struct CaConnectOpts {
     pub store_workers_rate: u64,
     pub insert_frac: u64,
     pub use_rate_limit_queue: bool,
+    pub ttl_index: Duration,
+    pub ttl_d0: Duration,
+    pub ttl_d1: Duration,
 }

 #[derive(Debug, Clone, Serialize, Deserialize)]
@@ -181,6 +219,7 @@ impl ExtraInsertsConf {
 pub struct IngestCommons {
     pub pgconf: Arc<Database>,
+    pub backend: String,
     pub local_epics_hostname: String,
     pub insert_item_queue: Arc<CommonInsertItemQueue>,
     pub data_store: Arc<DataStore>,
@@ -288,22 +327,14 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> {
     let insert_ivl_min = Arc::new(AtomicU64::new(8800));
     let opts = parse_config(opts.config).await?;
     let scyconf = opts.scyconf.clone();
-    let pgconf = Database {
-        name: opts.pgconf.name.clone(),
-        host: opts.pgconf.host.clone(),
-        port: opts.pgconf.port.clone(),
-        user: opts.pgconf.user.clone(),
-        pass: opts.pgconf.pass.clone(),
-    };
+    let pgconf = opts.pgconf.clone();
     let d = &pgconf;
     let (pg_client, pg_conn) = tokio_postgres::connect(
         &format!("postgresql://{}:{}@{}:{}/{}", d.user, d.pass, d.host, d.port, d.name),
         tokio_postgres::tls::NoTls,
     )
     .await
-    .unwrap();
+    .err_conv()?;
     // TODO allow clean shutdown on ctrl-c and join the pg_conn in the end:
     tokio::spawn(pg_conn);
     let pg_client = Arc::new(pg_client);
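
The `.unwrap()` on the connect future becomes `.err_conv()?`, so a failed PostgreSQL connection now surfaces as an Error instead of a panic. The helper's definition is not part of this diff; a minimal sketch of the usual extension-trait pattern behind such a method (all names here are assumptions):

// Hypothetical sketch of an error-conversion extension trait.
pub struct Error(String);

pub trait ErrConv<T> {
    fn err_conv(self) -> Result<T, Error>;
}

impl<T, E: std::fmt::Display> ErrConv<T> for Result<T, E> {
    fn err_conv(self) -> Result<T, Error> {
        // Wrap the source error's message instead of panicking like unwrap() would.
        self.map_err(|e| Error(e.to_string()))
    }
}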
@@ -339,6 +370,7 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> {
     let ingest_commons = IngestCommons {
         pgconf: Arc::new(pgconf.clone()),
+        backend: opts.backend.clone(),
         local_epics_hostname: opts.local_epics_hostname.clone(),
         insert_item_queue: insert_item_queue.clone(),
         data_store: data_store.clone(),
@@ -361,6 +393,7 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> {
         pg_client.clone(),
         store_stats.clone(),
         opts.use_rate_limit_queue,
+        opts.clone(),
     )
     .await?;
@@ -390,7 +423,12 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> {
         }
         ingest_commons
             .ca_conn_set
-            .add_channel_to_addr(SocketAddr::V4(addr.clone()), ch.clone(), ingest_commons.clone())
+            .add_channel_to_addr(
+                opts.backend.clone(),
+                SocketAddr::V4(addr.clone()),
+                ch.clone(),
+                ingest_commons.clone(),
+            )
             .await?;
     }
     ix += 1;


@@ -369,6 +369,7 @@ pub struct CaConn {
     insert_item_send_fut: Option<async_channel::Send<'static, QueryItem>>,
     fut_get_series:
         FuturesOrdered<Pin<Box<dyn Future<Output = Result<(Cid, u32, u16, u16, Existence<SeriesId>), Error>> + Send>>>,
+    backend: String,
     remote_addr_dbg: SocketAddrV4,
     local_epics_hostname: String,
     array_truncate: usize,
@@ -387,6 +388,7 @@ pub struct CaConn {
 impl CaConn {
     pub fn new(
+        backend: String,
         remote_addr_dbg: SocketAddrV4,
         local_epics_hostname: String,
         data_store: Arc<DataStore>,
@@ -412,6 +414,7 @@ impl CaConn {
             insert_item_sender,
             insert_item_send_fut: None,
             fut_get_series: FuturesOrdered::new(),
+            backend,
             remote_addr_dbg,
             local_epics_hostname,
             array_truncate,
@@ -730,6 +733,8 @@ impl CaConn {
                 // TODO need last-save-ts for this state.
             }
             ChannelState::Created(st) => {
+                // TODO if we don't have a series id yet, don't save? Write amplification.
                 let msp = info_store_msp_from_time(timenow.clone());
                 if msp != st.info_store_msp_last {
                     st.info_store_msp_last = msp;
@@ -1243,7 +1248,7 @@ impl CaConn {
             &*(&self.data_store.chan_reg as &ChannelRegistry as *const ChannelRegistry)
         };
         let fut = z
-            .get_series_id(cd)
+            .get_series_id(cd, self.backend.clone())
             .map_ok(move |series| (cid, k.sid, k.data_type, k.data_count, series));
         // TODO throttle execution rate:
         self.fut_get_series.push_back(Box::pin(fut) as _);
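
For context on `fut_get_series`: FuturesOrdered polls many futures concurrently but yields their outputs in push order, which is what lets the connection match series-id results back to the requests that produced them. A minimal sketch of the pattern (values are illustrative):

use futures_util::stream::{FuturesOrdered, StreamExt};
use std::future::Future;
use std::pin::Pin;

#[tokio::main]
async fn main() {
    let mut q: FuturesOrdered<Pin<Box<dyn Future<Output = u32> + Send>>> = FuturesOrdered::new();
    // Results come back in push_back order even if the futures complete out of order.
    q.push_back(Box::pin(async { 1 }) as _);
    q.push_back(Box::pin(async { 2 }) as _);
    while let Some(v) = q.next().await {
        println!("{v}");
    }
}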


@@ -82,6 +82,7 @@ impl CaConnSet {
     pub async fn create_ca_conn(
         &self,
+        backend: String,
         addr: SocketAddrV4,
         local_epics_hostname: String,
         array_truncate: usize,
@@ -94,6 +95,7 @@ impl CaConnSet {
         info!("create new CaConn {:?}", addr);
         let addr2 = SocketAddr::V4(addr.clone());
         let mut conn = CaConn::new(
+            backend.clone(),
             addr,
             local_epics_hostname,
             data_store.clone(),
@@ -236,6 +238,7 @@ impl CaConnSet {
     pub async fn add_channel_to_addr(
         &self,
+        backend: String,
         addr: SocketAddr,
         channel_name: String,
         ingest_commons: Arc<IngestCommons>,
@@ -263,6 +266,7 @@ impl CaConnSet {
         };
         // TODO use parameters:
         self.create_ca_conn(
+            backend.clone(),
            addr,
            ingest_commons.local_epics_hostname.clone(),
            512,


@@ -47,7 +47,6 @@ async fn resolve_address(addr_str: &str) -> Result<SocketAddr, Error> {
 pub async fn ca_search(opts: ListenFromFileOpts) -> Result<(), Error> {
     info!("ca_search begin");
-    let facility = "scylla";
     let opts = parse_config(opts.config).await?;
     let d = Database {
         name: opts.pgconf.name.clone(),
@@ -192,7 +191,7 @@ pub async fn ca_search(opts: ListenFromFileOpts) -> Result<(), Error> {
     pg_client
         .execute(
             &qu_insert,
-            &[&facility, &item.channel, &queryaddr, &responseaddr, &addr],
+            &[&opts.backend, &item.channel, &queryaddr, &responseaddr, &addr],
         )
         .await
         .unwrap();


@@ -35,8 +35,8 @@ impl ChannelRegistry {
         Self { pg_client }
     }

-    pub async fn get_series_id(&self, cd: ChannelDescDecoded) -> Result<Existence<SeriesId>, Error> {
-        crate::series::get_series_id(&self.pg_client, &cd).await
+    pub async fn get_series_id(&self, cd: ChannelDescDecoded, backend: String) -> Result<Existence<SeriesId>, Error> {
+        crate::series::get_series_id(&self.pg_client, &cd, backend).await
     }
 }


@@ -1,5 +1,5 @@
 use crate::ca::store::DataStore;
-use crate::ca::IngestCommons;
+use crate::ca::{CaConnectOpts, IngestCommons};
 use crate::rt::JoinHandle;
 use crate::store::{CommonInsertItemQueue, IntoSimplerError, QueryItem};
 use err::Error;
@@ -52,6 +52,7 @@ pub async fn spawn_scylla_insert_workers(
     pg_client: Arc<PgClient>,
     store_stats: Arc<stats::CaConnStats>,
     use_rate_limit_queue: bool,
+    opts: CaConnectOpts,
 ) -> Result<Vec<JoinHandle<()>>, Error> {
     let (q2_tx, q2_rx) = async_channel::bounded(insert_item_queue.receiver().capacity().unwrap_or(20000));
     {
@@ -123,9 +124,9 @@ pub async fn spawn_scylla_insert_workers(
             insert_item_queue.receiver()
         };
         let ingest_commons = ingest_commons.clone();
-        let ttl_msp = 60 * 60 * 24 * 4;
-        let ttl_0d = 60 * 60 * 24 * 2;
-        let ttl_1d = 60 * 60 * 12;
+        let ttl_msp = opts.ttl_index;
+        let ttl_0d = opts.ttl_d0;
+        let ttl_1d = opts.ttl_d1;
         let fut = async move {
             let backoff_0 = Duration::from_millis(10);
             let mut backoff = backoff_0.clone();
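
The hunk ends just as the worker future begins; `backoff_0` and `backoff` suggest the common reset-on-success, grow-on-error retry delay. A generic sketch of that pattern, not the file's actual loop (cap and multiplier are assumptions):

use std::time::Duration;

async fn run_with_backoff(mut work: impl FnMut() -> Result<(), ()>) {
    let backoff_0 = Duration::from_millis(10);
    let mut backoff = backoff_0;
    loop {
        match work() {
            // Success: reset the delay to the baseline.
            Ok(()) => backoff = backoff_0,
            // Failure: sleep, then double the delay up to a cap.
            Err(()) => {
                tokio::time::sleep(backoff).await;
                backoff = (backoff * 2).min(Duration::from_secs(5));
            }
        }
    }
}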


@@ -32,7 +32,12 @@ async fn channel_add_inner(params: HashMap<String, String>, ingest_commons: Arc<
         Ok(Some(addr)) => {
             ingest_commons
                 .ca_conn_set
-                .add_channel_to_addr(SocketAddr::V4(addr), name.into(), ingest_commons.clone())
+                .add_channel_to_addr(
+                    ingest_commons.backend.clone(),
+                    SocketAddr::V4(addr),
+                    name.into(),
+                    ingest_commons.clone(),
+                )
                 .await?;
             Ok(())
         }


@@ -36,15 +36,18 @@ impl SeriesId {
 }

 // TODO don't need byte_order or compression from ChannelDescDecoded for channel registration.
-pub async fn get_series_id(pg_client: &PgClient, cd: &ChannelDescDecoded) -> Result<Existence<SeriesId>, Error> {
-    let facility = "scylla";
+pub async fn get_series_id(
+    pg_client: &PgClient,
+    cd: &ChannelDescDecoded,
+    backend: String,
+) -> Result<Existence<SeriesId>, Error> {
     let channel_name = &cd.name;
     let scalar_type = cd.scalar_type.to_scylla_i32();
     let shape = cd.shape.to_scylla_vec();
     let res = pg_client
         .query(
             "select series from series_by_channel where facility = $1 and channel = $2 and scalar_type = $3 and shape_dims = $4 and agg_kind = 0",
-            &[&facility, channel_name, &scalar_type, &shape],
+            &[&backend, channel_name, &scalar_type, &shape],
         )
         .await
         .err_conv()?;
@@ -59,7 +62,7 @@ pub async fn get_series_id(pg_client: &PgClient, cd: &ChannelDescDecoded) -> Res
     if rn == 0 {
         use md5::Digest;
         let mut h = md5::Md5::new();
-        h.update(facility.as_bytes());
+        h.update(backend.as_bytes());
         h.update(channel_name.as_bytes());
         h.update(format!("{:?}", scalar_type).as_bytes());
         h.update(format!("{:?}", shape).as_bytes());
@@ -85,7 +88,7 @@ pub async fn get_series_id(pg_client: &PgClient, cd: &ChannelDescDecoded) -> Res
                     " (series, facility, channel, scalar_type, shape_dims, agg_kind)",
                     " values ($1, $2, $3, $4, $5, 0) on conflict do nothing"
                 ),
-                &[&(series as i64), &facility, channel_name, &scalar_type, &shape],
+                &[&(series as i64), &backend, channel_name, &scalar_type, &shape],
             )
             .await
            .unwrap();
@@ -94,17 +97,17 @@ pub async fn get_series_id(pg_client: &PgClient, cd: &ChannelDescDecoded) -> Res
                 return Ok(series);
             } else {
                 warn!(
-                    "tried to insert {series:?} for {facility} {channel_name} {scalar_type:?} {shape:?} trying again..."
+                    "tried to insert {series:?} for {backend:?} {channel_name:?} {scalar_type:?} {shape:?} trying again..."
                 );
             }
             tokio::time::sleep(Duration::from_millis(20)).await;
         }
-        error!("tried to insert new series id for {facility} {channel_name} {scalar_type:?} {shape:?} but failed");
-        Err(Error::with_msg_no_trace(format!("get_series_id can not create and insert series id {facility:?} {channel_name:?} {scalar_type:?} {shape:?}")))
+        error!("tried to insert new series id for {backend:?} {channel_name:?} {scalar_type:?} {shape:?} but failed");
+        Err(Error::with_msg_no_trace(format!("get_series_id can not create and insert series id {backend:?} {channel_name:?} {scalar_type:?} {shape:?}")))
     } else {
         let series = all[0] as u64;
         let series = Existence::Existing(SeriesId(series));
-        debug!("get_series_id {facility:?} {channel_name:?} {scalar_type:?} {shape:?} {series:?}");
+        debug!("get_series_id {backend:?} {channel_name:?} {scalar_type:?} {shape:?} {series:?}");
         Ok(series)
     }
 }
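
How the candidate series id is derived is not visible in this hunk, but the hashing above suggests it comes from the MD5 digest over (backend, channel, scalar_type, shape); since the code retries after a conflict, the real input is presumably perturbed per attempt. A sketch of the digest-to-u64 step under those assumptions (the byte order is a guess):

use md5::{Digest, Md5};

fn derive_series_id(backend: &str, channel_name: &str, scalar_type: i32, shape: &[i32]) -> u64 {
    let mut h = Md5::new();
    h.update(backend.as_bytes());
    h.update(channel_name.as_bytes());
    h.update(format!("{:?}", scalar_type).as_bytes());
    h.update(format!("{:?}", shape).as_bytes());
    let digest = h.finalize();
    // Truncate the 16-byte digest to its first 8 bytes, read as a little-endian u64.
    u64::from_le_bytes(digest[..8].try_into().unwrap())
}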


@@ -13,7 +13,7 @@ use stats::CaConnStats;
 use std::net::SocketAddrV4;
 use std::pin::Pin;
 use std::task::{Context, Poll};
-use std::time::{Instant, SystemTime};
+use std::time::{Duration, Instant, SystemTime};
 use tokio::sync::Mutex as TokMx;

 pub const CONNECTION_STATUS_DIV: u64 = netpod::timeunits::DAY;
@@ -338,14 +338,14 @@ where
 pub async fn insert_item(
     item: InsertItem,
-    ttl_msp: u32,
-    ttl_0d: u32,
-    ttl_1d: u32,
+    ttl_index: Duration,
+    ttl_0d: Duration,
+    ttl_1d: Duration,
     data_store: &DataStore,
     stats: &CaConnStats,
 ) -> Result<(), Error> {
     if item.msp_bump {
-        let params = (item.series.id() as i64, item.ts_msp as i64, ttl_msp as i32);
+        let params = (item.series.id() as i64, item.ts_msp as i64, ttl_index.as_secs() as i32);
         data_store.scy.execute(&data_store.qu_insert_ts_msp, params).await?;
         stats.inserts_msp_inc();
     }
@@ -371,7 +371,7 @@ pub async fn insert_item(
             ts_msp: item.ts_msp,
             ts_lsp: item.ts_lsp,
             pulse: item.pulse,
-            ttl: ttl_0d,
+            ttl: ttl_0d.as_secs() as _,
         };
         use CaDataScalarValue::*;
         match val {
@@ -390,7 +390,7 @@ pub async fn insert_item(
             ts_msp: item.ts_msp,
             ts_lsp: item.ts_lsp,
             pulse: item.pulse,
-            ttl: ttl_1d,
+            ttl: ttl_1d.as_secs() as _,
         };
         use CaDataArrayValue::*;
         match val {
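
The prepared statements themselves are outside this hunk; the `as_secs() as i32` conversions indicate they carry a `USING TTL ?` clause, since Scylla/CQL expresses TTL as an integer number of seconds. A sketch under that assumption (table and statement text are hypothetical, not the real qu_insert_ts_msp):

use scylla::Session;
use std::time::Duration;

async fn insert_ts_msp(scy: &Session, series: i64, ts_msp: i64, ttl: Duration) -> Result<(), Box<dyn std::error::Error>> {
    let qu = scy
        .prepare("insert into ts_msp (series, ts_msp) values (?, ?) using ttl ?")
        .await?;
    // CQL takes TTL in whole seconds, so sub-second precision is dropped here.
    scy.execute(&qu, (series, ts_msp, ttl.as_secs() as i32)).await?;
    Ok(())
}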


@@ -94,6 +94,7 @@ pub struct CommonQueries {
 #[derive(Clone)]
 pub struct ZmtpClientOpts {
+    pub backend: String,
     pub scylla: Vec<String>,
     pub sources: Vec<String>,
     pub do_pulse_id: bool,
@@ -375,7 +376,7 @@ impl BsreadClient {
         self.scy
             .query(
                 "insert into series_by_channel (facility, channel_name, series, scalar_type, shape_dims) values (?, ?, ?, ?, ?) if not exists",
-                ("scylla", &cd.name, series as i64, cd.scalar_type.to_scylla_i32(), &shape_dims),
+                (&self.opts.backend, &cd.name, series as i64, cd.scalar_type.to_scylla_i32(), &shape_dims),
             )
             .await
             .err_conv()?;
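
The `if not exists` clause makes this insert a lightweight transaction: it is applied only when no row with that key exists, and the result carries a boolean "[applied]" column. A sketch of checking that flag (the caller's handling is not shown in the diff; the placeholder column values here are illustrative):

use scylla::frame::response::result::CqlValue;
use scylla::Session;

async fn register_series(scy: &Session, backend: &str, name: &str, series: i64) -> Result<bool, Box<dyn std::error::Error>> {
    let qres = scy
        .query(
            "insert into series_by_channel (facility, channel_name, series, scalar_type, shape_dims) values (?, ?, ?, ?, ?) if not exists",
            (backend, name, series, 0i32, vec![0i32]),
        )
        .await?;
    // For LWT statements the first result column is the "[applied]" boolean.
    let applied = qres
        .rows
        .as_ref()
        .and_then(|rows| rows.first())
        .map(|row| matches!(row.columns.first(), Some(Some(CqlValue::Boolean(true)))))
        .unwrap_or(false);
    Ok(applied)
}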