Write only to one of the retention times

This commit is contained in:
Dominik Werder
2024-11-12 11:42:09 +01:00
parent eacef9083b
commit f53d9dbfc5
5 changed files with 63 additions and 34 deletions

View File

@@ -1,6 +1,6 @@
[package]
name = "daqingest"
version = "0.2.5-aa.0"
version = "0.2.5-aa.2"
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
edition = "2021"

View File

@@ -1778,8 +1778,8 @@ impl CaConn {
return Ok(());
};
let dbg_chn = dbg_chn_cid(cid, self);
let (ch_s, ch_wrst) = if let Some(x) = self.channels.get_mut(&cid) {
(&mut x.state, &mut x.wrst)
let (ch_s, ch_wrst, ch_conf) = if let Some(x) = self.channels.get_mut(&cid) {
(&mut x.state, &mut x.wrst, &x.conf)
} else {
// TODO return better as error and let caller decide (with more structured errors)
warn!("TODO handle_event_add_res can not find channel for {cid:?} {subid:?}");
@@ -1870,6 +1870,7 @@ impl CaConn {
iqdqs,
tsnow,
stnow,
ch_conf.use_ioc_time(),
stats,
&mut self.rng,
)?;
@@ -1900,6 +1901,7 @@ impl CaConn {
iqdqs,
tsnow,
stnow,
ch_conf.use_ioc_time(),
stats,
&mut self.rng,
)?;
@@ -2026,8 +2028,8 @@ impl CaConn {
}
} else {
if let Some(cid) = self.read_ioids.get(&ioid) {
let (ch_s, ch_wrst) = if let Some(x) = self.channels.get_mut(cid) {
(&mut x.state, &mut x.wrst)
let (ch_s, ch_wrst, ch_conf) = if let Some(x) = self.channels.get_mut(cid) {
(&mut x.state, &mut x.wrst, &x.conf)
} else {
warn!("handle_read_notify_res can not find channel for {cid:?} {ioid:?}");
return Ok(());
@@ -2073,6 +2075,7 @@ impl CaConn {
iqdqs,
stnow,
tsnow,
ch_conf.use_ioc_time(),
stats,
&mut self.rng,
)?;
@@ -2126,6 +2129,7 @@ impl CaConn {
iqdqs,
stnow,
tsnow,
ch_conf.use_ioc_time(),
stats,
&mut self.rng,
)?;
@@ -2157,6 +2161,7 @@ impl CaConn {
iqdqs: &mut InsertDeques,
stnow: SystemTime,
tsnow: Instant,
use_ioc_time: bool,
stats: &CaConnStats,
rng: &mut Xoshiro128PlusPlus,
) -> Result<(), Error> {
@@ -2173,6 +2178,7 @@ impl CaConn {
iqdqs,
tsnow,
stnow,
use_ioc_time,
stats,
rng,
)?;
@@ -2189,6 +2195,7 @@ impl CaConn {
iqdqs: &mut InsertDeques,
tsnow: Instant,
stnow: SystemTime,
use_ioc_time: bool,
stats: &CaConnStats,
rng: &mut Xoshiro128PlusPlus,
) -> Result<(), Error> {
@@ -2232,13 +2239,19 @@ impl CaConn {
stats.ca_ts_off().ingest((ts_diff / MS) as u32);
}
{
let tsev = tsev_local;
let tsev = if use_ioc_time {
if let Some(x) = value.ts() {
TsNano::from_ns(x)
} else {
tsev_local
}
} else {
tsev_local
};
Self::check_ev_value_data(&value.data, &writer.scalar_type())?;
crst.muted_before = 0;
crst.insert_item_ivl_ema.tick(tsnow);
// let ts_ioc = TsNano::from_ns(ts);
// let ts_local = TsNano::from_ns(ts_local);
binwriter.ingest(tsev_local, value.f32_for_binning(), iqdqs)?;
binwriter.ingest(tsev, value.f32_for_binning(), iqdqs)?;
{
let wres = writer.write(CaWriterValue::new(value, crst), tsnow, tsev, iqdqs)?;
crst.status_emit_count += wres.nstatus() as u64;

View File

@@ -734,7 +734,7 @@ impl Stream for FindIocStream {
if let Some(fut) = self.sleeper.as_mut() {
match fut.poll_unpin(cx) {
Ready(()) => {
if self.sleep_count < 0 {
if false && self.sleep_count < 10 {
self.sleeper =
Some(Box::pin(tokio::time::sleep(Duration::from_millis(100))));
self.sleep_count += 1;

View File

@@ -334,10 +334,6 @@ fn bool_is_false(x: &bool) -> bool {
*x == false
}
fn bool_is_true(x: &bool) -> bool {
*x == false
}
fn bool_true() -> bool {
true
}
@@ -665,6 +661,13 @@ impl ChannelConfig {
}
}
pub fn use_ioc_time(&self) -> bool {
match &self.arch.timestamp {
ChannelTimestamp::Archiver => false,
ChannelTimestamp::IOC => true,
}
}
// TODO remove when no longer needed.
pub fn dummy() -> Self {
Self {

View File

@@ -65,6 +65,16 @@ pub struct WriteRtRes {
pub status: u8,
}
impl Default for WriteRtRes {
fn default() -> Self {
Self {
accept: false,
bytes: 0,
status: 0,
}
}
}
#[derive(Debug)]
pub struct RtWriter<ET>
where
@@ -146,25 +156,22 @@ where
// TODO
// Optimize for the common case that we only write into one of the stores.
// Make the decision first, based on ref, then clone only as required.
let res_st = Self::write_inner(&mut self.state_st, item.clone(), ts_net, tsev, &mut iqdqs.st_rf3_qu)?;
let res_mt = Self::write_inner(&mut self.state_mt, item.clone(), ts_net, tsev, &mut iqdqs.mt_rf3_qu)?;
let res_lt = Self::write_inner(&mut self.state_lt, item, ts_net, tsev, &mut iqdqs.lt_rf3_qu)?;
let mut res_lt = WriteRtRes::default();
let mut res_mt = WriteRtRes::default();
let mut res_st = WriteRtRes::default();
if true {
res_lt = Self::write_inner(&mut self.state_lt, item.clone(), ts_net, tsev, &mut iqdqs.lt_rf3_qu)?;
}
if !res_lt.accept {
res_mt = Self::write_inner(&mut self.state_mt, item.clone(), ts_net, tsev, &mut iqdqs.mt_rf3_qu)?;
}
if !res_mt.accept {
res_st = Self::write_inner(&mut self.state_st, item.clone(), ts_net, tsev, &mut iqdqs.st_rf3_qu)?;
}
let ret = WriteRes {
st: WriteRtRes {
accept: res_st.accept,
bytes: res_st.bytes,
status: res_st.status,
},
mt: WriteRtRes {
accept: res_mt.accept,
bytes: res_mt.bytes,
status: res_mt.status,
},
lt: WriteRtRes {
accept: res_lt.accept,
bytes: res_lt.bytes,
status: res_lt.status,
},
st: res_st,
mt: res_mt,
lt: res_lt,
};
Ok(ret)
}
@@ -175,8 +182,14 @@ where
ts_net: Instant,
tsev: TsNano,
deque: &mut VecDeque<QueryItem>,
) -> Result<crate::ratelimitwriter::WriteRes, Error> {
Ok(state.writer.write(item, ts_net, tsev, deque)?)
) -> Result<WriteRtRes, Error> {
let x = state.writer.write(item, ts_net, tsev, deque)?;
let ret = WriteRtRes {
accept: x.accept,
bytes: x.bytes,
status: x.status,
};
Ok(ret)
}
pub fn tick(&mut self, iqdqs: &mut InsertDeques) -> Result<(), Error> {