Use ingest time as primary time and IOC time as alternative

This commit is contained in:
Dominik Werder
2024-05-14 15:17:32 +02:00
parent 0477504628
commit aa71f89f3c
11 changed files with 191 additions and 100 deletions

View File

@@ -89,7 +89,7 @@ async fn main_run_inner(opts: DaqIngestOpts) -> Result<(), Error> {
netfetch::ca::search::ca_search(conf, &channels).await?
}
ChannelAccess::CaIngest(k) => {
info!("daqingest version {} +0001", clap::crate_version!());
info!("daqingest version {} +0003", clap::crate_version!());
let (conf, channels_config) = parse_config(k.config.into()).await?;
daqingest::daemon::run(conf, channels_config).await?
}

View File

@@ -657,6 +657,29 @@ impl Daemon {
}
}
}
debug!("wait for metrics handler");
self.metrics_shutdown_tx.send(1).await?;
if let Some(jh) = self.metrics_jh.take() {
jh.await??;
}
debug!("joined metrics handler");
debug!("wait for postingest task");
match worker_jh.await? {
Ok(_) => {}
Err(e) => match e {
netfetch::metrics::postingest::Error::Msg => {
error!("{e}");
}
netfetch::metrics::postingest::Error::SeriesWriter(_) => {
error!("{e}");
}
netfetch::metrics::postingest::Error::SendError => {
error!("join postingest in better way");
}
},
}
debug!("joined postingest task");
debug!("wait for insert workers");
while let Some(jh) = self.insert_workers_jh.pop() {
match jh.await.map_err(Error::from_string) {
Ok(x) => match x {
@@ -675,15 +698,7 @@ impl Daemon {
}
}
}
debug!("wait for metrics handler");
self.metrics_shutdown_tx.send(1).await?;
if let Some(jh) = self.metrics_jh.take() {
jh.await??;
}
debug!("joined metrics handler");
debug!("wait for postingest task");
worker_jh.await?.map_err(|e| Error::from_string(e))?;
debug!("joined postingest task");
debug!("joined insert workers");
Ok(())
}
}

View File

@@ -76,7 +76,6 @@ const IOC_PING_IVL: Duration = Duration::from_millis(1000 * 80);
const DO_RATE_CHECK: bool = false;
const MONITOR_POLL_TIMEOUT: Duration = Duration::from_millis(6000);
const TIMEOUT_CHANNEL_CLOSING: Duration = Duration::from_millis(8000);
const TIMEOUT_MONITOR_PASSIVE: Duration = Duration::from_millis(1000 * 68);
const TIMEOUT_PONG_WAIT: Duration = Duration::from_millis(10000);
#[allow(unused)]
@@ -115,6 +114,15 @@ macro_rules! trace_flush_queue {
};
}
#[allow(unused)]
// Compile-time-disabled trace macro for incoming CA events: the arguments are
// still type-checked by the compiler, but the `if false` guard means no log
// output (and no formatting cost) at runtime. Flip the literal to `true` to
// enable verbose per-event tracing during debugging.
macro_rules! trace_event_incoming {
    ($($arg:tt)*) => {
        if false {
            trace!($($arg)*);
        }
    };
}
#[derive(Debug, ThisError)]
pub enum Error {
NoProtocol,
@@ -1591,7 +1599,13 @@ impl CaConn {
stnow: SystemTime,
stats: &CaConnStats,
) -> Result<(), Error> {
// debug!("event_add_ingest payload_len {} value {:?}", payload_len, value);
trace_event_incoming!(
"event_add_ingest payload_len {} value {:?} {} {}",
payload_len,
value,
value.status,
value.severity
);
crst.ts_alive_last = tsnow;
crst.ts_activity_last = tsnow;
crst.item_recv_ivl_ema.tick(tsnow);
@@ -1778,8 +1792,8 @@ impl CaConn {
ReadingState::EnableMonitoring(_) => {}
ReadingState::Monitoring(st3) => match &st3.mon2state {
Monitoring2State::Passive(st4) => {
if st4.tsbeg + TIMEOUT_MONITOR_PASSIVE < tsnow {
trace2!("check_channels_state_poll Monitoring2State::Passive timeout");
if st4.tsbeg + conf.conf.manual_poll_on_quiet_after() < tsnow {
debug!("check_channels_state_poll Monitoring2State::Passive timeout");
// TODO encapsulate and unify with Polling handler
let ioid = Ioid(self.ioid);
self.ioid = self.ioid.wrapping_add(1);

View File

@@ -555,8 +555,8 @@ impl ChannelConfig {
/// Only used when in monitoring mode. If we do not see activity for this Duration then
/// we issue a manual read to see if the channel is alive.
pub fn manual_poll_on_quiet(&self) -> Duration {
Duration::from_secs(120)
pub fn manual_poll_on_quiet_after(&self) -> Duration {
Duration::from_secs(300)
}
pub fn expect_activity_within(&self) -> Duration {
@@ -564,22 +564,22 @@ impl ChannelConfig {
// It would be anyway invalid to be polled and specify a monitor record policy.
match self.arch.short_term {
Some(ChannelReadConfig::Poll(x)) => x,
Some(ChannelReadConfig::Monitor) => self.manual_poll_on_quiet(),
Some(ChannelReadConfig::Monitor) => self.manual_poll_on_quiet_after(),
None => match self.arch.medium_term {
Some(ChannelReadConfig::Poll(x)) => x,
Some(ChannelReadConfig::Monitor) => self.manual_poll_on_quiet(),
Some(ChannelReadConfig::Monitor) => self.manual_poll_on_quiet_after(),
None => match self.arch.long_term {
Some(ChannelReadConfig::Poll(x)) => x,
Some(ChannelReadConfig::Monitor) => self.manual_poll_on_quiet(),
Some(ChannelReadConfig::Monitor) => self.manual_poll_on_quiet_after(),
None => {
// This is an invalid configuration, so just a fallback
self.manual_poll_on_quiet()
self.manual_poll_on_quiet_after()
}
},
},
}
} else {
self.manual_poll_on_quiet()
self.manual_poll_on_quiet_after()
};
dur + Duration::from_millis(1000 * 10)
}

View File

@@ -225,9 +225,9 @@ async fn worker(
}
},
QueryItem::Insert(item) => {
let item_ts_local = item.ts_local;
let tsnow = TsMs::from_system_time(SystemTime::now());
let dt = tsnow.to_u64().saturating_sub(item_ts_local.to_u64()) as u32;
let item_ts_net = item.ts_net.clone();
let dt = tsnow.to_u64().saturating_sub(item_ts_net.to_u64()) as u32;
stats.item_lat_net_worker().ingest(dt);
let insert_frac = insert_worker_opts.insert_frac.load(Ordering::Acquire);
let do_insert = i1 % 1000 < insert_frac;
@@ -235,7 +235,7 @@ async fn worker(
Ok(_) => {
stats.inserted_values().inc();
let tsnow = TsMs::from_system_time(SystemTime::now());
let dt = tsnow.to_u64().saturating_sub(item_ts_local.to_u64()) as u32;
let dt = tsnow.to_u64().saturating_sub(item_ts_net.to_u64()) as u32;
stats.item_lat_net_store().ingest(dt);
backoff = backoff_0;
}
@@ -400,8 +400,8 @@ fn prepare_query_insert_futs(
tsnow: TsMs,
) -> SmallVec<[InsertFut; 4]> {
stats.inserts_value().inc();
let item_ts_local = item.ts_local;
let dt = tsnow.to_u64().saturating_sub(item_ts_local.to_u64()) as u32;
let item_ts_net = item.ts_net;
let dt = tsnow.to_u64().saturating_sub(item_ts_net.to_u64()) as u32;
stats.item_lat_net_worker().ingest(dt);
let msp_bump = item.msp_bump;
let series = item.series.clone();
@@ -416,7 +416,7 @@ fn prepare_query_insert_futs(
let fut = insert_msp_fut(
series,
ts_msp,
item_ts_local,
item_ts_net,
data_store.scy.clone(),
data_store.qu_insert_ts_msp.clone(),
stats.clone(),

View File

@@ -13,6 +13,7 @@ use netpod::DtNano;
use netpod::ScalarType;
use netpod::Shape;
use netpod::TsMs;
use netpod::TsNano;
use scylla::frame::value::Value;
use scylla::frame::value::ValueList;
use scylla::prepared_statement::PreparedStatement;
@@ -506,7 +507,8 @@ pub struct InsertItem {
pub scalar_type: ScalarType,
pub shape: Shape,
pub val: DataValue,
pub ts_local: TsMs,
pub ts_net: TsMs,
pub ts_alt_1: TsNano,
}
impl InsertItem {
@@ -556,7 +558,8 @@ struct InsParCom {
series: SeriesId,
ts_msp: TsMs,
ts_lsp: DtNano,
ts_local: TsMs,
ts_net: TsMs,
ts_alt_1: TsNano,
pulse: u64,
do_insert: bool,
stats: Arc<InsertWorkerStats>,
@@ -570,10 +573,11 @@ where
par.series.to_i64(),
par.ts_msp.to_i64(),
par.ts_lsp.to_i64(),
par.ts_alt_1.ns() as i64,
par.pulse as i64,
val,
);
InsertFut::new(scy, qu, params, par.ts_local, par.stats)
InsertFut::new(scy, qu, params, par.ts_net, par.stats)
}
// val: Vec<ST> where ST: Value + SerializeCql + Send + 'static,
@@ -582,10 +586,11 @@ fn insert_array_gen_fut(par: InsParCom, val: Vec<u8>, qu: Arc<PreparedStatement>
par.series.to_i64(),
par.ts_msp.to_i64(),
par.ts_lsp.to_i64(),
par.ts_alt_1.ns() as i64,
par.pulse as i64,
val,
);
InsertFut::new(scy, qu, params, par.ts_local, par.stats)
InsertFut::new(scy, qu, params, par.ts_net, par.stats)
}
#[pin_project::pin_project]
@@ -654,6 +659,7 @@ where
par.series.to_i64(),
par.ts_msp.to_i64(),
par.ts_lsp.to_i64(),
par.ts_alt_1.ns() as i64,
par.pulse as i64,
val,
);
@@ -690,6 +696,7 @@ where
par.series.to_i64(),
par.ts_msp.to_i64(),
par.ts_lsp.to_i64(),
par.ts_alt_1.ns() as i64,
par.pulse as i64,
val,
);
@@ -730,7 +737,8 @@ pub async fn insert_item(
series: item.series,
ts_msp: item.ts_msp,
ts_lsp: item.ts_lsp,
ts_local: item.ts_local,
ts_net: item.ts_net,
ts_alt_1: item.ts_alt_1,
pulse: item.pulse,
do_insert,
stats: stats.clone(),
@@ -753,7 +761,8 @@ pub async fn insert_item(
series: item.series,
ts_msp: item.ts_msp,
ts_lsp: item.ts_lsp,
ts_local: item.ts_local,
ts_net: item.ts_net,
ts_alt_1: item.ts_alt_1,
pulse: item.pulse,
do_insert,
stats: stats.clone(),
@@ -801,7 +810,8 @@ pub fn insert_item_fut(
series: item.series,
ts_msp: item.ts_msp,
ts_lsp: item.ts_lsp,
ts_local: item.ts_local,
ts_net: item.ts_net,
ts_alt_1: item.ts_alt_1,
pulse: item.pulse,
do_insert,
stats: stats.clone(),
@@ -824,7 +834,8 @@ pub fn insert_item_fut(
series: item.series,
ts_msp: item.ts_msp,
ts_lsp: item.ts_lsp,
ts_local: item.ts_local,
ts_net: item.ts_net,
ts_alt_1: item.ts_alt_1,
pulse: item.pulse,
do_insert,
stats: stats.clone(),

View File

@@ -397,6 +397,7 @@ async fn check_event_tables(keyspace: &str, rett: RetentionTime, scy: &ScySessio
("ts_lsp", "bigint"),
("pulse", "bigint"),
("value", cqlsty),
("ts_alt_1", "bigint"),
],
["series", "ts_msp"],
["ts_lsp"],
@@ -416,6 +417,7 @@ async fn check_event_tables(keyspace: &str, rett: RetentionTime, scy: &ScySessio
("pulse", "bigint"),
("value", &format!("frozen<list<{}>>", cqlsty)),
("valueblob", "blob"),
("ts_alt_1", "bigint"),
],
["series", "ts_msp"],
["ts_lsp"],

View File

@@ -47,8 +47,8 @@ macro_rules! prep_qu_ins_a {
($id1:expr, $rett:expr, $scy:expr) => {{
let cql = format!(
concat!(
"insert into {}{} (series, ts_msp, ts_lsp, pulse, value)",
" values (?, ?, ?, ?, ?)"
"insert into {}{} (series, ts_msp, ts_lsp, ts_alt_1, pulse, value)",
" values (?, ?, ?, ?, ?, ?)"
),
$rett.table_prefix(),
$id1
@@ -62,8 +62,8 @@ macro_rules! prep_qu_ins_b {
($id1:expr, $rett:expr, $scy:expr) => {{
let cql = format!(
concat!(
"insert into {}{} (series, ts_msp, ts_lsp, pulse, valueblob)",
" values (?, ?, ?, ?, ?)"
"insert into {}{} (series, ts_msp, ts_lsp, ts_alt_1, pulse, valueblob)",
" values (?, ?, ?, ?, ?, ?)"
),
$rett.table_prefix(),
$id1

View File

@@ -1,3 +1,4 @@
use core::fmt;
use serde::Deserialize;
use serde::Serialize;
@@ -34,6 +35,12 @@ impl SeriesId {
}
}
// Human-readable form for log lines. The inner numeric id is rendered
// right-aligned in a 20-character field so series ids line up in columns.
impl fmt::Display for SeriesId {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let id = self.0;
        write!(f, "SeriesId {{ {id:20} }}")
    }
}
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct ChannelStatusSeriesId(u64);

View File

@@ -3,17 +3,29 @@ use async_channel::Sender;
use dbpg::seriesbychannel::ChannelInfoQuery;
use err::thiserror;
use err::ThisError;
use netpod::log::*;
use netpod::ScalarType;
use netpod::SeriesKind;
use netpod::Shape;
use netpod::TsNano;
use scywr::insertqueues::InsertDeques;
use scywr::iteminsertqueue::DataValue;
use scywr::iteminsertqueue::QueryItem;
use series::ChannelStatusSeriesId;
use series::SeriesId;
use std::collections::VecDeque;
use std::time::Duration;
use std::time::SystemTime;
#[allow(unused)]
// Compile-time-disabled trace macro for the retention-tier write decision:
// arguments are type-checked but the `if false` guard suppresses all output
// and formatting work at runtime. Flip the literal to `true` to trace why
// individual samples are accepted or rejected for insertion.
macro_rules! trace_rt_decision {
    ($($arg:tt)*) => {
        if false {
            trace!($($arg)*);
        }
    };
}
#[derive(Debug, ThisError)]
pub enum Error {
SeriesLookupError,
@@ -108,65 +120,85 @@ impl RtWriter {
pub fn write(
&mut self,
ts: TsNano,
ts_ioc: TsNano,
ts_local: TsNano,
val: DataValue,
iqdqs: &mut InsertDeques,
) -> Result<(), Error> {
let sid = self.sid;
Self::write_inner(
"ST",
self.min_quiets.st,
&mut self.state_st,
&mut iqdqs.st_rf3_rx,
ts_ioc,
ts_local,
val.clone(),
sid,
)?;
Self::write_inner(
"MT",
self.min_quiets.mt,
&mut self.state_mt,
&mut iqdqs.mt_rf3_rx,
ts_ioc,
ts_local,
val.clone(),
sid,
)?;
Self::write_inner(
"LT",
self.min_quiets.lt,
&mut self.state_lt,
&mut iqdqs.lt_rf3_rx,
ts_ioc,
ts_local,
val.clone(),
sid,
)?;
Ok(())
}
fn write_inner(
rt: &str,
min_quiet: Duration,
state: &mut State,
deque: &mut VecDeque<QueryItem>,
ts_ioc: TsNano,
ts_local: TsNano,
val: DataValue,
sid: SeriesId,
) -> Result<(), Error> {
// Decide whether we want to write.
{
let min_quiet = self.min_quiets.st;
let deque = &mut iqdqs.st_rf3_rx;
if self.state_st.last_ins.as_ref().map_or(true, |x| {
if x.0 >= ts_local {
// bad clock, ignore.
// TODO count in stats.
false
} else if ts_local.ms() - x.0.ms() < 1000 * min_quiet.as_secs() {
false
} else {
val != x.1
}
}) {
self.state_st.last_ins = Some((ts, val.clone()));
self.state_st.writer.write(ts, ts_local, val.clone(), deque)?;
}
}
{
let min_quiet = self.min_quiets.mt;
let deque = &mut iqdqs.mt_rf3_rx;
if self.state_mt.last_ins.as_ref().map_or(true, |x| {
if x.0 >= ts_local {
// bad clock, ignore.
// TODO count in stats.
false
} else if ts_local.ms() - x.0.ms() < 1000 * min_quiet.as_secs() {
false
} else {
val != x.1
}
}) {
self.state_mt.last_ins = Some((ts, val.clone()));
self.state_mt.writer.write(ts, ts_local, val.clone(), deque)?;
}
}
{
let min_quiet = self.min_quiets.lt;
let deque = &mut iqdqs.lt_rf3_rx;
if self.state_lt.last_ins.as_ref().map_or(true, |x| {
if x.0 >= ts_local {
// bad clock, ignore.
// TODO count in stats.
false
} else if ts_local.ms() - x.0.ms() < 1000 * min_quiet.as_secs() {
false
} else {
val != x.1
}
}) {
self.state_lt.last_ins = Some((ts, val.clone()));
self.state_lt.writer.write(ts, ts_local, val.clone(), deque)?;
// Use the IOC time for the decision whether to write.
// But use the ingest local time as the primary index.
let do_write = if let Some(last) = &state.last_ins {
if ts_ioc == last.ts_ioc {
trace_rt_decision!("{rt} {sid} ignore, because same IOC time {ts_ioc:?} {ts_local:?}");
false
} else if ts_local < last.ts_local {
trace_rt_decision!("{rt} {sid} ignore, because ts_local rewind {ts_ioc:?} {ts_local:?}");
false
} else if ts_local.ms() - last.ts_local.ms() < 1000 * min_quiet.as_secs() {
trace_rt_decision!("{rt} {sid} ignore, because not min quiet");
false
} else if val == last.val {
trace_rt_decision!("{rt} {sid} ignore, because value did not change");
false
} else {
trace_rt_decision!("{rt} {sid} accept");
true
}
} else {
true
};
if do_write {
state.last_ins = Some(LastIns {
ts_local,
ts_ioc,
val: val.clone(),
});
state.writer.write(ts_ioc, ts_local, val.clone(), deque)?;
}
Ok(())
}
@@ -179,8 +211,15 @@ impl RtWriter {
}
}
#[derive(Debug)]
// Snapshot of the most recently accepted insert for one retention tier.
// Used by the write decision: the IOC time deduplicates events, the local
// ingest time serves as the primary time index, and the value is kept so
// inserts can be skipped when the reading has not changed.
struct LastIns {
    // Local ingest time of the last accepted sample; primary time index.
    ts_local: TsNano,
    // Timestamp reported by the IOC; used to detect duplicate events.
    ts_ioc: TsNano,
    // Last stored value; compared against new readings to suppress repeats.
    val: DataValue,
}
#[derive(Debug)]
struct State {
writer: SeriesWriter,
last_ins: Option<(TsNano, DataValue)>,
last_ins: Option<LastIns>,
}

View File

@@ -149,14 +149,16 @@ impl SeriesWriter {
pub fn write(
&mut self,
ts: TsNano,
ts_ioc: TsNano,
ts_local: TsNano,
val: DataValue,
deque: &mut VecDeque<QueryItem>,
) -> Result<(), Error> {
let ts_main = ts_local;
// TODO compute the binned data here as well and flush completed bins if needed.
if let Some(binner) = self.binner.as_mut() {
binner.push(ts.clone(), &val)?;
binner.push(ts_main.clone(), &val)?;
}
// TODO decide on better msp/lsp: random offset!
@@ -169,9 +171,9 @@ impl SeriesWriter {
Some(ts_msp_last) => {
if self.inserted_in_current_msp >= self.msp_max_entries
|| self.bytes_in_current_msp >= self.msp_max_bytes
|| ts_msp_last.add_ns(HOUR) <= ts
|| ts_msp_last.add_ns(HOUR) <= ts_main
{
let ts_msp = ts.div(msp_res_max).mul(msp_res_max);
let ts_msp = ts_main.div(msp_res_max).mul(msp_res_max);
if ts_msp == ts_msp_last {
(ts_msp, false)
} else {
@@ -187,24 +189,25 @@ impl SeriesWriter {
}
}
None => {
let ts_msp = ts.div(msp_res_max).mul(msp_res_max);
let ts_msp = ts_main.div(msp_res_max).mul(msp_res_max);
self.ts_msp_last = Some(ts_msp);
self.inserted_in_current_msp = 1;
self.bytes_in_current_msp = val.byte_size();
(ts_msp, true)
}
};
let ts_lsp = ts.delta(ts_msp);
let ts_lsp = ts_main.delta(ts_msp);
let item = InsertItem {
series: self.sid.clone(),
ts_msp: ts_msp.to_ts_ms(),
ts_lsp,
ts_net: ts_local.to_ts_ms(),
ts_alt_1: ts_ioc,
msp_bump: ts_msp_changed,
pulse: 0,
scalar_type: self.scalar_type.clone(),
shape: self.shape.clone(),
val,
ts_local: ts_local.to_ts_ms(),
};
// TODO decide on the path in the new deques struct
deque.push_back(QueryItem::Insert(item));