From 79aa8d04661aaa3fbd25ef53409194df412bcc4b Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Fri, 30 Aug 2024 16:29:47 +0200 Subject: [PATCH] WIP --- daqingest/Cargo.toml | 6 +- netfetch/Cargo.toml | 1 + netfetch/src/ca/conn.rs | 151 ++++++++++----- netfetch/src/ca/conn2/channel.rs | 12 +- netfetch/src/ca/conn2/conn.rs | 285 +++++++++++++++++++++++++++- serieswriter/src/ratelimitwriter.rs | 11 +- serieswriter/src/rtwriter.rs | 7 +- stats/src/stats.rs | 1 - stats_proc/src/stats_proc.rs | 2 +- stats_types/src/stats_types.rs | 44 +++-- 10 files changed, 452 insertions(+), 68 deletions(-) diff --git a/daqingest/Cargo.toml b/daqingest/Cargo.toml index c8f53a9..194caae 100644 --- a/daqingest/Cargo.toml +++ b/daqingest/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "daqingest" -version = "0.2.4-aa.0" +version = "0.2.4-aa.1" authors = ["Dominik Werder "] edition = "2021" @@ -16,8 +16,8 @@ serde = { version = "1.0", features = ["derive"] } tokio-postgres = "0.7.10" async-channel = "2.3.1" futures-util = "0.3" -chrono = "0.4" -bytes = "1.6.0" +chrono = "0.4.38" +bytes = "1.7.1" libc = "0.2" err = { path = "../../daqbuffer/crates/err" } netpod = { path = "../../daqbuffer/crates/netpod" } diff --git a/netfetch/Cargo.toml b/netfetch/Cargo.toml index 1301382..66d45fb 100644 --- a/netfetch/Cargo.toml +++ b/netfetch/Cargo.toml @@ -36,6 +36,7 @@ libc = "0.2" slidebuf = "0.0.1" dashmap = "6.0.1" hashbrown = "0.14.3" +smallvec = "1.13.2" log = { path = "../log" } series = { path = "../series" } serieswriter = { path = "../serieswriter" } diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index 7983c80..f7f5c70 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -91,6 +91,8 @@ const MONITOR_POLL_TIMEOUT: Duration = Duration::from_millis(1000 * 6); const TIMEOUT_CHANNEL_CLOSING: Duration = Duration::from_millis(1000 * 8); const TIMEOUT_PONG_WAIT: Duration = Duration::from_millis(1000 * 10); const READ_CHANNEL_VALUE_STATUS_EMIT_QUIET_MIN: Duration = Duration::from_millis(1000 * 120); +const SILENCE_READ_NEXT_IVL: Duration = Duration::from_millis(1000 * 200); +const POLL_READ_TIMEOUT: Duration = Duration::from_millis(1000 * 10); const DO_RATE_CHECK: bool = false; #[allow(unused)] @@ -385,15 +387,37 @@ struct PollingState { tick: PollTickState, } +#[derive(Debug, Clone)] +struct PollTickStateIdle { + next: Instant, +} + +impl PollTickStateIdle { + fn decide_next(next_backup: Instant, ivl: Duration, tsnow: Instant) -> Instant { + let next = next_backup + ivl; + if next <= tsnow { + let mut next = next; + while next <= tsnow { + next += ivl; + } + next + } else { + next + } + } +} + +#[derive(Debug, Clone)] +struct PollTickStateWait { + next_backup: Instant, + since: Instant, + ioid: Ioid, +} + #[derive(Debug, Clone)] enum PollTickState { - // TODO use inner struct to give this Instant a name. - // When monitoring, update this ts on received events. - // It should hold the Instant when we entered this state, but a receive of some event - // is considered re-entering this state. - Idle(Instant), - // TODO use inner struct to give this Instant a name - Wait(Instant, Ioid), + Idle(PollTickStateIdle), + Wait(PollTickStateWait), } #[derive(Debug)] @@ -971,8 +995,9 @@ pub enum EndOfStreamReason { } pub struct CaConnOpts { - insert_queue_max: usize, - array_truncate: usize, + // TODO make private when we don't share it anymore + pub(super) insert_queue_max: usize, + pub(super) array_truncate: usize, } impl CaConnOpts { @@ -1037,6 +1062,7 @@ pub struct CaConn { ioid: u32, read_ioids: HashMap, handler_by_ioid: HashMap>>>, + trace_channel_poll: bool, } impl Drop for CaConn { @@ -1101,31 +1127,33 @@ impl CaConn { ioid: 100, read_ioids: HashMap::new(), handler_by_ioid: HashMap::new(), + trace_channel_poll: false, } } fn ioc_ping_ivl_rng(rng: &mut Xoshiro128PlusPlus) -> Duration { let b = IOC_PING_IVL; - b + b / 128 * (rng.next_u32() & 0x1f) + b + b * (rng.next_u32() & 0x3f) / 0xff } fn channel_status_emit_ivl(rng: &mut Xoshiro128PlusPlus) -> Duration { let b = CHANNEL_STATUS_EMIT_IVL; - b + b / 128 * (rng.next_u32() & 0x1f) + b + b * (rng.next_u32() & 0x3f) / 0xff } fn silence_read_next_ivl_rng(rng: &mut Xoshiro128PlusPlus) -> Duration { - Duration::from_millis(1000 * 300 + (rng.next_u32() & 0x3fff) as u64) + let b = SILENCE_READ_NEXT_IVL; + b + b * (rng.next_u32() & 0x7f) / 0xff } fn recv_value_status_emit_ivl_rng(rng: &mut Xoshiro128PlusPlus) -> Duration { let b = READ_CHANNEL_VALUE_STATUS_EMIT_QUIET_MIN; - b + b / 128 * (rng.next_u32() & 0x1f) + b + b * (rng.next_u32() & 0x3f) / 0xff } fn new_self_ticker(rng: &mut Xoshiro128PlusPlus) -> Pin> { - let b = Duration::from_millis(1500); - let dur = b + b / 128 * (rng.next_u32() & 0x1f); + let b = Duration::from_millis(110); + let dur = b + b * (rng.next_u32() & 0x3f) / 0xff; Box::pin(tokio::time::sleep(dur)) } @@ -1299,6 +1327,7 @@ impl CaConn { scalar_type, shape, ch.conf.min_quiets(), + ch.conf.is_polled(), &|| CaWriterValueState::new(st.series_status, chinfo.series.to_series()), )?; self.handle_writer_establish_inner(cid, writer)?; @@ -1366,6 +1395,11 @@ impl CaConn { conf.wrst.emit_channel_status_item(item, &mut self.iqdqs.st_rf3_qu)?; } if let Some((ivl,)) = conf_poll_conf { + let ivl = Duration::from_millis(ivl); + if self.trace_channel_poll { + trace!("make poll idle state from writer establish"); + } + let next = self.poll_tsnow + ivl * (self.rng.next_u32() & 0x1ff) / 511; let created_state = WritableState { tsbeg: self.poll_tsnow, channel: std::mem::replace(&mut st2.channel, CreatedState::dummy()), @@ -1373,8 +1407,8 @@ impl CaConn { binwriter, reading: ReadingState::Polling(PollingState { tsbeg: self.poll_tsnow, - poll_ivl: Duration::from_millis(ivl), - tick: PollTickState::Idle(self.poll_tsnow), + poll_ivl: ivl, + tick: PollTickState::Idle(PollTickStateIdle { next }), }), }; conf.state = ChannelState::Writable(created_state); @@ -1434,6 +1468,11 @@ impl CaConn { } pub fn channel_add(&mut self, conf: ChannelConfig, cssid: ChannelStatusSeriesId) -> Result<(), Error> { + if false { + if netpod::trigger.contains(&conf.name()) { + self.trace_channel_poll = true; + } + } if self.cid_by_name(conf.name()).is_some() { self.stats.channel_add_exists.inc(); if trigger.contains(&conf.name()) { @@ -1800,10 +1839,14 @@ impl CaConn { match ch_s { ChannelState::Writable(st) => match &mut st.reading { ReadingState::StopMonitoringForPolling(..) => { + if self.trace_channel_poll { + trace!("make poll idle from event add empty"); + } + let ivl = Duration::from_millis(1000); st.reading = ReadingState::Polling(PollingState { tsbeg: tsnow, - poll_ivl: Duration::from_millis(1000), - tick: PollTickState::Idle(tsnow), + poll_ivl: ivl, + tick: PollTickState::Idle(PollTickStateIdle { next: tsnow }), }); } ReadingState::EnableMonitoring(..) => { @@ -1882,15 +1925,19 @@ impl CaConn { } match &mut st.reading { ReadingState::Polling(st2) => match &mut st2.tick { - PollTickState::Idle(_st3) => { + PollTickState::Idle(_) => { self.stats.recv_read_notify_while_polling_idle.inc(); } - PollTickState::Wait(st3, ioid) => { - let dt = tsnow.saturating_duration_since(*st3); - self.stats.caget_lat().ingest((1e3 * dt.as_secs_f32()) as u32); + PollTickState::Wait(st3) => { + let dt = tsnow.saturating_duration_since(st3.since); + self.stats.caget_lat().ingest_dur_dms(dt); // TODO maintain histogram of read-notify latencies - self.read_ioids.remove(ioid); - st2.tick = PollTickState::Idle(tsnow); + self.read_ioids.remove(&st3.ioid); + let next = PollTickStateIdle::decide_next(st3.next_backup, st2.poll_ivl, tsnow); + if self.trace_channel_poll { + trace!("make next poll idle at {next:?} tsnow {tsnow:?}"); + } + st2.tick = PollTickState::Idle(PollTickStateIdle { next }); let iqdqs = &mut self.iqdqs; let stats = self.stats.as_ref(); Self::read_notify_res_for_write( @@ -2059,7 +2106,7 @@ impl CaConn { stats.ca_ts_off().ingest((ts_diff / MS) as u32); } { - let evts = ts_local; + let tsev = ts_local; Self::check_ev_value_data(&value.data, &writer.scalar_type())?; crst.muted_before = 0; crst.insert_item_ivl_ema.tick(tsnow); @@ -2067,7 +2114,7 @@ impl CaConn { // let ts_local = TsNano::from_ns(ts_local); // binwriter.ingest(ts_ioc, ts_local, &val, iqdqs)?; { - let wres = writer.write(CaWriterValue::new(value, crst), tsnow, evts, iqdqs)?; + let wres = writer.write(CaWriterValue::new(value, crst), tsnow, tsev, iqdqs)?; crst.status_emit_count += wres.nstatus() as u64; if wres.st.accept { crst.dw_st_last = stnow; @@ -2323,8 +2370,8 @@ impl CaConn { }, ReadingState::StopMonitoringForPolling(_) => {} ReadingState::Polling(st3) => match &mut st3.tick { - PollTickState::Idle(x) => { - if *x + st3.poll_ivl <= tsnow { + PollTickState::Idle(st4) => { + if st4.next <= tsnow { let ioid = Ioid(self.ioid); self.ioid = self.ioid.wrapping_add(1); self.read_ioids.insert(ioid, st2.channel.cid.clone()); @@ -2339,15 +2386,23 @@ impl CaConn { ); do_wake_again = true; self.proto.as_mut().unwrap().push_out(msg); - st3.tick = PollTickState::Wait(tsnow, ioid); + st3.tick = PollTickState::Wait(PollTickStateWait { + next_backup: st4.next, + since: tsnow, + ioid, + }); self.stats.caget_issued().inc(); } } - PollTickState::Wait(x, ioid) => { - if *x + Duration::from_millis(10000) <= tsnow { - self.read_ioids.remove(ioid); + PollTickState::Wait(st4) => { + if st4.since + POLL_READ_TIMEOUT <= tsnow { + self.read_ioids.remove(&st4.ioid); self.stats.caget_timeout().inc(); - st3.tick = PollTickState::Idle(tsnow); + let next = PollTickStateIdle::decide_next(st4.next_backup, st3.poll_ivl, tsnow); + if self.trace_channel_poll { + trace!("make poll idle after poll timeout {next:?}"); + } + st3.tick = PollTickState::Idle(PollTickStateIdle { next }); } } }, @@ -2490,8 +2545,7 @@ impl CaConn { // let addr = &self.remote_addr_dbg; if let Some(started) = self.ioc_ping_start { let dt = started.elapsed(); - let dt = dt.as_secs() as u32 + dt.subsec_millis(); - self.stats.pong_recv_lat().ingest(dt); + self.stats.pong_recv_lat().ingest_dur_dms(dt); } else { let addr = &self.remote_addr_dbg; warn!("received Echo even though we didn't asked for it {addr:?}"); @@ -2844,11 +2898,15 @@ impl CaConn { } fn handle_own_ticker(mut self: Pin<&mut Self>, cx: &mut Context) -> Result<(), Error> { - // debug!("tick CaConn {}", self.remote_addr_dbg); let tsnow = Instant::now(); if !self.is_shutdown() { self.ticker = Self::new_self_ticker(&mut self.rng); - let _ = self.ticker.poll_unpin(cx); + match self.ticker.poll_unpin(cx) { + Poll::Pending => {} + _ => { + return Err(Error::FutLogic); + } + } // cx.waker().wake_by_ref(); } self.check_channels_state_init(tsnow, cx)?; @@ -2932,7 +2990,6 @@ impl CaConn { count: acc.usage().count() as _, bytes: acc.usage().bytes() as _, }; - trace!("EMIT ITEM {rt:?} {item:?}"); self.iqdqs.emit_accounting_item(rt, item)?; } acc.reset(msp); @@ -3145,7 +3202,6 @@ impl Stream for CaConn { self.poll_tsnow = Instant::now(); self.tmp_ts_poll = SystemTime::now(); let poll_ts1 = Instant::now(); - self.stats.poll_count().inc(); self.stats.poll_fn_begin().inc(); let mut reloops: u32 = 0; let ret = loop { @@ -3318,7 +3374,7 @@ impl Stream for CaConn { debug!("LONG OPERATION 2 {:.0} ms", 1e3 * dt.as_secs_f32()); } let dt = lts3.saturating_duration_since(lts2); - self.stats.poll_op3_dt().ingest((1e3 * dt.as_secs_f32()) as u32); + self.stats.poll_op3_dt().ingest_dur_dms(dt); if dt > max { debug!("LONG OPERATION 3 {:.0} ms", 1e3 * dt.as_secs_f32()); } @@ -3389,7 +3445,16 @@ impl Stream for CaConn { }; let poll_ts2 = Instant::now(); let dt = poll_ts2.saturating_duration_since(poll_ts1); - self.stats.poll_all_dt().ingest((1e3 * dt.as_secs_f32()) as u32); + if self.trace_channel_poll { + self.stats.poll_all_dt().ingest_dur_dms(dt); + if dt >= Duration::from_millis(10) { + trace!("long poll {dt:?}"); + } else if dt >= Duration::from_micros(400) { + let v = self.stats.poll_all_dt.to_display(); + let ip = self.remote_addr_dbg; + trace!("poll_all_dt {ip} {v}"); + } + } self.stats.read_ioids_len().set(self.read_ioids.len() as u64); let n = match &self.proto { Some(x) => x.proto_out_len() as u64, @@ -3442,7 +3507,7 @@ impl CaWriterValue { .map_or_else(|| String::from("undefined"), String::from) }, ); - trace!("CaWriterValue convert enum {} {:?}", crst.name(), conv); + // trace!("CaWriterValue convert enum {} {:?}", crst.name(), conv); Some(conv) } _ => None, diff --git a/netfetch/src/ca/conn2/channel.rs b/netfetch/src/ca/conn2/channel.rs index c9c73c2..7937e1b 100644 --- a/netfetch/src/ca/conn2/channel.rs +++ b/netfetch/src/ca/conn2/channel.rs @@ -1,3 +1,13 @@ -trait Channel {} +use err::thiserror; +use err::ThisError; + +#[derive(Debug, ThisError)] +#[cstm(name = "ConnChannelError")] +pub enum Error {} + +trait Channel { + fn can_accept_ca_msg(&self) -> bool; + fn process_ca_msg(&mut self, msg: crate::ca::proto::CaMsg) -> Result<(), Error>; +} struct ChannelAny {} diff --git a/netfetch/src/ca/conn2/conn.rs b/netfetch/src/ca/conn2/conn.rs index 378da86..0dcde3f 100644 --- a/netfetch/src/ca/conn2/conn.rs +++ b/netfetch/src/ca/conn2/conn.rs @@ -11,21 +11,48 @@ use futures_util::Stream; use futures_util::StreamExt; use hashbrown::HashMap; use log::*; +use scywr::insertqueues::InsertDeques; use scywr::insertqueues::InsertQueuesTx; +use scywr::iteminsertqueue::QueryItem; use stats::rand_xoshiro::Xoshiro128PlusPlus; use stats::CaConnStats; use stats::CaProtoStats; +use std::collections::VecDeque; use std::net::SocketAddrV4; use std::pin::Pin; use std::sync::Arc; use std::task::Context; use std::task::Poll; +use std::time::Duration; use std::time::Instant; use taskrun::tokio; use tokio::net::TcpStream; +struct DurationMeasureSteps { + ts: Instant, + durs: smallvec::SmallVec<[Duration; 8]>, +} + +impl DurationMeasureSteps { + fn new() -> Self { + Self { + ts: Instant::now(), + durs: smallvec::SmallVec::new(), + } + } + + fn step(&mut self) { + let ts = Instant::now(); + let d = ts.saturating_duration_since(self.ts); + self.durs.push(d); + self.ts = ts; + } +} + #[derive(Debug)] -pub enum Error {} +pub enum Error { + TickerPoll, +} type ConnectingFut = Pin, tokio::time::error::Elapsed>> + Send>>; @@ -47,7 +74,11 @@ struct CaConn { opts: CaConnOpts, backend: String, state: CaConnState, + iqdqs: InsertDeques, + ca_conn_event_out_queue: VecDeque, + ca_conn_event_out_queue_max: usize, rng: Xoshiro128PlusPlus, + stats: Arc, } impl CaConn { @@ -68,9 +99,24 @@ impl CaConn { opts, backend, state: CaConnState::Connecting(tsnow, remote_addr, err::todoval()), + iqdqs: InsertDeques::new(), + ca_conn_event_out_queue: VecDeque::new(), + ca_conn_event_out_queue_max: 2000, rng, + stats, } } + + fn poll_own_ticker(mut self: Pin<&mut Self>, cx: &mut Context) -> Result, Error> { + // TODO nothing taken yet + todo!() + } + + // call this only from the main fn poll + fn shutdown_on_error(&mut self, e: Error) { + self.state = CaConnState::Shutdown(EndOfStreamReason::Error(e)); + todo!() + } } impl Stream for CaConn { @@ -78,6 +124,241 @@ impl Stream for CaConn { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; - todo!() + let mut durs = DurationMeasureSteps::new(); + self.stats.poll_fn_begin().inc(); + let ret = loop { + self.stats.poll_loop_begin().inc(); + + let qlen = self.iqdqs.len(); + if qlen >= self.opts.insert_queue_max * 2 / 3 { + self.stats.insert_item_queue_pressure().inc(); + } else if qlen >= self.opts.insert_queue_max { + self.stats.insert_item_queue_full().inc(); + } + + let mut have_pending = false; + let mut have_progress = false; + + if let CaConnState::Done = self.state { + break Ready(None); + } else if let Some(item) = self.ca_conn_event_out_queue.pop_front() { + break Ready(Some(item)); + } + + // TODO add up duration of this scope + match self.as_mut().poll_own_ticker(cx) { + Ok(Ready(())) => { + have_progress = true; + } + Ok(Pending) => { + have_pending = true; + } + Err(e) => { + self.shutdown_on_error(e); + continue; + } + } + + { + let n = self.iqdqs.len(); + self.stats.iiq_len().ingest(n as u32); + } + + { + // let stats2 = self.stats.clone(); + // let stats_fn = move |item: &VecDeque| { + // stats2.iiq_batch_len().ingest(item.len() as u32); + // }; + // flush_queue_dqs!( + // self, + // st_rf1_qu, + // st_rf1_sp_pin, + // send_batched::<256, _>, + // 32, + // (&mut have_progress, &mut have_pending), + // "st_rf1_rx", + // cx, + // stats_fn + // ); + + // let stats2 = self.stats.clone(); + // let stats_fn = move |item: &VecDeque| { + // stats2.iiq_batch_len().ingest(item.len() as u32); + // }; + // flush_queue_dqs!( + // self, + // st_rf3_qu, + // st_rf3_sp_pin, + // send_batched::<256, _>, + // 32, + // (&mut have_progress, &mut have_pending), + // "st_rf3_rx", + // cx, + // stats_fn + // ); + + // let stats2 = self.stats.clone(); + // let stats_fn = move |item: &VecDeque| { + // stats2.iiq_batch_len().ingest(item.len() as u32); + // }; + // flush_queue_dqs!( + // self, + // mt_rf3_qu, + // mt_rf3_sp_pin, + // send_batched::<256, _>, + // 32, + // (&mut have_progress, &mut have_pending), + // "mt_rf3_rx", + // cx, + // stats_fn + // ); + + // let stats2 = self.stats.clone(); + // let stats_fn = move |item: &VecDeque| { + // stats2.iiq_batch_len().ingest(item.len() as u32); + // }; + // flush_queue_dqs!( + // self, + // lt_rf3_qu, + // lt_rf3_sp_pin, + // send_batched::<256, _>, + // 32, + // (&mut have_progress, &mut have_pending), + // "lt_rf3_rx", + // cx, + // stats_fn + // ); + } + + // if !self.is_shutdown() { + // flush_queue!( + // self, + // channel_info_query_qu, + // channel_info_query_tx, + // send_individual, + // 32, + // (&mut have_progress, &mut have_pending), + // "chinf", + // cx, + // |_| {} + // ); + // } + + // match self.as_mut().handle_writer_establish_result(cx) { + // Ok(Ready(Some(()))) => { + // have_progress = true; + // } + // Ok(Ready(None)) => {} + // Ok(Pending) => { + // have_pending = true; + // } + // Err(e) => break Ready(Some(CaConnEvent::err_now(e))), + // } + + // match self.as_mut().handle_conn_command(cx) { + // Ok(Ready(Some(()))) => { + // have_progress = true; + // } + // Ok(Ready(None)) => {} + // Ok(Pending) => { + // have_pending = true; + // } + // Err(e) => break Ready(Some(CaConnEvent::err_now(e))), + // } + + // match self.loop_inner(cx) { + // Ok(Ready(Some(()))) => { + // have_progress = true; + // } + // Ok(Ready(None)) => {} + // Ok(Pending) => { + // have_pending = true; + // } + // Err(e) => { + // error!("{e}"); + // self.state = CaConnState::EndOfStream; + // break Ready(Some(CaConnEvent::err_now(e))); + // } + // } + + break match self.state { + CaConnState::Connecting(_, _, _) => todo!(), + CaConnState::Connected(_) => todo!(), + CaConnState::Shutdown(_) => { + // TODO still attempt to flush queues. + // If all queues are flushed, go into Done state. + todo!() + } + CaConnState::Done => todo!(), + }; + + // break if self.is_shutdown() { + // if self.queues_out_flushed() { + // debug!("is_shutdown queues_out_flushed set EOS {}", self.remote_addr_dbg); + // if let CaConnState::Shutdown(x) = std::mem::replace(&mut self.state, CaConnState::EndOfStream) { + // Ready(Some(CaConnEvent::new_now(CaConnEventValue::EndOfStream(x)))) + // } else { + // continue; + // } + // } else { + // if have_progress { + // debug!("is_shutdown NOT queues_out_flushed prog {}", self.remote_addr_dbg); + // self.stats.poll_reloop().inc(); + // reloops += 1; + // continue; + // } else if have_pending { + // debug!("is_shutdown NOT queues_out_flushed pend {}", self.remote_addr_dbg); + // self.log_queues_summary(); + // self.stats.poll_pending().inc(); + // Pending + // } else { + // // TODO error + // error!("shutting down, queues not flushed, no progress, no pending"); + // self.stats.logic_error().inc(); + // let e = Error::ShutdownWithQueuesNoProgressNoPending; + // Ready(Some(CaConnEvent::err_now(e))) + // } + // } + // } else { + // if have_progress { + // if poll_ts1.elapsed() > Duration::from_millis(5) { + // self.stats.poll_wake_break().inc(); + // cx.waker().wake_by_ref(); + // break Ready(Some(CaConnEvent::new(self.poll_tsnow, CaConnEventValue::None))); + // } else { + // self.stats.poll_reloop().inc(); + // reloops += 1; + // continue; + // } + // } else if have_pending { + // self.stats.poll_pending().inc(); + // Pending + // } else { + // self.stats.poll_no_progress_no_pending().inc(); + // let e = Error::NoProgressNoPending; + // Ready(Some(CaConnEvent::err_now(e))) + // } + // }; + }; + + durs.step(); + // if self.trace_channel_poll { + // self.stats.poll_all_dt().ingest_dur_dms(dt); + // if dt >= Duration::from_millis(10) { + // trace!("long poll {dt:?}"); + // } else if dt >= Duration::from_micros(400) { + // let v = self.stats.poll_all_dt.to_display(); + // let ip = self.remote_addr_dbg; + // trace!("poll_all_dt {ip} {v}"); + // } + // } + // self.stats.read_ioids_len().set(self.read_ioids.len() as u64); + // let n = match &self.proto { + // Some(x) => x.proto_out_len() as u64, + // None => 0, + // }; + // self.stats.proto_out_len().set(n); + // self.stats.poll_reloops().ingest(reloops); + ret } } diff --git a/serieswriter/src/ratelimitwriter.rs b/serieswriter/src/ratelimitwriter.rs index 510b888..7c16c4c 100644 --- a/serieswriter/src/ratelimitwriter.rs +++ b/serieswriter/src/ratelimitwriter.rs @@ -41,6 +41,7 @@ where { series: SeriesId, min_quiet: Duration, + is_polled: bool, emit_state: ::State, last_insert_ts: TsNano, last_insert_val: Option, @@ -57,6 +58,7 @@ where pub fn new( series: SeriesId, min_quiet: Duration, + is_polled: bool, emit_state: ::State, dbgname: String, ) -> Result { @@ -64,6 +66,7 @@ where let ret = Self { series, min_quiet, + is_polled, emit_state, last_insert_ts: TsNano::from_ns(0), last_insert_val: None, @@ -113,9 +116,15 @@ where "{dbgname} {sid} ignore, because ts_local rewind {ts:?} {tsl:?}", ); false - } else if ts.ms() < tsl.ms() + min_quiet { + } else if !self.is_polled && ts.ms() < tsl.ms() + min_quiet { trace_rt_decision!(det, "{dbgname} {sid} ignore, because not min quiet {ts:?} {tsl:?}"); false + } else if self.is_polled && ts.ms() + 800 < tsl.ms() + min_quiet { + trace_rt_decision!( + det, + "{dbgname} {sid} ignore, because not is-polled min quiet {ts:?} {tsl:?}" + ); + false } else if ts < tsl.add_dt_nano(DtNano::from_ms(5)) { trace_rt_decision!(det, "{dbgname} {sid} ignore, because store rate cap"); false diff --git a/serieswriter/src/rtwriter.rs b/serieswriter/src/rtwriter.rs index e10f85a..bd36fc3 100644 --- a/serieswriter/src/rtwriter.rs +++ b/serieswriter/src/rtwriter.rs @@ -89,19 +89,20 @@ where scalar_type: ScalarType, shape: Shape, min_quiets: MinQuiets, + is_polled: bool, emit_state_new: &dyn Fn() -> ::State, ) -> Result { let state_st = { // let writer = SeriesWriter::establish_with_sid(sid, stnow)?; - let writer = RateLimitWriter::new(series, min_quiets.st, emit_state_new(), "st".into())?; + let writer = RateLimitWriter::new(series, min_quiets.st, is_polled, emit_state_new(), "st".into())?; State { writer } }; let state_mt = { - let writer = RateLimitWriter::new(series, min_quiets.mt, emit_state_new(), "mt".into())?; + let writer = RateLimitWriter::new(series, min_quiets.mt, is_polled, emit_state_new(), "mt".into())?; State { writer } }; let state_lt = { - let writer = RateLimitWriter::new(series, min_quiets.lt, emit_state_new(), "lt".into())?; + let writer = RateLimitWriter::new(series, min_quiets.lt, is_polled, emit_state_new(), "lt".into())?; State { writer } }; let ret = Self { diff --git a/stats/src/stats.rs b/stats/src/stats.rs index d5dd060..c6250e2 100644 --- a/stats/src/stats.rs +++ b/stats/src/stats.rs @@ -309,7 +309,6 @@ stats_proc::stats_struct!(( channel_info_insert_done, ivl_insert_done, mute_insert_done, - poll_count, loop1_count, loop2_count, loop3_count, diff --git a/stats_proc/src/stats_proc.rs b/stats_proc/src/stats_proc.rs index 2467281..5418947 100644 --- a/stats_proc/src/stats_proc.rs +++ b/stats_proc/src/stats_proc.rs @@ -53,7 +53,7 @@ fn stats_struct_impl(st: &StatsStructDef) -> String { let init_histolog2s = st .histolog2s .iter() - .map(|x| format!("{:12}{}: stats_types::HistoLog2::new(0)", "", x.to_string())); + .map(|x| format!("{:12}{}: stats_types::HistoLog2::new()", "", x.to_string())); let inits: Vec<_> = inits1.into_iter().chain(inits2).chain(init_histolog2s).collect(); let inits = inits.join(",\n"); let incers: String = st diff --git a/stats_types/src/stats_types.rs b/stats_types/src/stats_types.rs index ed444f3..b36b903 100644 --- a/stats_types/src/stats_types.rs +++ b/stats_types/src/stats_types.rs @@ -1,9 +1,11 @@ pub use serde_json; +use core::fmt; use std::sync::atomic::AtomicU64; use std::sync::atomic::Ordering::AcqRel; use std::sync::atomic::Ordering::Acquire; use std::sync::atomic::Ordering::Release; +use std::time::Duration; #[derive(Debug)] pub struct CounterDesc { @@ -123,7 +125,6 @@ impl StatsAReader {} pub struct HistoLog2 { histo: [AtomicU64; 20], sum: AtomicU64, - sub: u16, } macro_rules! rep16 { @@ -135,24 +136,25 @@ macro_rules! rep16 { } impl HistoLog2 { - pub fn new(sub: u16) -> Self { + pub fn new() -> Self { Self { histo: rep16!([AtomicU64::new(0)]), sum: AtomicU64::new(0), - sub, } } #[inline] - pub fn ingest(&self, mut v: u32) { + pub fn ingest(&self, v: u32) { self.sum.fetch_add(v as u64, AcqRel); - v >>= self.sub; - let mut po = 0; - while v != 0 && po < self.histo.len() - 1 { - v >>= 1; - po += 1; - } - self.histo[po].fetch_add(1, AcqRel); + let w = 32 - v.leading_zeros(); + let i = w.min(15); + self.histo[i as usize].fetch_add(1, AcqRel); + } + + #[inline] + pub fn ingest_dur_dms(&self, dt: Duration) { + let v = 10000 * dt.as_secs() as u32 + dt.subsec_micros() / 100; + self.ingest(v) } pub fn to_prometheus(&self, name: &str) -> String { @@ -196,14 +198,30 @@ impl HistoLog2 { ret } - pub fn to_json(&self, _name: &str) -> serde_json::Value { + pub fn to_json(&self, name: &str) -> serde_json::Value { + let _ = name; serde_json::Value::Null } + + pub fn to_display(&self) -> HistoLog2Display { + HistoLog2Display { inner: self } + } +} + +pub struct HistoLog2Display<'a> { + inner: &'a HistoLog2, +} + +impl<'a> fmt::Display for HistoLog2Display<'a> { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + let histo: Vec<_> = self.inner.histo.iter().map(|x| x.load(Acquire)).collect(); + write!(fmt, "HistoLog2 {{ histo: {:?}, sum: {:?} }}", histo, self.inner.sum) + } } #[test] fn histo_00() { - let histo = HistoLog2::new(0); + let histo = HistoLog2::new(); // histo.ingest(0); // histo.ingest(1); // histo.ingest(2);