diff --git a/daqingest/Cargo.toml b/daqingest/Cargo.toml index b2d691d..7f86909 100644 --- a/daqingest/Cargo.toml +++ b/daqingest/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "daqingest" -version = "0.2.5-aa.0" +version = "0.2.5-aa.2" authors = ["Dominik Werder "] edition = "2021" diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index 47eb476..e1d8297 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -1778,8 +1778,8 @@ impl CaConn { return Ok(()); }; let dbg_chn = dbg_chn_cid(cid, self); - let (ch_s, ch_wrst) = if let Some(x) = self.channels.get_mut(&cid) { - (&mut x.state, &mut x.wrst) + let (ch_s, ch_wrst, ch_conf) = if let Some(x) = self.channels.get_mut(&cid) { + (&mut x.state, &mut x.wrst, &x.conf) } else { // TODO return better as error and let caller decide (with more structured errors) warn!("TODO handle_event_add_res can not find channel for {cid:?} {subid:?}"); @@ -1870,6 +1870,7 @@ impl CaConn { iqdqs, tsnow, stnow, + ch_conf.use_ioc_time(), stats, &mut self.rng, )?; @@ -1900,6 +1901,7 @@ impl CaConn { iqdqs, tsnow, stnow, + ch_conf.use_ioc_time(), stats, &mut self.rng, )?; @@ -2026,8 +2028,8 @@ impl CaConn { } } else { if let Some(cid) = self.read_ioids.get(&ioid) { - let (ch_s, ch_wrst) = if let Some(x) = self.channels.get_mut(cid) { - (&mut x.state, &mut x.wrst) + let (ch_s, ch_wrst, ch_conf) = if let Some(x) = self.channels.get_mut(cid) { + (&mut x.state, &mut x.wrst, &x.conf) } else { warn!("handle_read_notify_res can not find channel for {cid:?} {ioid:?}"); return Ok(()); @@ -2073,6 +2075,7 @@ impl CaConn { iqdqs, stnow, tsnow, + ch_conf.use_ioc_time(), stats, &mut self.rng, )?; @@ -2126,6 +2129,7 @@ impl CaConn { iqdqs, stnow, tsnow, + ch_conf.use_ioc_time(), stats, &mut self.rng, )?; @@ -2157,6 +2161,7 @@ impl CaConn { iqdqs: &mut InsertDeques, stnow: SystemTime, tsnow: Instant, + use_ioc_time: bool, stats: &CaConnStats, rng: &mut Xoshiro128PlusPlus, ) -> Result<(), Error> { @@ -2173,6 +2178,7 @@ impl CaConn { iqdqs, tsnow, stnow, + use_ioc_time, stats, rng, )?; @@ -2189,6 +2195,7 @@ impl CaConn { iqdqs: &mut InsertDeques, tsnow: Instant, stnow: SystemTime, + use_ioc_time: bool, stats: &CaConnStats, rng: &mut Xoshiro128PlusPlus, ) -> Result<(), Error> { @@ -2232,13 +2239,19 @@ impl CaConn { stats.ca_ts_off().ingest((ts_diff / MS) as u32); } { - let tsev = tsev_local; + let tsev = if use_ioc_time { + if let Some(x) = value.ts() { + TsNano::from_ns(x) + } else { + tsev_local + } + } else { + tsev_local + }; Self::check_ev_value_data(&value.data, &writer.scalar_type())?; crst.muted_before = 0; crst.insert_item_ivl_ema.tick(tsnow); - // let ts_ioc = TsNano::from_ns(ts); - // let ts_local = TsNano::from_ns(ts_local); - binwriter.ingest(tsev_local, value.f32_for_binning(), iqdqs)?; + binwriter.ingest(tsev, value.f32_for_binning(), iqdqs)?; { let wres = writer.write(CaWriterValue::new(value, crst), tsnow, tsev, iqdqs)?; crst.status_emit_count += wres.nstatus() as u64; diff --git a/netfetch/src/ca/findioc.rs b/netfetch/src/ca/findioc.rs index fa5b06e..165d168 100644 --- a/netfetch/src/ca/findioc.rs +++ b/netfetch/src/ca/findioc.rs @@ -734,7 +734,7 @@ impl Stream for FindIocStream { if let Some(fut) = self.sleeper.as_mut() { match fut.poll_unpin(cx) { Ready(()) => { - if self.sleep_count < 0 { + if false && self.sleep_count < 10 { self.sleeper = Some(Box::pin(tokio::time::sleep(Duration::from_millis(100)))); self.sleep_count += 1; diff --git a/netfetch/src/conf.rs b/netfetch/src/conf.rs index 7ac2a91..1de7146 100644 --- a/netfetch/src/conf.rs +++ b/netfetch/src/conf.rs @@ -334,10 +334,6 @@ fn bool_is_false(x: &bool) -> bool { *x == false } -fn bool_is_true(x: &bool) -> bool { - *x == false -} - fn bool_true() -> bool { true } @@ -665,6 +661,13 @@ impl ChannelConfig { } } + pub fn use_ioc_time(&self) -> bool { + match &self.arch.timestamp { + ChannelTimestamp::Archiver => false, + ChannelTimestamp::IOC => true, + } + } + // TODO remove when no longer needed. pub fn dummy() -> Self { Self { diff --git a/serieswriter/src/rtwriter.rs b/serieswriter/src/rtwriter.rs index bd36fc3..99a3616 100644 --- a/serieswriter/src/rtwriter.rs +++ b/serieswriter/src/rtwriter.rs @@ -65,6 +65,16 @@ pub struct WriteRtRes { pub status: u8, } +impl Default for WriteRtRes { + fn default() -> Self { + Self { + accept: false, + bytes: 0, + status: 0, + } + } +} + #[derive(Debug)] pub struct RtWriter where @@ -146,25 +156,22 @@ where // TODO // Optimize for the common case that we only write into one of the stores. // Make the decision first, based on ref, then clone only as required. - let res_st = Self::write_inner(&mut self.state_st, item.clone(), ts_net, tsev, &mut iqdqs.st_rf3_qu)?; - let res_mt = Self::write_inner(&mut self.state_mt, item.clone(), ts_net, tsev, &mut iqdqs.mt_rf3_qu)?; - let res_lt = Self::write_inner(&mut self.state_lt, item, ts_net, tsev, &mut iqdqs.lt_rf3_qu)?; + let mut res_lt = WriteRtRes::default(); + let mut res_mt = WriteRtRes::default(); + let mut res_st = WriteRtRes::default(); + if true { + res_lt = Self::write_inner(&mut self.state_lt, item.clone(), ts_net, tsev, &mut iqdqs.lt_rf3_qu)?; + } + if !res_lt.accept { + res_mt = Self::write_inner(&mut self.state_mt, item.clone(), ts_net, tsev, &mut iqdqs.mt_rf3_qu)?; + } + if !res_mt.accept { + res_st = Self::write_inner(&mut self.state_st, item.clone(), ts_net, tsev, &mut iqdqs.st_rf3_qu)?; + } let ret = WriteRes { - st: WriteRtRes { - accept: res_st.accept, - bytes: res_st.bytes, - status: res_st.status, - }, - mt: WriteRtRes { - accept: res_mt.accept, - bytes: res_mt.bytes, - status: res_mt.status, - }, - lt: WriteRtRes { - accept: res_lt.accept, - bytes: res_lt.bytes, - status: res_lt.status, - }, + st: res_st, + mt: res_mt, + lt: res_lt, }; Ok(ret) } @@ -175,8 +182,14 @@ where ts_net: Instant, tsev: TsNano, deque: &mut VecDeque, - ) -> Result { - Ok(state.writer.write(item, ts_net, tsev, deque)?) + ) -> Result { + let x = state.writer.write(item, ts_net, tsev, deque)?; + let ret = WriteRtRes { + accept: x.accept, + bytes: x.bytes, + status: x.status, + }; + Ok(ret) } pub fn tick(&mut self, iqdqs: &mut InsertDeques) -> Result<(), Error> {