This commit is contained in:
Dominik Werder
2024-08-30 16:29:47 +02:00
parent 3ddcc90363
commit 79aa8d0466
10 changed files with 452 additions and 68 deletions

View File

@@ -1,6 +1,6 @@
[package]
name = "daqingest"
version = "0.2.4-aa.0"
version = "0.2.4-aa.1"
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
edition = "2021"
@@ -16,8 +16,8 @@ serde = { version = "1.0", features = ["derive"] }
tokio-postgres = "0.7.10"
async-channel = "2.3.1"
futures-util = "0.3"
chrono = "0.4"
bytes = "1.6.0"
chrono = "0.4.38"
bytes = "1.7.1"
libc = "0.2"
err = { path = "../../daqbuffer/crates/err" }
netpod = { path = "../../daqbuffer/crates/netpod" }

View File

@@ -36,6 +36,7 @@ libc = "0.2"
slidebuf = "0.0.1"
dashmap = "6.0.1"
hashbrown = "0.14.3"
smallvec = "1.13.2"
log = { path = "../log" }
series = { path = "../series" }
serieswriter = { path = "../serieswriter" }

View File

@@ -91,6 +91,8 @@ const MONITOR_POLL_TIMEOUT: Duration = Duration::from_millis(1000 * 6);
const TIMEOUT_CHANNEL_CLOSING: Duration = Duration::from_millis(1000 * 8);
const TIMEOUT_PONG_WAIT: Duration = Duration::from_millis(1000 * 10);
const READ_CHANNEL_VALUE_STATUS_EMIT_QUIET_MIN: Duration = Duration::from_millis(1000 * 120);
const SILENCE_READ_NEXT_IVL: Duration = Duration::from_millis(1000 * 200);
const POLL_READ_TIMEOUT: Duration = Duration::from_millis(1000 * 10);
const DO_RATE_CHECK: bool = false;
#[allow(unused)]
@@ -385,15 +387,37 @@ struct PollingState {
tick: PollTickState,
}
#[derive(Debug, Clone)]
struct PollTickStateIdle {
    // Deadline at which the next poll read should be issued.
    next: Instant,
}

impl PollTickStateIdle {
    /// Compute the next poll deadline: the first multiple of `ivl` past
    /// `next_backup` that lies strictly in the future of `tsnow`.
    ///
    /// Stepping from `next_backup` (the previously scheduled deadline) rather
    /// than from `tsnow` keeps the poll schedule phase-locked to its original
    /// grid instead of drifting with each read's completion time.
    fn decide_next(next_backup: Instant, ivl: Duration, tsnow: Instant) -> Instant {
        let mut next = next_backup + ivl;
        if next <= tsnow {
            if ivl.is_zero() {
                // Guard: with a zero interval the catch-up below would loop
                // forever; fall back to polling again immediately.
                return tsnow;
            }
            // Skip the whole intervals we have fallen behind in one step
            // instead of looping once per missed interval.
            let behind = tsnow.duration_since(next);
            let steps = (behind.as_nanos() / ivl.as_nanos()).min(u32::MAX as u128) as u32;
            next += ivl * steps;
            // The division may undershoot by a step (or was clamped above);
            // finish with at most a few iterations.
            while next <= tsnow {
                next += ivl;
            }
        }
        next
    }
}
#[derive(Debug, Clone)]
// State while a poll read is in flight and we wait for its read-notify.
struct PollTickStateWait {
    // Deadline that was scheduled before this read was issued; carried along
    // so the next deadline can be derived from the original poll grid.
    next_backup: Instant,
    // When the read request was issued; used for latency measurement and for
    // the poll-read timeout check.
    since: Instant,
    // Ioid of the in-flight read request, for matching the read-notify and
    // for cleanup on timeout.
    ioid: Ioid,
}
#[derive(Debug, Clone)]
enum PollTickState {
// TODO use inner struct to give this Instant a name.
// When monitoring, update this ts on received events.
// It should hold the Instant when we entered this state, but a receive of some event
// is considered re-entering this state.
Idle(Instant),
// TODO use inner struct to give this Instant a name
Wait(Instant, Ioid),
Idle(PollTickStateIdle),
Wait(PollTickStateWait),
}
#[derive(Debug)]
@@ -971,8 +995,9 @@ pub enum EndOfStreamReason {
}
pub struct CaConnOpts {
insert_queue_max: usize,
array_truncate: usize,
// TODO make private when we don't share it anymore
pub(super) insert_queue_max: usize,
pub(super) array_truncate: usize,
}
impl CaConnOpts {
@@ -1037,6 +1062,7 @@ pub struct CaConn {
ioid: u32,
read_ioids: HashMap<Ioid, Cid>,
handler_by_ioid: HashMap<Ioid, Option<Pin<Box<dyn ConnFuture>>>>,
trace_channel_poll: bool,
}
impl Drop for CaConn {
@@ -1101,31 +1127,33 @@ impl CaConn {
ioid: 100,
read_ioids: HashMap::new(),
handler_by_ioid: HashMap::new(),
trace_channel_poll: false,
}
}
fn ioc_ping_ivl_rng(rng: &mut Xoshiro128PlusPlus) -> Duration {
let b = IOC_PING_IVL;
b + b / 128 * (rng.next_u32() & 0x1f)
b + b * (rng.next_u32() & 0x3f) / 0xff
}
fn channel_status_emit_ivl(rng: &mut Xoshiro128PlusPlus) -> Duration {
let b = CHANNEL_STATUS_EMIT_IVL;
b + b / 128 * (rng.next_u32() & 0x1f)
b + b * (rng.next_u32() & 0x3f) / 0xff
}
fn silence_read_next_ivl_rng(rng: &mut Xoshiro128PlusPlus) -> Duration {
Duration::from_millis(1000 * 300 + (rng.next_u32() & 0x3fff) as u64)
let b = SILENCE_READ_NEXT_IVL;
b + b * (rng.next_u32() & 0x7f) / 0xff
}
fn recv_value_status_emit_ivl_rng(rng: &mut Xoshiro128PlusPlus) -> Duration {
let b = READ_CHANNEL_VALUE_STATUS_EMIT_QUIET_MIN;
b + b / 128 * (rng.next_u32() & 0x1f)
b + b * (rng.next_u32() & 0x3f) / 0xff
}
fn new_self_ticker(rng: &mut Xoshiro128PlusPlus) -> Pin<Box<tokio::time::Sleep>> {
let b = Duration::from_millis(1500);
let dur = b + b / 128 * (rng.next_u32() & 0x1f);
let b = Duration::from_millis(110);
let dur = b + b * (rng.next_u32() & 0x3f) / 0xff;
Box::pin(tokio::time::sleep(dur))
}
@@ -1299,6 +1327,7 @@ impl CaConn {
scalar_type,
shape,
ch.conf.min_quiets(),
ch.conf.is_polled(),
&|| CaWriterValueState::new(st.series_status, chinfo.series.to_series()),
)?;
self.handle_writer_establish_inner(cid, writer)?;
@@ -1366,6 +1395,11 @@ impl CaConn {
conf.wrst.emit_channel_status_item(item, &mut self.iqdqs.st_rf3_qu)?;
}
if let Some((ivl,)) = conf_poll_conf {
let ivl = Duration::from_millis(ivl);
if self.trace_channel_poll {
trace!("make poll idle state from writer establish");
}
let next = self.poll_tsnow + ivl * (self.rng.next_u32() & 0x1ff) / 511;
let created_state = WritableState {
tsbeg: self.poll_tsnow,
channel: std::mem::replace(&mut st2.channel, CreatedState::dummy()),
@@ -1373,8 +1407,8 @@ impl CaConn {
binwriter,
reading: ReadingState::Polling(PollingState {
tsbeg: self.poll_tsnow,
poll_ivl: Duration::from_millis(ivl),
tick: PollTickState::Idle(self.poll_tsnow),
poll_ivl: ivl,
tick: PollTickState::Idle(PollTickStateIdle { next }),
}),
};
conf.state = ChannelState::Writable(created_state);
@@ -1434,6 +1468,11 @@ impl CaConn {
}
pub fn channel_add(&mut self, conf: ChannelConfig, cssid: ChannelStatusSeriesId) -> Result<(), Error> {
if false {
if netpod::trigger.contains(&conf.name()) {
self.trace_channel_poll = true;
}
}
if self.cid_by_name(conf.name()).is_some() {
self.stats.channel_add_exists.inc();
if trigger.contains(&conf.name()) {
@@ -1800,10 +1839,14 @@ impl CaConn {
match ch_s {
ChannelState::Writable(st) => match &mut st.reading {
ReadingState::StopMonitoringForPolling(..) => {
if self.trace_channel_poll {
trace!("make poll idle from event add empty");
}
let ivl = Duration::from_millis(1000);
st.reading = ReadingState::Polling(PollingState {
tsbeg: tsnow,
poll_ivl: Duration::from_millis(1000),
tick: PollTickState::Idle(tsnow),
poll_ivl: ivl,
tick: PollTickState::Idle(PollTickStateIdle { next: tsnow }),
});
}
ReadingState::EnableMonitoring(..) => {
@@ -1882,15 +1925,19 @@ impl CaConn {
}
match &mut st.reading {
ReadingState::Polling(st2) => match &mut st2.tick {
PollTickState::Idle(_st3) => {
PollTickState::Idle(_) => {
self.stats.recv_read_notify_while_polling_idle.inc();
}
PollTickState::Wait(st3, ioid) => {
let dt = tsnow.saturating_duration_since(*st3);
self.stats.caget_lat().ingest((1e3 * dt.as_secs_f32()) as u32);
PollTickState::Wait(st3) => {
let dt = tsnow.saturating_duration_since(st3.since);
self.stats.caget_lat().ingest_dur_dms(dt);
// TODO maintain histogram of read-notify latencies
self.read_ioids.remove(ioid);
st2.tick = PollTickState::Idle(tsnow);
self.read_ioids.remove(&st3.ioid);
let next = PollTickStateIdle::decide_next(st3.next_backup, st2.poll_ivl, tsnow);
if self.trace_channel_poll {
trace!("make next poll idle at {next:?} tsnow {tsnow:?}");
}
st2.tick = PollTickState::Idle(PollTickStateIdle { next });
let iqdqs = &mut self.iqdqs;
let stats = self.stats.as_ref();
Self::read_notify_res_for_write(
@@ -2059,7 +2106,7 @@ impl CaConn {
stats.ca_ts_off().ingest((ts_diff / MS) as u32);
}
{
let evts = ts_local;
let tsev = ts_local;
Self::check_ev_value_data(&value.data, &writer.scalar_type())?;
crst.muted_before = 0;
crst.insert_item_ivl_ema.tick(tsnow);
@@ -2067,7 +2114,7 @@ impl CaConn {
// let ts_local = TsNano::from_ns(ts_local);
// binwriter.ingest(ts_ioc, ts_local, &val, iqdqs)?;
{
let wres = writer.write(CaWriterValue::new(value, crst), tsnow, evts, iqdqs)?;
let wres = writer.write(CaWriterValue::new(value, crst), tsnow, tsev, iqdqs)?;
crst.status_emit_count += wres.nstatus() as u64;
if wres.st.accept {
crst.dw_st_last = stnow;
@@ -2323,8 +2370,8 @@ impl CaConn {
},
ReadingState::StopMonitoringForPolling(_) => {}
ReadingState::Polling(st3) => match &mut st3.tick {
PollTickState::Idle(x) => {
if *x + st3.poll_ivl <= tsnow {
PollTickState::Idle(st4) => {
if st4.next <= tsnow {
let ioid = Ioid(self.ioid);
self.ioid = self.ioid.wrapping_add(1);
self.read_ioids.insert(ioid, st2.channel.cid.clone());
@@ -2339,15 +2386,23 @@ impl CaConn {
);
do_wake_again = true;
self.proto.as_mut().unwrap().push_out(msg);
st3.tick = PollTickState::Wait(tsnow, ioid);
st3.tick = PollTickState::Wait(PollTickStateWait {
next_backup: st4.next,
since: tsnow,
ioid,
});
self.stats.caget_issued().inc();
}
}
PollTickState::Wait(x, ioid) => {
if *x + Duration::from_millis(10000) <= tsnow {
self.read_ioids.remove(ioid);
PollTickState::Wait(st4) => {
if st4.since + POLL_READ_TIMEOUT <= tsnow {
self.read_ioids.remove(&st4.ioid);
self.stats.caget_timeout().inc();
st3.tick = PollTickState::Idle(tsnow);
let next = PollTickStateIdle::decide_next(st4.next_backup, st3.poll_ivl, tsnow);
if self.trace_channel_poll {
trace!("make poll idle after poll timeout {next:?}");
}
st3.tick = PollTickState::Idle(PollTickStateIdle { next });
}
}
},
@@ -2490,8 +2545,7 @@ impl CaConn {
// let addr = &self.remote_addr_dbg;
if let Some(started) = self.ioc_ping_start {
let dt = started.elapsed();
let dt = dt.as_secs() as u32 + dt.subsec_millis();
self.stats.pong_recv_lat().ingest(dt);
self.stats.pong_recv_lat().ingest_dur_dms(dt);
} else {
let addr = &self.remote_addr_dbg;
warn!("received Echo even though we didn't asked for it {addr:?}");
@@ -2844,11 +2898,15 @@ impl CaConn {
}
fn handle_own_ticker(mut self: Pin<&mut Self>, cx: &mut Context) -> Result<(), Error> {
// debug!("tick CaConn {}", self.remote_addr_dbg);
let tsnow = Instant::now();
if !self.is_shutdown() {
self.ticker = Self::new_self_ticker(&mut self.rng);
let _ = self.ticker.poll_unpin(cx);
match self.ticker.poll_unpin(cx) {
Poll::Pending => {}
_ => {
return Err(Error::FutLogic);
}
}
// cx.waker().wake_by_ref();
}
self.check_channels_state_init(tsnow, cx)?;
@@ -2932,7 +2990,6 @@ impl CaConn {
count: acc.usage().count() as _,
bytes: acc.usage().bytes() as _,
};
trace!("EMIT ITEM {rt:?} {item:?}");
self.iqdqs.emit_accounting_item(rt, item)?;
}
acc.reset(msp);
@@ -3145,7 +3202,6 @@ impl Stream for CaConn {
self.poll_tsnow = Instant::now();
self.tmp_ts_poll = SystemTime::now();
let poll_ts1 = Instant::now();
self.stats.poll_count().inc();
self.stats.poll_fn_begin().inc();
let mut reloops: u32 = 0;
let ret = loop {
@@ -3318,7 +3374,7 @@ impl Stream for CaConn {
debug!("LONG OPERATION 2 {:.0} ms", 1e3 * dt.as_secs_f32());
}
let dt = lts3.saturating_duration_since(lts2);
self.stats.poll_op3_dt().ingest((1e3 * dt.as_secs_f32()) as u32);
self.stats.poll_op3_dt().ingest_dur_dms(dt);
if dt > max {
debug!("LONG OPERATION 3 {:.0} ms", 1e3 * dt.as_secs_f32());
}
@@ -3389,7 +3445,16 @@ impl Stream for CaConn {
};
let poll_ts2 = Instant::now();
let dt = poll_ts2.saturating_duration_since(poll_ts1);
self.stats.poll_all_dt().ingest((1e3 * dt.as_secs_f32()) as u32);
if self.trace_channel_poll {
self.stats.poll_all_dt().ingest_dur_dms(dt);
if dt >= Duration::from_millis(10) {
trace!("long poll {dt:?}");
} else if dt >= Duration::from_micros(400) {
let v = self.stats.poll_all_dt.to_display();
let ip = self.remote_addr_dbg;
trace!("poll_all_dt {ip} {v}");
}
}
self.stats.read_ioids_len().set(self.read_ioids.len() as u64);
let n = match &self.proto {
Some(x) => x.proto_out_len() as u64,
@@ -3442,7 +3507,7 @@ impl CaWriterValue {
.map_or_else(|| String::from("undefined"), String::from)
},
);
trace!("CaWriterValue convert enum {} {:?}", crst.name(), conv);
// trace!("CaWriterValue convert enum {} {:?}", crst.name(), conv);
Some(conv)
}
_ => None,

View File

@@ -1,3 +1,13 @@
trait Channel {}
use err::thiserror;
use err::ThisError;

// Error type for connection-channel handling; no variants defined yet.
#[derive(Debug, ThisError)]
#[cstm(name = "ConnChannelError")]
pub enum Error {}

// Abstraction over a channel that consumes CA protocol messages.
trait Channel {
    // Reports whether the channel can currently take another CA message
    // (judging by the name; no implementations are visible here — confirm).
    fn can_accept_ca_msg(&self) -> bool;
    // Feed one CA protocol message into the channel.
    fn process_ca_msg(&mut self, msg: crate::ca::proto::CaMsg) -> Result<(), Error>;
}

// Placeholder implementor; fields to be added.
struct ChannelAny {}

View File

@@ -11,21 +11,48 @@ use futures_util::Stream;
use futures_util::StreamExt;
use hashbrown::HashMap;
use log::*;
use scywr::insertqueues::InsertDeques;
use scywr::insertqueues::InsertQueuesTx;
use scywr::iteminsertqueue::QueryItem;
use stats::rand_xoshiro::Xoshiro128PlusPlus;
use stats::CaConnStats;
use stats::CaProtoStats;
use std::collections::VecDeque;
use std::net::SocketAddrV4;
use std::pin::Pin;
use std::sync::Arc;
use std::task::Context;
use std::task::Poll;
use std::time::Duration;
use std::time::Instant;
use taskrun::tokio;
use tokio::net::TcpStream;
/// Measures the elapsed time of successive pipeline stages.
///
/// Call `step` after each stage: every call appends the duration since the
/// previous call (or since construction) and restarts the clock. Up to 8
/// step durations are kept inline without heap allocation.
struct DurationMeasureSteps {
    ts: Instant,
    durs: smallvec::SmallVec<[Duration; 8]>,
}

impl DurationMeasureSteps {
    /// Begin measuring from the current instant, with no steps recorded.
    fn new() -> Self {
        Self {
            ts: Instant::now(),
            durs: smallvec::SmallVec::new(),
        }
    }

    /// Record the time elapsed since the previous `step` (or since `new`)
    /// and reset the reference point for the next stage.
    fn step(&mut self) {
        let now = Instant::now();
        self.durs.push(now.saturating_duration_since(self.ts));
        self.ts = now;
    }
}
#[derive(Debug)]
pub enum Error {}
pub enum Error {
TickerPoll,
}
type ConnectingFut =
Pin<Box<dyn Future<Output = Result<Result<TcpStream, std::io::Error>, tokio::time::error::Elapsed>> + Send>>;
@@ -47,7 +74,11 @@ struct CaConn {
opts: CaConnOpts,
backend: String,
state: CaConnState,
iqdqs: InsertDeques,
ca_conn_event_out_queue: VecDeque<CaConnEvent>,
ca_conn_event_out_queue_max: usize,
rng: Xoshiro128PlusPlus,
stats: Arc<CaConnStats>,
}
impl CaConn {
@@ -68,9 +99,24 @@ impl CaConn {
opts,
backend,
state: CaConnState::Connecting(tsnow, remote_addr, err::todoval()),
iqdqs: InsertDeques::new(),
ca_conn_event_out_queue: VecDeque::new(),
ca_conn_event_out_queue_max: 2000,
rng,
stats,
}
}
// Drive the connection's internal ticker from the main poll loop.
// Stub: not implemented yet — always panics via todo!().
fn poll_own_ticker(mut self: Pin<&mut Self>, cx: &mut Context) -> Result<Poll<()>, Error> {
    // TODO nothing taken yet
    todo!()
}
// call this only from the main fn poll
// Transition the connection into Shutdown, carrying `e` as the
// end-of-stream reason. Remaining shutdown bookkeeping is unimplemented
// (todo!() panics after the state change).
fn shutdown_on_error(&mut self, e: Error) {
    self.state = CaConnState::Shutdown(EndOfStreamReason::Error(e));
    todo!()
}
}
impl Stream for CaConn {
@@ -78,6 +124,241 @@ impl Stream for CaConn {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
todo!()
let mut durs = DurationMeasureSteps::new();
self.stats.poll_fn_begin().inc();
let ret = loop {
self.stats.poll_loop_begin().inc();
let qlen = self.iqdqs.len();
if qlen >= self.opts.insert_queue_max * 2 / 3 {
self.stats.insert_item_queue_pressure().inc();
} else if qlen >= self.opts.insert_queue_max {
self.stats.insert_item_queue_full().inc();
}
let mut have_pending = false;
let mut have_progress = false;
if let CaConnState::Done = self.state {
break Ready(None);
} else if let Some(item) = self.ca_conn_event_out_queue.pop_front() {
break Ready(Some(item));
}
// TODO add up duration of this scope
match self.as_mut().poll_own_ticker(cx) {
Ok(Ready(())) => {
have_progress = true;
}
Ok(Pending) => {
have_pending = true;
}
Err(e) => {
self.shutdown_on_error(e);
continue;
}
}
{
let n = self.iqdqs.len();
self.stats.iiq_len().ingest(n as u32);
}
{
// let stats2 = self.stats.clone();
// let stats_fn = move |item: &VecDeque<QueryItem>| {
// stats2.iiq_batch_len().ingest(item.len() as u32);
// };
// flush_queue_dqs!(
// self,
// st_rf1_qu,
// st_rf1_sp_pin,
// send_batched::<256, _>,
// 32,
// (&mut have_progress, &mut have_pending),
// "st_rf1_rx",
// cx,
// stats_fn
// );
// let stats2 = self.stats.clone();
// let stats_fn = move |item: &VecDeque<QueryItem>| {
// stats2.iiq_batch_len().ingest(item.len() as u32);
// };
// flush_queue_dqs!(
// self,
// st_rf3_qu,
// st_rf3_sp_pin,
// send_batched::<256, _>,
// 32,
// (&mut have_progress, &mut have_pending),
// "st_rf3_rx",
// cx,
// stats_fn
// );
// let stats2 = self.stats.clone();
// let stats_fn = move |item: &VecDeque<QueryItem>| {
// stats2.iiq_batch_len().ingest(item.len() as u32);
// };
// flush_queue_dqs!(
// self,
// mt_rf3_qu,
// mt_rf3_sp_pin,
// send_batched::<256, _>,
// 32,
// (&mut have_progress, &mut have_pending),
// "mt_rf3_rx",
// cx,
// stats_fn
// );
// let stats2 = self.stats.clone();
// let stats_fn = move |item: &VecDeque<QueryItem>| {
// stats2.iiq_batch_len().ingest(item.len() as u32);
// };
// flush_queue_dqs!(
// self,
// lt_rf3_qu,
// lt_rf3_sp_pin,
// send_batched::<256, _>,
// 32,
// (&mut have_progress, &mut have_pending),
// "lt_rf3_rx",
// cx,
// stats_fn
// );
}
// if !self.is_shutdown() {
// flush_queue!(
// self,
// channel_info_query_qu,
// channel_info_query_tx,
// send_individual,
// 32,
// (&mut have_progress, &mut have_pending),
// "chinf",
// cx,
// |_| {}
// );
// }
// match self.as_mut().handle_writer_establish_result(cx) {
// Ok(Ready(Some(()))) => {
// have_progress = true;
// }
// Ok(Ready(None)) => {}
// Ok(Pending) => {
// have_pending = true;
// }
// Err(e) => break Ready(Some(CaConnEvent::err_now(e))),
// }
// match self.as_mut().handle_conn_command(cx) {
// Ok(Ready(Some(()))) => {
// have_progress = true;
// }
// Ok(Ready(None)) => {}
// Ok(Pending) => {
// have_pending = true;
// }
// Err(e) => break Ready(Some(CaConnEvent::err_now(e))),
// }
// match self.loop_inner(cx) {
// Ok(Ready(Some(()))) => {
// have_progress = true;
// }
// Ok(Ready(None)) => {}
// Ok(Pending) => {
// have_pending = true;
// }
// Err(e) => {
// error!("{e}");
// self.state = CaConnState::EndOfStream;
// break Ready(Some(CaConnEvent::err_now(e)));
// }
// }
break match self.state {
CaConnState::Connecting(_, _, _) => todo!(),
CaConnState::Connected(_) => todo!(),
CaConnState::Shutdown(_) => {
// TODO still attempt to flush queues.
// If all queues are flushed, go into Done state.
todo!()
}
CaConnState::Done => todo!(),
};
// break if self.is_shutdown() {
// if self.queues_out_flushed() {
// debug!("is_shutdown queues_out_flushed set EOS {}", self.remote_addr_dbg);
// if let CaConnState::Shutdown(x) = std::mem::replace(&mut self.state, CaConnState::EndOfStream) {
// Ready(Some(CaConnEvent::new_now(CaConnEventValue::EndOfStream(x))))
// } else {
// continue;
// }
// } else {
// if have_progress {
// debug!("is_shutdown NOT queues_out_flushed prog {}", self.remote_addr_dbg);
// self.stats.poll_reloop().inc();
// reloops += 1;
// continue;
// } else if have_pending {
// debug!("is_shutdown NOT queues_out_flushed pend {}", self.remote_addr_dbg);
// self.log_queues_summary();
// self.stats.poll_pending().inc();
// Pending
// } else {
// // TODO error
// error!("shutting down, queues not flushed, no progress, no pending");
// self.stats.logic_error().inc();
// let e = Error::ShutdownWithQueuesNoProgressNoPending;
// Ready(Some(CaConnEvent::err_now(e)))
// }
// }
// } else {
// if have_progress {
// if poll_ts1.elapsed() > Duration::from_millis(5) {
// self.stats.poll_wake_break().inc();
// cx.waker().wake_by_ref();
// break Ready(Some(CaConnEvent::new(self.poll_tsnow, CaConnEventValue::None)));
// } else {
// self.stats.poll_reloop().inc();
// reloops += 1;
// continue;
// }
// } else if have_pending {
// self.stats.poll_pending().inc();
// Pending
// } else {
// self.stats.poll_no_progress_no_pending().inc();
// let e = Error::NoProgressNoPending;
// Ready(Some(CaConnEvent::err_now(e)))
// }
// };
};
durs.step();
// if self.trace_channel_poll {
// self.stats.poll_all_dt().ingest_dur_dms(dt);
// if dt >= Duration::from_millis(10) {
// trace!("long poll {dt:?}");
// } else if dt >= Duration::from_micros(400) {
// let v = self.stats.poll_all_dt.to_display();
// let ip = self.remote_addr_dbg;
// trace!("poll_all_dt {ip} {v}");
// }
// }
// self.stats.read_ioids_len().set(self.read_ioids.len() as u64);
// let n = match &self.proto {
// Some(x) => x.proto_out_len() as u64,
// None => 0,
// };
// self.stats.proto_out_len().set(n);
// self.stats.poll_reloops().ingest(reloops);
ret
}
}

View File

@@ -41,6 +41,7 @@ where
{
series: SeriesId,
min_quiet: Duration,
is_polled: bool,
emit_state: <ET as EmittableType>::State,
last_insert_ts: TsNano,
last_insert_val: Option<ET>,
@@ -57,6 +58,7 @@ where
pub fn new(
series: SeriesId,
min_quiet: Duration,
is_polled: bool,
emit_state: <ET as EmittableType>::State,
dbgname: String,
) -> Result<Self, Error> {
@@ -64,6 +66,7 @@ where
let ret = Self {
series,
min_quiet,
is_polled,
emit_state,
last_insert_ts: TsNano::from_ns(0),
last_insert_val: None,
@@ -113,9 +116,15 @@ where
"{dbgname} {sid} ignore, because ts_local rewind {ts:?} {tsl:?}",
);
false
} else if ts.ms() < tsl.ms() + min_quiet {
} else if !self.is_polled && ts.ms() < tsl.ms() + min_quiet {
trace_rt_decision!(det, "{dbgname} {sid} ignore, because not min quiet {ts:?} {tsl:?}");
false
} else if self.is_polled && ts.ms() + 800 < tsl.ms() + min_quiet {
trace_rt_decision!(
det,
"{dbgname} {sid} ignore, because not is-polled min quiet {ts:?} {tsl:?}"
);
false
} else if ts < tsl.add_dt_nano(DtNano::from_ms(5)) {
trace_rt_decision!(det, "{dbgname} {sid} ignore, because store rate cap");
false

View File

@@ -89,19 +89,20 @@ where
scalar_type: ScalarType,
shape: Shape,
min_quiets: MinQuiets,
is_polled: bool,
emit_state_new: &dyn Fn() -> <ET as EmittableType>::State,
) -> Result<Self, Error> {
let state_st = {
// let writer = SeriesWriter::establish_with_sid(sid, stnow)?;
let writer = RateLimitWriter::new(series, min_quiets.st, emit_state_new(), "st".into())?;
let writer = RateLimitWriter::new(series, min_quiets.st, is_polled, emit_state_new(), "st".into())?;
State { writer }
};
let state_mt = {
let writer = RateLimitWriter::new(series, min_quiets.mt, emit_state_new(), "mt".into())?;
let writer = RateLimitWriter::new(series, min_quiets.mt, is_polled, emit_state_new(), "mt".into())?;
State { writer }
};
let state_lt = {
let writer = RateLimitWriter::new(series, min_quiets.lt, emit_state_new(), "lt".into())?;
let writer = RateLimitWriter::new(series, min_quiets.lt, is_polled, emit_state_new(), "lt".into())?;
State { writer }
};
let ret = Self {

View File

@@ -309,7 +309,6 @@ stats_proc::stats_struct!((
channel_info_insert_done,
ivl_insert_done,
mute_insert_done,
poll_count,
loop1_count,
loop2_count,
loop3_count,

View File

@@ -53,7 +53,7 @@ fn stats_struct_impl(st: &StatsStructDef) -> String {
let init_histolog2s = st
.histolog2s
.iter()
.map(|x| format!("{:12}{}: stats_types::HistoLog2::new(0)", "", x.to_string()));
.map(|x| format!("{:12}{}: stats_types::HistoLog2::new()", "", x.to_string()));
let inits: Vec<_> = inits1.into_iter().chain(inits2).chain(init_histolog2s).collect();
let inits = inits.join(",\n");
let incers: String = st

View File

@@ -1,9 +1,11 @@
pub use serde_json;
use core::fmt;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering::AcqRel;
use std::sync::atomic::Ordering::Acquire;
use std::sync::atomic::Ordering::Release;
use std::time::Duration;
#[derive(Debug)]
pub struct CounterDesc {
@@ -123,7 +125,6 @@ impl StatsAReader {}
pub struct HistoLog2 {
histo: [AtomicU64; 20],
sum: AtomicU64,
sub: u16,
}
macro_rules! rep16 {
@@ -135,24 +136,25 @@ macro_rules! rep16 {
}
impl HistoLog2 {
pub fn new(sub: u16) -> Self {
pub fn new() -> Self {
Self {
histo: rep16!([AtomicU64::new(0)]),
sum: AtomicU64::new(0),
sub,
}
}
#[inline]
pub fn ingest(&self, mut v: u32) {
pub fn ingest(&self, v: u32) {
self.sum.fetch_add(v as u64, AcqRel);
v >>= self.sub;
let mut po = 0;
while v != 0 && po < self.histo.len() - 1 {
v >>= 1;
po += 1;
}
self.histo[po].fetch_add(1, AcqRel);
let w = 32 - v.leading_zeros();
let i = w.min(15);
self.histo[i as usize].fetch_add(1, AcqRel);
}
#[inline]
pub fn ingest_dur_dms(&self, dt: Duration) {
let v = 10000 * dt.as_secs() as u32 + dt.subsec_micros() / 100;
self.ingest(v)
}
pub fn to_prometheus(&self, name: &str) -> String {
@@ -196,14 +198,30 @@ impl HistoLog2 {
ret
}
pub fn to_json(&self, _name: &str) -> serde_json::Value {
pub fn to_json(&self, name: &str) -> serde_json::Value {
let _ = name;
serde_json::Value::Null
}
pub fn to_display(&self) -> HistoLog2Display {
HistoLog2Display { inner: self }
}
}
/// Borrowing `Display` adapter for a `HistoLog2`, rendering a one-line
/// debug-style summary of the bucket counts and the running sum.
pub struct HistoLog2Display<'a> {
    inner: &'a HistoLog2,
}

impl<'a> fmt::Display for HistoLog2Display<'a> {
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        // Snapshot the atomic bucket counters so they format as plain numbers.
        let counts: Vec<u64> = self.inner.histo.iter().map(|c| c.load(Acquire)).collect();
        write!(fmt, "HistoLog2 {{ histo: {:?}, sum: {:?} }}", counts, self.inner.sum)
    }
}
#[test]
fn histo_00() {
let histo = HistoLog2::new(0);
let histo = HistoLog2::new();
// histo.ingest(0);
// histo.ingest(1);
// histo.ingest(2);