Check for repeated value already in RtWriter
This commit is contained in:
@@ -67,10 +67,7 @@ async fn main_run_inner(opts: DaqIngestOpts) -> Result<(), Error> {
|
||||
pass: k.pg_pass,
|
||||
name: k.pg_name,
|
||||
};
|
||||
let scyconf = ScyllaIngestConfig::new([k.scylla_host], k.scylla_keyspace, 3);
|
||||
// scywr::schema::migrate_scylla_data_schema(&scyconf, netpod::ttl::RetentionTime::Short)
|
||||
// .await
|
||||
// .map_err(Error::from_string)?;
|
||||
let scyconf = ScyllaIngestConfig::new([k.scylla_host], k.scylla_keyspace);
|
||||
match k.sub {
|
||||
DbSub::Data(u) => {
|
||||
use daqingest::opts::DbDataSub;
|
||||
|
||||
@@ -27,6 +27,7 @@ axum = "0.8.1"
|
||||
http-body = "1"
|
||||
url = "2.5"
|
||||
chrono = "0.4"
|
||||
time = { version = "0.3.40", features = ["serde"] }
|
||||
humantime = "2.1.0"
|
||||
humantime-serde = "1.1.1"
|
||||
pin-project = "1"
|
||||
|
||||
@@ -79,6 +79,7 @@ use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
use std::time::SystemTime;
|
||||
use taskrun::tokio;
|
||||
use time::UtcDateTime;
|
||||
use tokio::net::TcpStream;
|
||||
|
||||
const CONNECTING_TIMEOUT: Duration = Duration::from_millis(1000 * 6);
|
||||
@@ -188,6 +189,7 @@ pub struct ChannelStateInfo {
|
||||
pub write_mt_last: SystemTime,
|
||||
pub write_lt_last: SystemTime,
|
||||
pub status_emit_count: u64,
|
||||
pub last_comparisons: Option<VecDeque<(UtcDateTime, MonitorReadCmp)>>,
|
||||
}
|
||||
|
||||
mod ser_instant {
|
||||
@@ -307,12 +309,21 @@ enum Monitoring2State {
|
||||
Passive(Monitoring2PassiveState),
|
||||
ReadPending(Ioid, Instant),
|
||||
}
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
enum MonitorReadCmp {
|
||||
Equal,
|
||||
DiffTime,
|
||||
DiffTimeValue,
|
||||
DiffValue,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
struct MonitoringState {
|
||||
tsbeg: Instant,
|
||||
subid: Subid,
|
||||
mon2state: Monitoring2State,
|
||||
monitoring_event_last: Option<proto::EventAddRes>,
|
||||
last_comparisons: VecDeque<(time::UtcDateTime, MonitorReadCmp)>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
@@ -606,6 +617,13 @@ impl ChannelState {
|
||||
ChannelState::Writable(s) => s.channel.status_emit_count,
|
||||
_ => 0,
|
||||
};
|
||||
let last_comparisons = match self {
|
||||
ChannelState::Writable(s) => match &s.reading {
|
||||
ReadingState::Monitoring(st2) => Some(st2.last_comparisons.clone()),
|
||||
_ => None,
|
||||
},
|
||||
_ => None,
|
||||
};
|
||||
ChannelStateInfo {
|
||||
stnow,
|
||||
cssid,
|
||||
@@ -628,6 +646,7 @@ impl ChannelState {
|
||||
write_mt_last,
|
||||
write_lt_last,
|
||||
status_emit_count,
|
||||
last_comparisons,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1784,6 +1803,8 @@ impl CaConn {
|
||||
tsbeg: tsnow,
|
||||
ts_silence_read_next: tsnow + Self::silence_read_next_ivl_rng(&mut self.rng),
|
||||
}),
|
||||
monitoring_event_last: Some(ev.clone()),
|
||||
last_comparisons: VecDeque::new(),
|
||||
});
|
||||
let crst = &mut st.channel;
|
||||
let writer = &mut st.writer;
|
||||
@@ -1821,6 +1842,7 @@ impl CaConn {
|
||||
let binwriter = &mut st.binwriter;
|
||||
let iqdqs = &mut self.iqdqs;
|
||||
let stats = self.stats.as_ref();
|
||||
st2.monitoring_event_last = Some(ev.clone());
|
||||
Self::event_add_ingest(
|
||||
ev.payload_len,
|
||||
ev.value,
|
||||
@@ -2060,6 +2082,27 @@ impl CaConn {
|
||||
}
|
||||
let iqdqs = &mut self.iqdqs;
|
||||
let stats = self.stats.as_ref();
|
||||
// NOTE we do not update the last value in this ev handler.
|
||||
{
|
||||
if let Some(lst) = st2.monitoring_event_last.as_ref() {
|
||||
// TODO compare with last monitoring value
|
||||
if ev.value.data == lst.value.data {
|
||||
if ev.value.meta == lst.value.meta {
|
||||
st2.last_comparisons
|
||||
.push_back((UtcDateTime::now(), MonitorReadCmp::Equal));
|
||||
} else {
|
||||
st2.last_comparisons
|
||||
.push_back((UtcDateTime::now(), MonitorReadCmp::DiffTime));
|
||||
}
|
||||
} else {
|
||||
st2.last_comparisons
|
||||
.push_back((UtcDateTime::now(), MonitorReadCmp::DiffValue));
|
||||
}
|
||||
}
|
||||
while st2.last_comparisons.len() > 6 {
|
||||
st2.last_comparisons.pop_front();
|
||||
}
|
||||
}
|
||||
// TODO check ADEL to see if monitor should have fired.
|
||||
// But there is still a small chance that the monitor will just received slightly later.
|
||||
// More involved check would be to raise a flag, wait for the expected monitor for some
|
||||
|
||||
@@ -1055,6 +1055,10 @@ impl CaMsg {
|
||||
}
|
||||
// TODO make response type for host name:
|
||||
0x15 => CaMsg::from_ty_ts(CaMsgTy::HostName("TODOx5288".into()), tsnow),
|
||||
0x1b => {
|
||||
warn!("HANDLE_SERVER_CHANNEL_DISCONNECT");
|
||||
return Err(Error::CaCommandNotSupported(x));
|
||||
}
|
||||
x => return Err(Error::CaCommandNotSupported(x)),
|
||||
};
|
||||
Ok(msg)
|
||||
|
||||
@@ -4,11 +4,10 @@ use serde::Deserialize;
|
||||
pub struct ScyllaIngestConfig {
|
||||
hosts: Vec<String>,
|
||||
keyspace: String,
|
||||
rf: u8,
|
||||
}
|
||||
|
||||
impl ScyllaIngestConfig {
|
||||
pub fn new<I, H, K1>(hosts: I, ks: K1, rf: u8) -> Self
|
||||
pub fn new<I, H, K1>(hosts: I, ks: K1) -> Self
|
||||
where
|
||||
I: IntoIterator<Item = H>,
|
||||
H: Into<String>,
|
||||
@@ -17,7 +16,6 @@ impl ScyllaIngestConfig {
|
||||
Self {
|
||||
hosts: hosts.into_iter().map(Into::into).collect(),
|
||||
keyspace: ks.into(),
|
||||
rf,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -28,8 +26,4 @@ impl ScyllaIngestConfig {
|
||||
pub fn keyspace(&self) -> &String {
|
||||
&self.keyspace
|
||||
}
|
||||
|
||||
pub fn rf(&self) -> u8 {
|
||||
self.rf
|
||||
}
|
||||
}
|
||||
|
||||
@@ -54,7 +54,7 @@ impl Changeset {
|
||||
|
||||
fn log_statements(&self) {
|
||||
for q in &self.todo {
|
||||
info!("WOULD DO {q}");
|
||||
info!("would execute:\n{q}\n");
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -70,10 +70,9 @@ pub async fn has_keyspace(name: &str, scy: &ScySession) -> Result<bool, Error> {
|
||||
Ok(false)
|
||||
}
|
||||
|
||||
pub async fn has_table(name: &str, scy: &ScySession) -> Result<bool, Error> {
|
||||
pub async fn has_table(ks: &str, name: &str, scy: &ScySession) -> Result<bool, Error> {
|
||||
let cql = "select table_name from system_schema.tables where keyspace_name = ?";
|
||||
let ks = scy.get_keyspace().ok_or_else(|| Error::NoKeyspaceChosen)?;
|
||||
let mut res = scy.query_iter(cql, (ks.as_ref(),)).await?.rows_stream::<(String,)>()?;
|
||||
let mut res = scy.query_iter(cql, (ks,)).await?.rows_stream::<(String,)>()?;
|
||||
while let Some((table_name,)) = res.try_next().await? {
|
||||
if table_name == name {
|
||||
return Ok(true);
|
||||
@@ -82,9 +81,12 @@ pub async fn has_table(name: &str, scy: &ScySession) -> Result<bool, Error> {
|
||||
Ok(false)
|
||||
}
|
||||
|
||||
pub async fn check_table_readable(name: &str, scy: &ScySession) -> Result<bool, Error> {
|
||||
pub async fn check_table_readable(ks: &str, name: &str, scy: &ScySession) -> Result<bool, Error> {
|
||||
use crate::scylla::transport::errors::QueryError;
|
||||
match scy.query_unpaged(format!("select * from {} limit 1", name), ()).await {
|
||||
match scy
|
||||
.query_unpaged(format!("select * from {}.{} limit 1", ks, name), ())
|
||||
.await
|
||||
{
|
||||
Ok(_) => Ok(true),
|
||||
Err(e) => match &e {
|
||||
QueryError::DbError(e2, msg) => match e2 {
|
||||
@@ -216,7 +218,7 @@ impl GenTwcsTab {
|
||||
}
|
||||
|
||||
async fn has_table_name(&self, scy: &ScySession) -> Result<bool, Error> {
|
||||
has_table(self.name(), scy).await
|
||||
has_table(self.keyspace(), self.name(), scy).await
|
||||
}
|
||||
|
||||
fn cql(&self) -> String {
|
||||
@@ -283,7 +285,6 @@ impl GenTwcsTab {
|
||||
);
|
||||
let x = scy.query_iter(cql, (self.keyspace(), self.name())).await?;
|
||||
let mut it = x.rows_stream::<(i32, i32, BTreeMap<String, String>)>()?;
|
||||
// let mut it = x.into_typed::<(i32, i32, BTreeMap<String, String>)>();
|
||||
let mut rows = Vec::new();
|
||||
while let Some(u) = it.next().await {
|
||||
let row = u?;
|
||||
@@ -478,7 +479,7 @@ async fn check_event_tables(
|
||||
],
|
||||
["series", "ts_msp"],
|
||||
["ts_lsp"],
|
||||
rett.ttl_events_d1(),
|
||||
rett.ttl_events_d0(),
|
||||
);
|
||||
tab.setup(chs, scy).await?;
|
||||
}
|
||||
@@ -495,7 +496,7 @@ async fn check_event_tables(
|
||||
],
|
||||
["series", "ts_msp"],
|
||||
["ts_lsp"],
|
||||
rett.ttl_events_d1(),
|
||||
rett.ttl_events_d0(),
|
||||
);
|
||||
tab.setup(chs, scy).await?;
|
||||
}
|
||||
@@ -512,7 +513,7 @@ async fn check_event_tables(
|
||||
],
|
||||
["series", "ts_msp"],
|
||||
["ts_lsp"],
|
||||
rett.ttl_events_d1(),
|
||||
rett.ttl_events_d0(),
|
||||
);
|
||||
tab.setup(chs, scy).await?;
|
||||
}
|
||||
@@ -522,6 +523,7 @@ async fn check_event_tables(
|
||||
async fn migrate_scylla_data_schema(
|
||||
scyconf: &ScyllaIngestConfig,
|
||||
rett: RetentionTime,
|
||||
rf: u8,
|
||||
chs: &mut Changeset,
|
||||
) -> Result<(), Error> {
|
||||
let scy2 = create_session_no_ks(scyconf).await?;
|
||||
@@ -530,14 +532,13 @@ async fn migrate_scylla_data_schema(
|
||||
let ks = scyconf.keyspace();
|
||||
|
||||
if !has_keyspace(ks, scy).await? {
|
||||
let replication = scyconf.rf();
|
||||
let cql = format!(
|
||||
concat!(
|
||||
"create keyspace {}",
|
||||
" with replication = {{ 'class': 'SimpleStrategy', 'replication_factor': {} }}",
|
||||
" and durable_writes = {};"
|
||||
),
|
||||
ks, replication, durable
|
||||
ks, rf, durable
|
||||
);
|
||||
info!("scylla create keyspace {cql}");
|
||||
chs.add_todo(cql);
|
||||
@@ -545,8 +546,6 @@ async fn migrate_scylla_data_schema(
|
||||
info!("scylla has keyspace {ks}");
|
||||
}
|
||||
|
||||
scy.use_keyspace(ks, true).await?;
|
||||
|
||||
check_event_tables(ks, rett.clone(), chs, scy).await?;
|
||||
|
||||
{
|
||||
@@ -692,13 +691,19 @@ async fn migrate_scylla_data_schema(
|
||||
}
|
||||
{
|
||||
let tn = format!("{}{}", rett.table_prefix(), "bin_write_index_v00");
|
||||
if has_table(&tn, scy).await? {
|
||||
if has_table(ks, &tn, scy).await? {
|
||||
chs.add_todo(format!("drop table {}.{}", ks, tn));
|
||||
}
|
||||
}
|
||||
{
|
||||
let tn = format!("{}{}", rett.table_prefix(), "bin_write_index_v01");
|
||||
if has_table(&tn, scy).await? {
|
||||
if has_table(&ks, &tn, scy).await? {
|
||||
chs.add_todo(format!("drop table {}.{}", ks, tn));
|
||||
}
|
||||
}
|
||||
{
|
||||
let tn = format!("{}{}", rett.table_prefix(), "bin_write_index_v02");
|
||||
if has_table(&ks, &tn, scy).await? {
|
||||
chs.add_todo(format!("drop table {}.{}", ks, tn));
|
||||
}
|
||||
}
|
||||
@@ -716,8 +721,15 @@ pub async fn migrate_scylla_data_schema_all_rt(
|
||||
RetentionTime::Long,
|
||||
RetentionTime::Short,
|
||||
];
|
||||
for ((rt, scyconf), chs) in rts.clone().into_iter().zip(scyconfs.iter()).zip(chsa.iter_mut()) {
|
||||
migrate_scylla_data_schema(scyconf, rt, chs).await?;
|
||||
let rfs = [3, 3, 3, 1];
|
||||
for (((rt, scyconf), chs), rf) in rts
|
||||
.clone()
|
||||
.into_iter()
|
||||
.zip(scyconfs.iter())
|
||||
.zip(chsa.iter_mut())
|
||||
.zip(rfs.iter().map(|&x| x))
|
||||
{
|
||||
migrate_scylla_data_schema(scyconf, rt, rf, chs).await?;
|
||||
}
|
||||
let todo = chsa.iter().any(|x| x.has_to_do());
|
||||
if do_change {
|
||||
|
||||
@@ -485,7 +485,7 @@ impl BinWriter {
|
||||
series: series.id() as i64,
|
||||
pbp: pbp_ix.db_ix() as i16,
|
||||
msp: msp as i32,
|
||||
rt: rt.index_db_i32() as i16,
|
||||
rt: rt.to_index_db_i32() as i16,
|
||||
lsp: lsp as i32,
|
||||
binlen: pbp.bin_len().ms() as i32,
|
||||
};
|
||||
@@ -513,7 +513,7 @@ impl BinWriter {
|
||||
series: series.id() as i64,
|
||||
pbp: pbp_ix.db_ix() as i16,
|
||||
msp: msp as i32,
|
||||
rt: rt.index_db_i32() as i16,
|
||||
rt: rt.to_index_db_i32() as i16,
|
||||
lsp: lsp as i32,
|
||||
binlen: pbp.bin_len().ms() as i32,
|
||||
};
|
||||
|
||||
@@ -1,9 +1,9 @@
|
||||
use crate::writer::EmittableType;
|
||||
use crate::writer::SeriesWriter;
|
||||
use core::fmt;
|
||||
use netpod::log::*;
|
||||
use netpod::DtNano;
|
||||
use netpod::TsNano;
|
||||
use netpod::log;
|
||||
use scywr::iteminsertqueue::QueryItem;
|
||||
use series::SeriesId;
|
||||
use std::collections::VecDeque;
|
||||
@@ -11,7 +11,9 @@ use std::marker::PhantomData;
|
||||
use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
|
||||
macro_rules! trace_rt_decision { ($det:expr, $($arg:tt)*) => { if $det { trace!($($arg)*); } }; }
|
||||
macro_rules! debug { ($($arg:expr),*) => ( if true { log::debug!($($arg),*); } ); }
|
||||
macro_rules! trace { ($($arg:expr),*) => ( if true { log::trace!($($arg),*); } ); }
|
||||
macro_rules! trace_rt_decision { ($det:expr, $($arg:expr),*) => ( if $det { log::trace!($($arg),*); } ); }
|
||||
|
||||
autoerr::create_error_v1!(
|
||||
name(Error, "RateLimitWriter"),
|
||||
@@ -92,10 +94,13 @@ where
|
||||
if false {
|
||||
trace_rt_decision!(
|
||||
det,
|
||||
"{dbgname} {sid} min_quiet {min_quiet:?} ts1 {ts1:?} ts2 {ts2:?} item {item:?}",
|
||||
ts1 = ts.ms(),
|
||||
ts2 = tsl.ms(),
|
||||
item = item,
|
||||
"{} {} min_quiet {:?} ts1 {:?} ts2 {:?} item {:?}",
|
||||
dbgname,
|
||||
sid,
|
||||
min_quiet,
|
||||
ts.ms(),
|
||||
tsl.ms(),
|
||||
item
|
||||
);
|
||||
}
|
||||
let do_write = {
|
||||
@@ -105,7 +110,11 @@ where
|
||||
} else if ts < tsl {
|
||||
trace_rt_decision!(
|
||||
det,
|
||||
"{dbgname} {sid} ignore, because ts_local rewind {ts:?} {tsl:?}",
|
||||
"{} {} ignore, because ts_local rewind {:?} {:?}",
|
||||
dbgname,
|
||||
sid,
|
||||
ts,
|
||||
tsl
|
||||
);
|
||||
false
|
||||
} else if !self.is_polled && ts.ms() < tsl.ms() + min_quiet {
|
||||
@@ -120,20 +129,8 @@ where
|
||||
} else if ts < tsl.add_dt_nano(DtNano::from_ms(5)) {
|
||||
trace_rt_decision!(det, "{dbgname} {sid} ignore, because store rate cap");
|
||||
false
|
||||
} else if self
|
||||
.last_insert_val
|
||||
.as_ref()
|
||||
.map(|k| !item.has_change(k))
|
||||
.unwrap_or(false)
|
||||
{
|
||||
trace_rt_decision!(det, "{dbgname} {sid} ignore, because value did not change");
|
||||
false
|
||||
} else {
|
||||
trace_rt_decision!(det, "{dbgname} {sid} accept");
|
||||
if true {
|
||||
self.last_insert_val = Some(item.clone());
|
||||
}
|
||||
self.last_insert_ts = ts.clone();
|
||||
true
|
||||
}
|
||||
};
|
||||
|
||||
@@ -12,6 +12,7 @@ use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
|
||||
macro_rules! trace_emit { ($det:expr, $($arg:expr),*) => ( if $det { log::trace!($($arg),*); } ); }
|
||||
macro_rules! trace_rt_decision { ($det:expr, $($arg:expr),*) => ( if $det { log::trace!($($arg),*); } ); }
|
||||
|
||||
autoerr::create_error_v1!(
|
||||
name(Error, "SerieswriterRtwriter"),
|
||||
@@ -48,6 +49,10 @@ impl WriteRes {
|
||||
pub fn nstatus(&self) -> u8 {
|
||||
self.st.status + self.mt.status + self.lt.status
|
||||
}
|
||||
|
||||
pub fn accept_any(&self) -> bool {
|
||||
self.lt.accept || self.mt.accept || self.st.accept
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -81,6 +86,8 @@ where
|
||||
min_quiets: MinQuiets,
|
||||
do_trace_detail: bool,
|
||||
do_st_rf1: bool,
|
||||
last_insert_ts: TsNano,
|
||||
last_insert_val: Option<ET>,
|
||||
}
|
||||
|
||||
impl<ET> RtWriter<ET>
|
||||
@@ -119,6 +126,8 @@ where
|
||||
min_quiets,
|
||||
do_trace_detail: netpod::TRACE_SERIES_ID.contains(&series.id()),
|
||||
do_st_rf1,
|
||||
last_insert_ts: TsNano::from_ns(0),
|
||||
last_insert_val: None,
|
||||
};
|
||||
Ok(ret)
|
||||
}
|
||||
@@ -152,9 +161,15 @@ where
|
||||
// Optimize for the common case that we only write into one of the stores.
|
||||
// Make the decision first, based on ref, then clone only as required.
|
||||
let res_lt;
|
||||
let mut res_mt = WriteRtRes::default();
|
||||
let mut res_st = WriteRtRes::default();
|
||||
let res_mt;
|
||||
let res_st;
|
||||
if self
|
||||
.last_insert_val
|
||||
.as_ref()
|
||||
.map(|k| item.has_change(k))
|
||||
.unwrap_or(true)
|
||||
{
|
||||
// TODO filter duplicate values already here
|
||||
res_lt = Self::write_inner(&mut self.state_lt, item.clone(), ts_net, tsev, &mut iqdqs.lt_rf3_qu)?;
|
||||
if !res_lt.accept {
|
||||
res_mt = Self::write_inner(&mut self.state_mt, item.clone(), ts_net, tsev, &mut iqdqs.mt_rf3_qu)?;
|
||||
@@ -166,14 +181,28 @@ where
|
||||
res_st =
|
||||
Self::write_inner(&mut self.state_st, item.clone(), ts_net, tsev, &mut iqdqs.st_rf3_qu)?;
|
||||
}
|
||||
} else {
|
||||
res_st = WriteRtRes::default();
|
||||
}
|
||||
} else {
|
||||
res_mt = WriteRtRes::default();
|
||||
res_st = WriteRtRes::default();
|
||||
}
|
||||
} else {
|
||||
trace_rt_decision!(det, "{} ignore, because value did not change", self.series);
|
||||
res_lt = WriteRtRes::default();
|
||||
res_mt = WriteRtRes::default();
|
||||
res_st = WriteRtRes::default();
|
||||
}
|
||||
let ret = WriteRes {
|
||||
st: res_st,
|
||||
mt: res_mt,
|
||||
lt: res_lt,
|
||||
};
|
||||
if ret.accept_any() {
|
||||
self.last_insert_ts = tsev.clone();
|
||||
self.last_insert_val = Some(item.clone());
|
||||
}
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user