By default more tasks for mt, lt write, WIP on channel inspect

This commit is contained in:
Dominik Werder
2025-03-26 09:08:14 +01:00
parent abca73836d
commit f1286d9ba6
26 changed files with 449 additions and 160 deletions
+1 -1
View File
@@ -1,6 +1,6 @@
[package]
name = "daqingest"
version = "0.2.7-aa.7"
version = "0.2.7-aa.10"
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
edition = "2024"
+77 -7
View File
@@ -45,7 +45,8 @@ const CHECK_CHANNEL_SLOW_WARN: Duration = Duration::from_millis(500);
pub struct DaemonOpts {
pgconf: Database,
scyconf_st: ScyllaIngestConfig,
scyconf_st_rf3: ScyllaIngestConfig,
scyconf_st_rf1: ScyllaIngestConfig,
scyconf_mt: ScyllaIngestConfig,
scyconf_lt: ScyllaIngestConfig,
#[allow(unused)]
@@ -201,6 +202,16 @@ impl Daemon {
let mut insert_worker_jhs = Vec::new();
if ingest_opts.scylla_disable() {
let jh = scywr::insertworker::spawn_scylla_insert_workers_dummy(
ingest_opts.insert_worker_count(),
ingest_opts.insert_worker_concurrency(),
iqrx.st_rf1_rx,
insert_worker_opts.clone(),
insert_worker_stats.clone(),
)
.await
.map_err(Error::from_string)?;
insert_worker_jhs.extend(jh);
let jh = scywr::insertworker::spawn_scylla_insert_workers_dummy(
ingest_opts.insert_worker_count(),
ingest_opts.insert_worker_concurrency(),
@@ -231,10 +242,36 @@ impl Daemon {
.await
.map_err(Error::from_string)?;
insert_worker_jhs.extend(jh);
let jh = scywr::insertworker::spawn_scylla_insert_workers_dummy(
ingest_opts.insert_worker_count(),
ingest_opts.insert_worker_concurrency(),
iqrx.lt_rf3_lat5_rx,
insert_worker_opts.clone(),
insert_worker_stats.clone(),
)
.await
.map_err(Error::from_string)?;
insert_worker_jhs.extend(jh);
} else {
let jh = scywr::insertworker::spawn_scylla_insert_workers(
RetentionTime::Short,
opts.scyconf_st.clone(),
opts.scyconf_st_rf1.clone(),
ingest_opts.insert_scylla_sessions(),
ingest_opts.insert_worker_count(),
ingest_opts.insert_worker_concurrency(),
iqrx.st_rf1_rx,
insert_worker_opts.clone(),
insert_worker_stats.clone(),
ingest_opts.use_rate_limit_queue(),
ignore_writes,
)
.await
.map_err(Error::from_string)?;
insert_worker_jhs.extend(jh);
let jh = scywr::insertworker::spawn_scylla_insert_workers(
RetentionTime::Short,
opts.scyconf_st_rf3.clone(),
ingest_opts.insert_scylla_sessions(),
ingest_opts.insert_worker_count(),
ingest_opts.insert_worker_concurrency(),
@@ -252,8 +289,8 @@ impl Daemon {
RetentionTime::Medium,
opts.scyconf_mt.clone(),
ingest_opts.insert_scylla_sessions(),
ingest_opts.insert_worker_count().min(2),
ingest_opts.insert_worker_concurrency().min(8),
ingest_opts.insert_worker_count(),
ingest_opts.insert_worker_concurrency(),
iqrx.mt_rf3_rx,
insert_worker_opts.clone(),
insert_worker_stats.clone(),
@@ -270,8 +307,8 @@ impl Daemon {
RetentionTime::Long,
opts.scyconf_lt.clone(),
ingest_opts.insert_scylla_sessions(),
ingest_opts.insert_worker_count().min(2),
ingest_opts.insert_worker_concurrency().min(8),
ingest_opts.insert_worker_count(),
ingest_opts.insert_worker_concurrency(),
lt_rx_combined,
insert_worker_opts.clone(),
insert_worker_stats.clone(),
@@ -422,6 +459,23 @@ impl Daemon {
self.insert_queue_counter.load(atomic::Ordering::Acquire),
);
}
let iqtxm = self
.iqtx
.as_ref()
.map(|x| netfetch::metrics::types::InsertQueuesTxMetrics::from(x));
if let Some(iqtxm) = iqtxm {
self.stats().iqtx_len_st_rf1().set(iqtxm.st_rf1_len as _);
self.stats().iqtx_len_st_rf3().set(iqtxm.st_rf3_len as _);
self.stats().iqtx_len_mt_rf3().set(iqtxm.mt_rf3_len as _);
self.stats().iqtx_len_lt_rf3().set(iqtxm.lt_rf3_len as _);
self.stats().iqtx_len_lt_rf3_lat5().set(iqtxm.lt_rf3_lat5_len as _);
} else {
self.stats().iqtx_len_st_rf1().set(2);
self.stats().iqtx_len_st_rf3().set(2);
self.stats().iqtx_len_mt_rf3().set(2);
self.stats().iqtx_len_lt_rf3().set(2);
self.stats().iqtx_len_lt_rf3_lat5().set(2);
}
Ok(())
}
@@ -439,6 +493,11 @@ impl Daemon {
Ok(())
}
async fn handle_channel_command(&mut self, cmd: netfetch::ca::connset::ChannelCommand) -> Result<(), Error> {
self.connset_ctrl.send_channel_command(cmd).await?;
Ok(())
}
#[cfg(target_abi = "x32")]
async fn handle_ca_conn_done(&mut self, conn_addr: SocketAddrV4) -> Result<(), Error> {
info!("handle_ca_conn_done {conn_addr:?}");
@@ -510,6 +569,7 @@ impl Daemon {
if self.shutting_down {
warn!("already shutting down");
} else {
info!("handle_shutdown");
self.shutting_down = true;
// TODO make sure we:
// set a flag so that we don't attempt to use resources any longer (why could that happen?)
@@ -518,6 +578,13 @@ impl Daemon {
// drop our ends of channels to workers (gate them behind option?).
// await the connection sets.
// await other workers that we've spawned.
if let Some(iqtx) = &self.iqtx {
info!("scylla output channels, closing all");
iqtx.close_all();
} else {
info!("scylla output channels, not set");
}
drop(self.iqtx.take());
self.connset_ctrl.shutdown().await?;
self.rx.close();
}
@@ -624,6 +691,7 @@ impl Daemon {
}
ChannelAdd(ch, tx) => self.handle_channel_add(ch, tx).await,
ChannelRemove(ch) => self.handle_channel_remove(ch).await,
ChannelCommand(cmd) => self.handle_channel_command(cmd).await,
CaConnSetItem(item) => self.handle_ca_conn_set_item(item).await,
Shutdown => self.handle_shutdown().await,
ConfigReload(tx) => self.handle_config_reload(tx).await,
@@ -683,6 +751,7 @@ impl Daemon {
self.channel_info_query_tx.clone(),
self.series_conf_by_id_tx.clone(),
self.iqtx
.clone()
.take()
.ok_or_else(|| Error::with_msg_no_trace("no iqtx available"))?,
self.ingest_opts.scylla_config_st().clone(),
@@ -833,7 +902,8 @@ pub async fn run(opts: CaIngestOpts, channels_config: Option<ChannelsConfig>) ->
let opts2 = DaemonOpts {
pgconf: opts.postgresql_config().clone(),
scyconf_st: opts.scylla_config_st().clone(),
scyconf_st_rf3: opts.scylla_config_st().clone(),
scyconf_st_rf1: opts.scylla_config_st_rf1().clone(),
scyconf_mt: opts.scylla_config_mt().clone(),
scyconf_lt: opts.scylla_config_lt().clone(),
test_bsread_addr: opts.test_bsread_addr.clone(),
-2
View File
@@ -8,8 +8,6 @@ use netpod::Shape;
use netpod::TsNano;
use scywr::iteminsertqueue::DataValue;
use scywr::iteminsertqueue::ScalarValue;
use serieswriter::writer::SeriesWriter;
use std::collections::VecDeque;
use std::io::Cursor;
use std::net::Ipv4Addr;
use std::time::SystemTime;
+127 -39
View File
@@ -2,6 +2,7 @@ mod enumfetch;
use crate::conf::ChannelConfig;
use crate::metrics::status::StorageUsage;
use crate::metrics::types::CaConnMetrics;
use crate::throttletrace::ThrottleTrace;
use async_channel::Receiver;
use async_channel::Sender;
@@ -51,6 +52,7 @@ use scywriiq::MspItem;
use scywriiq::QueryItem;
use scywriiq::ShutdownReason;
use serde::Serialize;
use serde_helper::serde_instant::serde_Instant_elapsed_ms;
use series::ChannelStatusSeriesId;
use series::SeriesId;
use serieswriter::binwriter::BinWriter;
@@ -230,7 +232,7 @@ mod ser_instant {
}
}
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize)]
pub struct Cid(pub u32);
impl fmt::Display for Cid {
@@ -239,7 +241,7 @@ impl fmt::Display for Cid {
}
}
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize)]
struct Subid(pub u32);
impl Subid {
@@ -248,7 +250,7 @@ impl Subid {
}
}
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize)]
struct Sid(pub u32);
impl Sid {
@@ -257,57 +259,71 @@ impl Sid {
}
}
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize)]
struct Ioid(pub u32);
#[derive(Clone, Debug)]
#[derive(Clone, Debug, Serialize)]
enum ChannelError {
CreateChanFail(ChannelStatusSeriesId),
}
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Serialize)]
struct CreatingState {
#[serde(with = "serde_Instant_elapsed_ms")]
tsbeg: Instant,
cssid: ChannelStatusSeriesId,
cid: Cid,
}
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Serialize)]
struct MakingSeriesWriterState {
#[serde(with = "serde_Instant_elapsed_ms")]
tsbeg: Instant,
channel: CreatedState,
series_status: SeriesId,
}
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Serialize)]
struct FetchEnumDetails {
#[serde(with = "serde_Instant_elapsed_ms")]
tsbeg: Instant,
cssid: ChannelStatusSeriesId,
}
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Serialize)]
struct EnableMonitoringState {
#[serde(with = "serde_Instant_elapsed_ms")]
tsbeg: Instant,
subid: Subid,
}
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Serialize)]
struct ReadPendingState {
#[serde(with = "serde_Instant_elapsed_ms")]
tsbeg: Instant,
}
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Serialize)]
struct Monitoring2PassiveState {
// Holds instant when we entered this state. A receive of an event is considered a re-enter of the state,
// so the instant gets updated. Used for timeout check.
#[serde(with = "serde_Instant_elapsed_ms")]
tsbeg: Instant,
#[serde(with = "serde_Instant_elapsed_ms")]
ts_silence_read_next: Instant,
}
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Serialize)]
struct Monitoring2ReadPendingState {
#[serde(with = "serde_Instant_elapsed_ms")]
tsbeg: Instant,
ioid: Ioid,
}
#[derive(Debug, Clone, Serialize)]
enum Monitoring2State {
Passive(Monitoring2PassiveState),
ReadPending(Ioid, Instant),
ReadPending(Monitoring2ReadPendingState),
}
#[derive(Debug, Clone, Serialize)]
enum MonitorReadCmp {
@@ -317,8 +333,9 @@ enum MonitorReadCmp {
DiffValue,
}
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Serialize)]
struct MonitoringState {
#[serde(with = "serde_Instant_elapsed_ms")]
tsbeg: Instant,
subid: Subid,
mon2state: Monitoring2State,
@@ -326,20 +343,23 @@ struct MonitoringState {
last_comparisons: VecDeque<(time::UtcDateTime, MonitorReadCmp)>,
}
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Serialize)]
struct StopMonitoringForPollingState {
#[serde(with = "serde_Instant_elapsed_ms")]
tsbeg: Instant,
}
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Serialize)]
struct PollingState {
#[serde(with = "serde_Instant_elapsed_ms")]
tsbeg: Instant,
poll_ivl: Duration,
tick: PollTickState,
}
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Serialize)]
struct PollTickStateIdle {
#[serde(with = "serde_Instant_elapsed_ms")]
next: Instant,
}
@@ -358,21 +378,24 @@ impl PollTickStateIdle {
}
}
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Serialize)]
struct PollTickStateWait {
#[serde(with = "serde_Instant_elapsed_ms")]
next_backup: Instant,
#[serde(with = "serde_Instant_elapsed_ms")]
since: Instant,
ioid: Ioid,
}
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Serialize)]
enum PollTickState {
Idle(PollTickStateIdle),
Wait(PollTickStateWait),
}
#[derive(Debug)]
#[derive(Debug, Serialize)]
struct WritableState {
#[serde(with = "serde_Instant_elapsed_ms")]
tsbeg: Instant,
channel: CreatedState,
writer: CaRtWriter,
@@ -380,7 +403,7 @@ struct WritableState {
reading: ReadingState,
}
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Serialize)]
enum ReadingState {
EnableMonitoring(EnableMonitoringState),
Monitoring(MonitoringState),
@@ -388,7 +411,7 @@ enum ReadingState {
Polling(PollingState),
}
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Serialize)]
struct AccountingInfo {
usage: StorageUsage,
beg: TsMs,
@@ -416,21 +439,29 @@ impl AccountingInfo {
}
}
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Serialize)]
struct CreatedState {
cssid: ChannelStatusSeriesId,
cid: Cid,
sid: Sid,
ca_dbr_type: u16,
ca_dbr_count: u32,
#[serde(with = "serde_Instant_elapsed_ms")]
ts_created: Instant,
// Updated when we receive something via monitoring or polling
#[serde(with = "serde_Instant_elapsed_ms")]
ts_alive_last: Instant,
// Updated on monitoring, polling or when the channel config changes to reset the timeout
#[serde(with = "serde_Instant_elapsed_ms")]
ts_activity_last: Instant,
st_activity_last: SystemTime,
// TODO
#[serde(skip)]
insert_item_ivl_ema: IntervalEma,
// TODO
#[serde(skip)]
item_recv_ivl_ema: IntervalEma,
#[serde(with = "serde_Instant_elapsed_ms")]
insert_recv_ivl_last: Instant,
muted_before: u32,
recv_count: u64,
@@ -450,6 +481,7 @@ struct CreatedState {
name: String,
enum_str_table: Option<Vec<String>>,
status_emit_count: u64,
#[serde(with = "serde_Instant_elapsed_ms")]
ts_recv_value_status_emit_next: Instant,
}
@@ -498,7 +530,7 @@ impl CreatedState {
}
}
#[derive(Debug)]
#[derive(Debug, Serialize)]
enum ChannelState {
Init(ChannelStatusSeriesId),
Creating(CreatingState),
@@ -511,16 +543,19 @@ enum ChannelState {
Ended(ChannelStatusSeriesId),
}
#[derive(Debug)]
#[derive(Debug, Serialize)]
struct ClosingState {
#[serde(with = "serde_Instant_elapsed_ms")]
tsbeg: Instant,
cssid: ChannelStatusSeriesId,
}
#[derive(Debug)]
#[derive(Debug, Serialize)]
struct ChannelConf {
conf: ChannelConfig,
state: ChannelState,
// TODO
#[serde(skip)]
wrst: WriterStatus,
}
@@ -842,11 +877,18 @@ fn info_store_msp_from_time(ts: SystemTime) -> u32 {
pub type CmdResTx = Sender<Result<(), Error>>;
#[derive(Debug)]
pub struct CmdChannelInspectFull {
name: String,
tx: Sender<serde_json::Value>,
}
#[derive(Debug)]
pub enum ConnCommandKind {
ChannelAdd(ChannelConfig, ChannelStatusSeriesId),
ChannelClose(String),
Shutdown,
ChannelInspectFull(CmdChannelInspectFull),
}
#[derive(Debug)]
@@ -965,6 +1007,7 @@ pub enum CaConnEventValue {
ChannelCreateFail(String),
EndOfStream(EndOfStreamReason),
ChannelRemoved(String),
Metrics(CaConnMetrics),
}
impl CaConnEventValue {
@@ -977,6 +1020,7 @@ impl CaConnEventValue {
CaConnEventValue::ChannelCreateFail(_) => "ChannelCreateFail",
CaConnEventValue::EndOfStream(_) => "EndOfStream",
CaConnEventValue::ChannelRemoved(_) => "ChannelRemoved",
CaConnEventValue::Metrics(_) => "Metrics",
}
}
}
@@ -1279,6 +1323,34 @@ impl CaConn {
self.cmd_shutdown();
Ok(Ready(Some(())))
}
ConnCommandKind::ChannelInspectFull(cmd) => match self.cid_by_name(&cmd.name) {
Some(cid) => match self.channels.get(&cid) {
Some(ch) => match serde_json::to_value(&ch) {
Ok(val) => match cmd.tx.send_blocking(val) {
Ok(()) => {
// all fine
Ok(Ready(Some(())))
}
Err(_) => {
// TODO count in metrics
Ok(Ready(Some(())))
}
},
Err(_) => {
// TODO count in metrics
Ok(Ready(Some(())))
}
},
None => {
// TODO count in metrics
Ok(Ready(Some(())))
}
},
None => {
// cmd.tx.close();
Ok(Ready(Some(())))
}
},
}
}
Ready(None) => {
@@ -1333,6 +1405,9 @@ impl CaConn {
ChannelState::MakingSeriesWriter(st) => {
let scalar_type = st.channel.scalar_type.clone();
let shape = st.channel.shape.clone();
if series::dbg::dbg_chn(st.channel.name()) {
info!("call RtWriter::new {:?} {:?}", chinfo, ch.conf);
}
let writer = RtWriter::new(
chinfo.series.to_series(),
scalar_type,
@@ -1639,7 +1714,9 @@ impl CaConn {
}
for (_cid, conf) in &mut self.channels {
if series::dbg::dbg_chn(conf.conf.name()) {
info!("channel_state_on_shutdown {:?}", conf);
let js = serde_json::to_string(conf).unwrap();
info!("channel_state_on_shutdown debug {:?}", conf);
info!("channel_state_on_shutdown json {}", js);
}
let chst = &mut conf.state;
match chst {
@@ -1770,7 +1847,7 @@ impl CaConn {
match x.mon2state {
// actually, no differing behavior needed so far.
Monitoring2State::Passive(_) => {}
Monitoring2State::ReadPending(_, _) => {}
Monitoring2State::ReadPending(_) => {}
}
Some(x.subid.clone())
}
@@ -1831,7 +1908,7 @@ impl CaConn {
Monitoring2State::Passive(st3) => {
st3.tsbeg = tsnow;
}
Monitoring2State::ReadPending(_ioid, _since) => {
Monitoring2State::ReadPending(_) => {
// Received EventAdd while still waiting for answer to explicit ReadNotify.
// This is fine.
self.stats.recv_event_add_while_wait_on_read_notify.inc();
@@ -1939,7 +2016,7 @@ impl CaConn {
Monitoring2State::Passive(st3) => {
st3.tsbeg = tsnow;
}
Monitoring2State::ReadPending(_, _) => {}
Monitoring2State::ReadPending(_) => {}
}
let name = self.name_by_cid(cid);
warn!("received event-cancel but channel {name:?} in wrong state");
@@ -2045,10 +2122,10 @@ impl CaConn {
}
st3.tsbeg = tsnow;
}
Monitoring2State::ReadPending(ioid2, _since) => {
Monitoring2State::ReadPending(st3) => {
// We don't check again for `since` here. That's done in timeout checking.
// So we could be here a little beyond timeout but we don't care about that.
if ioid != *ioid2 {
if ioid != st3.ioid {
// warn!("IOID mismatch ReadNotifyRes on Monitor Read Pending {ioid:?} {ioid2:?}");
self.stats.recv_read_notify_state_read_pending_bad_ioid.inc();
} else {
@@ -2243,7 +2320,7 @@ impl CaConn {
Self::check_ev_value_data(&value.data, &writer.scalar_type())?;
crst.muted_before = 0;
crst.insert_item_ivl_ema.tick(tsnow);
binwriter.ingest(tsev, value.f32_for_binning(), iqdqs)?;
// binwriter.ingest(tsev, value.f32_for_binning(), iqdqs)?;
{
let wres = writer.write(CaWriterValue::new(value, crst), tsnow, tsev, iqdqs)?;
crst.status_emit_count += wres.nstatus() as u64;
@@ -2449,7 +2526,8 @@ impl CaConn {
);
do_wake_again = true;
self.proto.as_mut().ok_or_else(|| Error::NoProtocol)?.push_out(msg);
st3.mon2state = Monitoring2State::ReadPending(ioid, tsnow);
st3.mon2state =
Monitoring2State::ReadPending(Monitoring2ReadPendingState { tsbeg: tsnow, ioid });
self.stats.caget_issued().inc();
{
let item = ChannelStatusItem {
@@ -2462,8 +2540,8 @@ impl CaConn {
}
}
}
Monitoring2State::ReadPending(ioid, since) => {
if *since + MONITOR_POLL_TIMEOUT < tsnow {
Monitoring2State::ReadPending(st4) => {
if st4.tsbeg + MONITOR_POLL_TIMEOUT < tsnow {
// Something is wrong with this channel.
// Maybe we lost connection, maybe the IOC went down, maybe there is a bug where only
// this or a subset of the subscribed channels no longer give updates.
@@ -2472,7 +2550,7 @@ impl CaConn {
trace_monitor_stale!(
"channel monitor explicit read timeout {} ioid {:?}",
name,
ioid
st4.ioid
);
{
let item = ChannelStatusItem {
@@ -2609,7 +2687,7 @@ impl CaConn {
}
ReadingState::Monitoring(st3) => match &st3.mon2state {
Monitoring2State::Passive(_st4) => {}
Monitoring2State::ReadPending(_, _) => {
Monitoring2State::ReadPending(_) => {
// This is handled in check_channels_state_poll
// TODO should unify.
}
@@ -3095,6 +3173,7 @@ impl CaConn {
CaConnState::EndOfStream => {}
}
self.iqdqs.housekeeping();
self.metrics_emit();
Ok(())
}
@@ -3106,6 +3185,14 @@ impl CaConn {
}
}
fn metrics_emit(&mut self) {
let item = CaConnMetrics {
ca_conn_event_out_queue_len: self.ca_conn_event_out_queue.len(),
};
let item = CaConnEvent::new(Instant::now(), CaConnEventValue::Metrics(item));
self.ca_conn_event_out_queue.push_back(item);
}
fn emit_channel_status(&mut self) -> Result<(), Error> {
let stnow = SystemTime::now();
let mut channel_statuses = BTreeMap::new();
@@ -3665,6 +3752,7 @@ impl Stream for CaConn {
}
}
#[derive(Debug, Serialize)]
struct CaWriterValueState {
series_data: SeriesId,
series_status: SeriesId,
@@ -3687,7 +3775,7 @@ impl CaWriterValueState {
}
}
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Serialize)]
struct CaWriterValue(CaEventValue, Option<String>);
impl CaWriterValue {
+62 -1
View File
@@ -71,6 +71,7 @@ use std::time::Duration;
use std::time::Instant;
use std::time::SystemTime;
use taskrun::tokio;
use tracing::Instrument;
const CHECK_CHANS_PER_TICK: usize = 10000000;
pub const SEARCH_BATCH_MAX: usize = 64;
@@ -225,6 +226,17 @@ impl fmt::Debug for ChannelStatusesRequest {
}
}
#[derive(Debug)]
pub enum ChannelCommandKind {
InspectDetail,
}
#[derive(Debug)]
pub struct ChannelCommand {
pub channel: String,
pub kind: ChannelCommandKind,
}
#[derive(Debug)]
pub enum ConnSetCmd {
ChannelConfigFlagReset(ChannelConfigFlagReset),
@@ -233,6 +245,7 @@ pub enum ConnSetCmd {
ChannelRemove(ChannelRemove),
Shutdown,
ChannelStatuses(ChannelStatusesRequest),
ChannelCommand(ChannelCommand),
}
#[derive(Debug)]
@@ -299,6 +312,13 @@ impl CaConnSetCtrl {
Ok(())
}
pub async fn send_channel_command(&self, cmd: ChannelCommand) -> Result<(), Error> {
self.tx
.send(CaConnSetEvent::ConnSetCmd(ConnSetCmd::ChannelCommand(cmd)))
.await?;
Ok(())
}
pub async fn shutdown(&self) -> Result<(), Error> {
let cmd = ConnSetCmd::Shutdown;
self.tx.send(CaConnSetEvent::ConnSetCmd(cmd)).await?;
@@ -582,6 +602,7 @@ impl CaConnSet {
ConnSetCmd::ChannelRemove(x) => self.handle_remove_channel(x),
ConnSetCmd::Shutdown => self.handle_shutdown(),
ConnSetCmd::ChannelStatuses(x) => self.handle_channel_statuses_req(x),
ConnSetCmd::ChannelCommand(x) => self.handle_channel_command(x),
},
}
}
@@ -721,6 +742,9 @@ impl CaConnSet {
trace3!("handle_add_channel but shutdown_stopping");
return Ok(());
}
if series::dbg::dbg_chn(cmd.name()) {
info!("handle_add_channel {:?}", cmd);
}
trace_channel_state!("handle_add_channel {:?}", cmd);
self.stats.channel_add().inc();
// TODO should I add the transition through ActiveChannelState::Init as well?
@@ -744,6 +768,10 @@ impl CaConnSet {
CaConnEventValue::ChannelStatus(st) => self.apply_ca_conn_health_update(addr, st),
CaConnEventValue::EndOfStream(reason) => self.handle_ca_conn_eos(addr, reason),
CaConnEventValue::ChannelRemoved(name) => self.handle_ca_conn_channel_removed(addr, name),
CaConnEventValue::Metrics(v) => {
// TODO aggregate metrics and stats
Ok(())
}
}
}
@@ -1047,6 +1075,24 @@ impl CaConnSet {
Ok(())
}
fn handle_channel_command(&mut self, cmd: ChannelCommand) -> Result<(), Error> {
if self.shutdown_stopping {
return Ok(());
}
// TODO handle, send to corresponding CaConn
// let channels_ca_conn_set = self
// .channel_states
// .iter()
// .filter(|(k, _)| k.name() == cmd.channel)
// .map(|(k, v)| (k.name().to_string(), v.clone()))
// .collect();
// let item = ChannelStatusesResponse { channels_ca_conn_set };
// if req.tx.try_send(item).is_err() {
// self.stats.response_tx_fail.inc();
// }
Ok(())
}
fn handle_shutdown(&mut self) -> Result<(), Error> {
if self.shutdown_stopping {
return Ok(());
@@ -1317,7 +1363,19 @@ impl CaConnSet {
let conn_tx = conn.conn_command_tx();
let conn_stats = conn.stats();
let tx1 = self.ca_conn_res_tx.as_ref().get_ref().clone();
let jh = tokio::spawn(Self::ca_conn_item_merge(conn, tx1, addr, self.stats.clone()));
let log_level = "trace";
let logspan = if log_level == "trace" {
trace!("enable trace for handler");
tracing::span!(tracing::Level::INFO, "log_span_trace")
} else if log_level == "debug" {
debug!("enable debug for handler");
tracing::span!(tracing::Level::INFO, "log_span_debug")
} else {
tracing::Span::none()
};
let fut = Self::ca_conn_item_merge(conn, tx1, addr, self.stats.clone());
let fut = fut.instrument(logspan);
let jh = tokio::spawn(fut);
let ca_conn_res = CaConnRes {
state: CaConnState::new(CaConnStateValue::Fresh),
sender: Box::pin(conn_tx.into()),
@@ -1391,6 +1449,9 @@ impl CaConnSet {
return Err(e.into());
}
}
CaConnEventValue::Metrics(_) => {
// TODO merge metrics
}
}
}
if let Some(x) = eos_reason {
+1 -2
View File
@@ -1,14 +1,13 @@
use super::connset::CaConnSetEvent;
use super::findioc::FindIocRes;
use crate::ca::connset::ConnSetCmd;
use async_channel::Receiver;
use dbpg::seriesbychannel::ChannelInfoResult;
use err::Error;
use futures_util::Stream;
use pin_project::pin_project;
use std::collections::VecDeque;
use std::pin::pin;
use std::pin::Pin;
use std::pin::pin;
use std::task::Context;
use std::task::Poll;
+2 -5
View File
@@ -1,5 +1,5 @@
use super::connset::IocAddrQuery;
use super::connset::CURRENT_SEARCH_PENDING_MAX;
use super::connset::IocAddrQuery;
use super::connset::SEARCH_BATCH_MAX;
use super::search::ca_search_workers_start;
use crate::ca::findioc::FindIocRes;
@@ -10,7 +10,6 @@ use dbpg::conn::make_pg_client;
use dbpg::iocindex::IocItem;
use dbpg::iocindex::IocSearchIndexWorker;
use dbpg::postgres::Row as PgRow;
use hashbrown::HashMap;
use log::*;
use netpod::Database;
use stats::IocFinderStats;
@@ -23,9 +22,7 @@ use tokio::task::JoinHandle;
const SEARCH_DB_WORKER_CNT: usize = 2;
macro_rules! debug_batch { ($($arg:tt)*) => ( if false { debug!($($arg)*); } ) }
macro_rules! trace_batch { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) }
macro_rules! debug_batch { ($($arg:expr),*) => ( if false { debug!($($arg),*); } ); }
autoerr::create_error_v1!(
name(Error, "Finder"),
+4 -3
View File
@@ -3,12 +3,13 @@ use crate::conf::ChannelConfig;
use crate::daemon_common::ChannelName;
use dashmap::DashMap;
use serde::Serialize;
use serde_helper::serde_instant::serde_Instant_elapsed_ms;
use series::ChannelStatusSeriesId;
use serieswriter::fixgridwriter::ChannelStatusSeriesWriter;
use serieswriter::fixgridwriter::ChannelStatusWriteState;
use std::collections::btree_map::RangeMut;
use std::collections::BTreeMap;
use std::collections::HashMap;
use std::collections::btree_map::RangeMut;
use std::net::SocketAddr;
use std::net::SocketAddrV4;
use std::ops::RangeBounds;
@@ -65,7 +66,7 @@ pub enum WithAddressState {
pub struct UnassignedState {
#[serde(with = "humantime_serde")]
since: SystemTime,
#[serde(with = "serde_helper::serde_Instant")]
#[serde(with = "serde_Instant_elapsed_ms")]
unused_since_ts: Instant,
}
@@ -73,7 +74,7 @@ pub struct UnassignedState {
pub struct UnassigningForConfigChangeState {
pub config_new: ChannelConfig,
pub addr: SocketAddr,
#[serde(with = "serde_helper::serde_Instant")]
#[serde(with = "serde_Instant_elapsed_ms")]
pub since: Instant,
}
+4 -10
View File
@@ -102,11 +102,11 @@ impl CaIngestOpts {
}
pub fn insert_worker_count(&self) -> usize {
self.insert_worker_count.unwrap_or(8)
self.insert_worker_count.unwrap_or(10)
}
pub fn insert_worker_concurrency(&self) -> usize {
self.insert_worker_concurrency.unwrap_or(32)
self.insert_worker_concurrency.unwrap_or(64)
}
pub fn array_truncate(&self) -> u64 {
@@ -352,10 +352,6 @@ impl IngestConfigArchiving {
}
}
fn bool_is_false(x: &bool) -> bool {
*x == false
}
fn bool_true() -> bool {
true
}
@@ -363,12 +359,9 @@ fn bool_true() -> bool {
mod serde_ingest_config_archiving {
use super::ChannelReadConfigApiFormat;
use super::IngestConfigArchiving;
use serde::Deserializer;
use serde::Serializer;
use serde::de;
use serde::ser;
use serde::ser::SerializeMap;
use std::fmt;
impl ser::Serialize for IngestConfigArchiving {
fn serialize<S>(&self, ser: S) -> Result<S::Ok, S::Error>
@@ -682,7 +675,8 @@ impl ChannelConfig {
}
pub fn replication(&self) -> bool {
self.arch.replication
// self.arch.replication
true
}
pub fn poll_conf(&self) -> Option<(u64,)> {
+2
View File
@@ -23,6 +23,7 @@ pub enum DaemonEvent {
TimerTick(u32, Sender<u32>),
ChannelAdd(ChannelConfig, crate::ca::conn::CmdResTx),
ChannelRemove(ChannelName),
ChannelCommand(crate::ca::connset::ChannelCommand),
CaConnSetItem(CaConnSetItem),
Shutdown,
ConfigReload(async_channel::Sender<u64>),
@@ -35,6 +36,7 @@ impl DaemonEvent {
TimerTick(_, _) => format!("TimerTick"),
ChannelAdd(x, _) => format!("ChannelAdd {x:?}"),
ChannelRemove(x) => format!("ChannelRemove {x:?}"),
ChannelCommand(x) => format!("ChannelCommand {x:?}"),
CaConnSetItem(_) => format!("CaConnSetItem"),
Shutdown => format!("Shutdown"),
ConfigReload(..) => format!("ConfigReload"),
+5 -4
View File
@@ -2,6 +2,7 @@
pub mod delete;
pub mod ingest;
pub mod status;
pub mod types;
use crate::ca::conn::ChannelStateInfo;
use crate::ca::connset::CaConnSetEvent;
@@ -46,9 +47,9 @@ use std::collections::HashMap;
use std::net::SocketAddr;
use std::net::SocketAddrV4;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::task::Context;
use std::task::Poll;
use std::time::Duration;
@@ -367,9 +368,9 @@ fn make_routes(
connset_cmd_tx: Sender<CaConnSetEvent>,
stats_set: StatsSet,
) -> axum::Router {
use axum::Router;
use axum::extract;
use axum::routing::{get, post, put};
use axum::Router;
use http::StatusCode;
Router::new()
@@ -518,9 +519,9 @@ fn make_routes_channel(
connset_cmd_tx: Sender<CaConnSetEvent>,
stats_set: StatsSet,
) -> axum::Router {
use axum::Router;
use axum::extract;
use axum::routing::{get, post, put};
use axum::Router;
use http::StatusCode;
Router::new()
.fallback(|| async { axum::Json(json!({"subcommands":["states"]})) })
@@ -560,9 +561,9 @@ fn make_routes_ingest(
connset_cmd_tx: Sender<CaConnSetEvent>,
stats_set: StatsSet,
) -> axum::Router {
use axum::Router;
use axum::extract;
use axum::routing::{get, post, put};
use axum::Router;
use http::StatusCode;
Router::new()
.nest(
+6 -4
View File
@@ -1,10 +1,10 @@
pub mod write_v02;
use super::RoutesResources;
use axum::Json;
use axum::extract::FromRequest;
use axum::extract::Query;
use axum::http::HeaderMap;
use axum::Json;
use bytes::Bytes;
use core::fmt;
use dbpg::seriesbychannel::ChannelInfoQuery;
@@ -12,20 +12,21 @@ use futures_util::StreamExt;
use futures_util::TryStreamExt;
use items_2::binning::container_events::ContainerEvents;
use items_2::binning::container_events::EventValueType;
use netpod::log::*;
use netpod::ttl::RetentionTime;
use netpod::APP_CBOR_FRAMED;
use netpod::EnumVariant;
use netpod::ScalarType;
use netpod::SeriesKind;
use netpod::Shape;
use netpod::TsNano;
use netpod::APP_CBOR_FRAMED;
use netpod::log::*;
use netpod::ttl::RetentionTime;
use scywr::insertqueues::InsertDeques;
use scywr::iteminsertqueue::ArrayValue;
use scywr::iteminsertqueue::DataValue;
use scywr::iteminsertqueue::QueryItem;
use scywr::iteminsertqueue::ScalarValue;
use serde::Deserialize;
use serde::Serialize;
use series::SeriesId;
use serieswriter::msptool::MspSplit;
use serieswriter::writer::EmittableType;
@@ -70,6 +71,7 @@ macro_rules! trace_queues {
type ValueSeriesWriter = SeriesWriter<WritableType>;
#[derive(Debug, Serialize)]
struct WritableTypeState {
series: SeriesId,
msp_split_data: MspSplit,
+6 -4
View File
@@ -1,8 +1,8 @@
use crate::metrics::RoutesResources;
use axum::Json;
use axum::extract::FromRequest;
use axum::extract::Query;
use axum::http::HeaderMap;
use axum::Json;
use bytes::Bytes;
use core::fmt;
use dbpg::seriesbychannel::ChannelInfoQuery;
@@ -10,21 +10,22 @@ use futures_util::StreamExt;
use futures_util::TryStreamExt;
use items_2::binning::container_events::ContainerEvents;
use items_2::binning::container_events::EventValueType;
use netpod::log;
use netpod::ttl::RetentionTime;
use netpod::APP_CBOR_FRAMED;
use netpod::DaqbufChannelConfig;
use netpod::EnumVariant;
use netpod::ScalarType;
use netpod::SeriesKind;
use netpod::Shape;
use netpod::TsNano;
use netpod::APP_CBOR_FRAMED;
use netpod::log;
use netpod::ttl::RetentionTime;
use scywr::insertqueues::InsertDeques;
use scywr::iteminsertqueue::ArrayValue;
use scywr::iteminsertqueue::DataValue;
use scywr::iteminsertqueue::QueryItem;
use scywr::iteminsertqueue::ScalarValue;
use serde::Deserialize;
use serde::Serialize;
use series::SeriesId;
use serieswriter::msptool::MspSplit;
use serieswriter::writer::EmittableType;
@@ -76,6 +77,7 @@ autoerr::create_error_v1!(
type ValueSeriesWriter = SeriesWriter<WritableType>;
#[derive(Debug, Serialize)]
struct WritableTypeState {
series: SeriesId,
msp_split_data: MspSplit,
+27
View File
@@ -0,0 +1,27 @@
use scywr::insertqueues::InsertQueuesTx;
use serde::Serialize;
#[derive(Debug, Serialize)]
pub struct CaConnMetrics {
pub ca_conn_event_out_queue_len: usize,
}
pub struct InsertQueuesTxMetrics {
pub st_rf1_len: usize,
pub st_rf3_len: usize,
pub mt_rf3_len: usize,
pub lt_rf3_len: usize,
pub lt_rf3_lat5_len: usize,
}
impl From<&InsertQueuesTx> for InsertQueuesTxMetrics {
fn from(value: &InsertQueuesTx) -> Self {
Self {
st_rf1_len: value.st_rf1_tx.len(),
st_rf3_len: value.st_rf3_tx.len(),
mt_rf3_len: value.mt_rf3_tx.len(),
lt_rf3_len: value.lt_rf3_tx.len(),
lt_rf3_lat5_len: value.lt_rf3_lat5_tx.len(),
}
}
}
+16 -10
View File
@@ -96,6 +96,14 @@ impl InsertQueuesTx {
Ok(())
}
pub fn close_all(&self) {
self.st_rf1_tx.close();
self.st_rf3_tx.close();
self.mt_rf3_tx.close();
self.lt_rf3_tx.close();
self.lt_rf3_lat5_tx.close();
}
pub fn clone2(&self) -> Self {
self.clone()
}
@@ -231,11 +239,12 @@ impl<'a> fmt::Display for InsertDequesSummary<'a> {
let obj = self.obj;
write!(
fmt,
"InsertDeques {{ st_rf1_len: {}, st_rf3_len: {}, mt_rf3_len: {}, lt_rf3_len: {} }}",
"InsertDeques {{ st_rf1_len: {}, st_rf3_len: {}, mt_rf3_len: {}, lt_rf3_len: {}, lt_rf3_lat5_len: {} }}",
obj.st_rf1_qu.len(),
obj.st_rf3_qu.len(),
obj.mt_rf3_qu.len(),
obj.lt_rf3_qu.len()
obj.lt_rf3_qu.len(),
obj.lt_rf3_lat5_qu.len()
)
}
}
@@ -266,7 +275,11 @@ impl InsertSenderPolling {
}
pub fn is_idle(&self) -> bool {
self.st_rf1_sp.is_idle() && self.st_rf3_sp.is_idle() && self.mt_rf3_sp.is_idle() && self.lt_rf3_sp.is_idle()
self.st_rf1_sp.is_idle()
&& self.st_rf3_sp.is_idle()
&& self.mt_rf3_sp.is_idle()
&& self.lt_rf3_sp.is_idle()
&& self.lt_rf3_lat5_sp.is_idle()
}
pub fn st_rf1_sp_pin(self: Pin<&mut Self>) -> Pin<&mut SenderPolling<VecDeque<QueryItem>>> {
@@ -289,13 +302,6 @@ impl InsertSenderPolling {
self.project().lt_rf3_lat5_sp
}
pub fn __st_rf1_sp_pin(self: Pin<&mut Self>) -> Pin<&mut SenderPolling<VecDeque<QueryItem>>> {
if true {
panic!("encapsulated by pin_project");
}
unsafe { self.map_unchecked_mut(|x| &mut x.st_rf1_sp) }
}
pub fn summary(&self) -> InsertSenderPollingSummary {
InsertSenderPollingSummary { obj: self }
}
+2 -14
View File
@@ -1,14 +1,2 @@
#[allow(non_snake_case)]
pub mod serde_Instant {
use serde::Serializer;
use std::time::Instant;
#[allow(unused)]
pub fn serialize<S>(val: &Instant, ser: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let dur = val.elapsed();
ser.serialize_u64(dur.as_secs() * 1000 + dur.subsec_millis() as u64)
}
}
pub mod serde_dummy;
pub mod serde_instant;
+13
View File
@@ -0,0 +1,13 @@
#[allow(non_snake_case)]
pub mod serde_dummy {
use serde::Serializer;
use std::time::Instant;
#[allow(unused)]
pub fn serialize<S, T>(val: &T, ser: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
ser.serialize_str("DUMMY")
}
}
+14
View File
@@ -0,0 +1,14 @@
#[allow(non_snake_case)]
pub mod serde_Instant_elapsed_ms {
use serde::Serializer;
use std::time::Instant;
#[allow(unused)]
pub fn serialize<S>(val: &Instant, ser: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let dur = val.elapsed();
ser.serialize_u64(dur.as_secs() * 1000 + dur.subsec_millis() as u64)
}
}
+9 -8
View File
@@ -1,14 +1,11 @@
use crate::log;
use crate::rtwriter::MinQuiets;
use items_0::timebin::BinnedBinsTimeweightTrait;
use items_0::timebin::BinnedEventsTimeweightTrait;
use items_0::timebin::BinsBoxed;
use items_2::binning::container_bins::ContainerBins;
use items_2::binning::container_events::ContainerEvents;
use items_2::binning::timeweight::timeweight_bins::BinnedBinsTimeweight;
use items_2::binning::timeweight::timeweight_bins_lazy::BinnedBinsTimeweightLazy;
use items_2::binning::timeweight::timeweight_events::BinnedEventsTimeweight;
use items_2::binning::timeweight::timeweight_events_dyn::BinnedEventsTimeweightLazy;
use netpod::BinnedRange;
use netpod::DtMs;
use netpod::ScalarType;
@@ -19,10 +16,10 @@ use scywr::insertqueues::InsertDeques;
use scywr::iteminsertqueue::BinWriteIndexV03;
use scywr::iteminsertqueue::QueryItem;
use scywr::iteminsertqueue::TimeBinSimpleF32V02;
use serde::Serialize;
use series::ChannelStatusSeriesId;
use series::SeriesId;
use series::msp::PrebinnedPartitioning;
use std::time::Duration;
macro_rules! info { ($($arg:expr),*) => ( if true { log::info!($($arg),*); } ) }
@@ -31,7 +28,7 @@ macro_rules! debug_bin { ($t:expr, $($arg:expr),*) => ( if true { if $t { log::d
macro_rules! trace_ingest { ($($arg:expr),*) => ( if false { log::trace!($($arg),*); } ) }
macro_rules! trace_tick { ($($arg:expr),*) => ( if true { log::trace!($($arg),*); } ) }
macro_rules! trace_tick_verbose { ($($arg:expr),*) => ( if true { log::trace!($($arg),*); } ) }
macro_rules! trace_tick_verbose { ($($arg:expr),*) => ( if false { log::trace!($($arg),*); } ) }
macro_rules! trace_bin { ($t:expr, $($arg:expr),*) => ( if true { if $t { log::trace!($($arg),*); } } ) }
@@ -68,7 +65,7 @@ fn bin_len_clamp(dur: DtMs) -> PrebinnedPartitioning {
}
}
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Serialize)]
enum WriteCntZero {
Enable,
Disable,
@@ -83,7 +80,7 @@ impl WriteCntZero {
}
}
#[derive(Debug)]
#[derive(Debug, Serialize)]
enum IndexWritten {
None,
Last(u32, u32),
@@ -107,7 +104,7 @@ impl IndexWritten {
}
}
#[derive(Debug)]
#[derive(Debug, Serialize)]
pub struct BinWriter {
chname: String,
cssid: ChannelStatusSeriesId,
@@ -425,6 +422,10 @@ impl BinWriter {
iqdqs: &mut InsertDeques,
) -> Result<(), Error> {
let selfname = "handle_output_ready";
if true {
trace_tick!("{selfname} bins ready len {} DISCARDING", bins.len());
return Ok(());
}
trace_tick!("{selfname} bins ready len {}", bins.len());
for e in bins.iter_debug() {
trace_tick_verbose!("{e:?}");
+2 -1
View File
@@ -8,6 +8,7 @@ use scywr::iteminsertqueue::DataValue;
use scywr::iteminsertqueue::MspItem;
use scywr::iteminsertqueue::QueryItem;
use scywr::iteminsertqueue::ScalarValue;
use serde::Serialize;
use series::SeriesId;
use std::time::Instant;
@@ -78,7 +79,7 @@ impl EmittableType for ChannelStatusWriteValue {
}
}
#[derive(Debug)]
#[derive(Debug, Serialize)]
pub struct ChannelStatusWriteState {
series: SeriesId,
msp_split: MspSplitFixGrid,
+2 -1
View File
@@ -2,11 +2,12 @@ pub mod fixgrid;
use netpod::DtNano;
use netpod::TsNano;
use serde::Serialize;
const SEC: u64 = 1000_000_000;
const HOUR: u64 = SEC * 60 * 60 * 24;
#[derive(Debug)]
#[derive(Debug, Serialize)]
pub struct MspSplit {
last: Option<TsNano>,
count: u32,
+2 -1
View File
@@ -2,8 +2,9 @@ use netpod::DtMs;
use netpod::DtNano;
use netpod::TsMs;
use netpod::TsNano;
use serde::Serialize;
#[derive(Debug)]
#[derive(Debug, Serialize)]
pub struct MspSplitFixGrid {
grid_dt: DtMs,
last: Option<TsMs>,
+31 -12
View File
@@ -5,6 +5,7 @@ use netpod::DtNano;
use netpod::TsNano;
use netpod::log;
use scywr::iteminsertqueue::QueryItem;
use serde::Serialize;
use series::SeriesId;
use std::collections::VecDeque;
use std::marker::PhantomData;
@@ -13,7 +14,7 @@ use std::time::Instant;
macro_rules! debug { ($($arg:expr),*) => ( if true { log::debug!($($arg),*); } ); }
macro_rules! trace { ($($arg:expr),*) => ( if true { log::trace!($($arg),*); } ); }
macro_rules! trace_rt_decision { ($det:expr, $($arg:expr),*) => ( if $det { log::trace!($($arg),*); } ); }
macro_rules! trace_rt_decision { ($dtd:expr, $($arg:expr),*) => ( if $dtd { log::trace!($($arg),*); } ); }
autoerr::create_error_v1!(
name(Error, "RateLimitWriter"),
@@ -29,6 +30,7 @@ pub struct WriteRes {
pub status: u8,
}
#[derive(Serialize)]
pub struct RateLimitWriter<ET>
where
ET: EmittableType,
@@ -66,7 +68,7 @@ where
last_insert_val: None,
dbgname,
writer,
do_trace_detail: netpod::TRACE_SERIES_ID.contains(&series.id()),
do_trace_detail: series::dbg::dbg_series(series),
_t1: PhantomData,
};
if ret.do_trace_detail {
@@ -83,9 +85,7 @@ where
tsev: TsNano,
deque: &mut VecDeque<QueryItem>,
) -> Result<WriteRes, Error> {
// Decide whether we want to write.
// TODO catch already in CaConn the cases when the IOC-timestamp did not change.
let det = self.do_trace_detail;
let dtd = self.do_trace_detail;
let dbgname = &self.dbgname;
let sid = &self.series;
let min_quiet = 1000 * self.min_quiet.as_secs() + self.min_quiet.subsec_millis() as u64;
@@ -93,7 +93,7 @@ where
let ts = tsev;
if false {
trace_rt_decision!(
det,
dtd,
"{} {} min_quiet {:?} ts1 {:?} ts2 {:?} item {:?}",
dbgname,
sid,
@@ -105,23 +105,42 @@ where
}
let do_write = {
if !self.is_polled && ts.ms() < tsl.ms() + min_quiet {
trace_rt_decision!(det, "{dbgname} {sid} ignore, because not min quiet {ts:?} {tsl:?}");
trace_rt_decision!(
dtd,
"{} {} ignore, because not min quiet {} {}",
dbgname,
sid,
ts,
tsl
);
false
} else if self.is_polled && ts.ms() + 800 < tsl.ms() + min_quiet {
trace_rt_decision!(
det,
"{dbgname} {sid} ignore, because not is-polled min quiet {ts:?} {tsl:?}"
dtd,
"{} {} ignore, because not is-polled min quiet {} {}",
dbgname,
sid,
ts,
tsl
);
false
} else if ts < tsl.add_dt_nano(DtNano::from_ms(5)) {
trace_rt_decision!(det, "{dbgname} {sid} ignore, because store rate cap");
} else if ts < tsl.add_dt_nano(DtNano::from_ms(1)) {
trace_rt_decision!(
dtd,
"{} {} ignore, because store rate cap {} {}",
dbgname,
sid,
ts,
tsl
);
false
} else {
trace_rt_decision!(det, "{dbgname} {sid} accept");
trace_rt_decision!(dtd, "{} {} accept {} {}", dbgname, sid, ts, tsl);
true
}
};
if do_write {
self.last_insert_ts = ts;
let res = self.writer.write(item, &mut self.emit_state, ts_net, ts, deque)?;
let ret = WriteRes {
accept: true,
+21 -25
View File
@@ -6,11 +6,13 @@ use netpod::Shape;
use netpod::TsNano;
use scywr::insertqueues::InsertDeques;
use scywr::iteminsertqueue::QueryItem;
use serde::Serialize;
use series::SeriesId;
use std::collections::VecDeque;
use std::time::Duration;
use std::time::Instant;
macro_rules! debug_init { ($det:expr, $($arg:expr),*) => ( if $det { log::info!($($arg),*); } ); }
macro_rules! trace_emit { ($det:expr, $($arg:expr),*) => ( if $det { log::trace!($($arg),*); } ); }
macro_rules! trace_rt_decision { ($det:expr, $($arg:expr),*) => ( if $det { log::trace!($($arg),*); } ); }
@@ -23,14 +25,14 @@ autoerr::create_error_v1!(
},
);
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Serialize)]
pub struct MinQuiets {
pub st: Duration,
pub mt: Duration,
pub lt: Duration,
}
#[derive(Debug)]
#[derive(Debug, Serialize)]
struct State<ET>
where
ET: EmittableType,
@@ -72,7 +74,7 @@ impl Default for WriteRtRes {
}
}
#[derive(Debug)]
#[derive(Debug, Serialize)]
pub struct RtWriter<ET>
where
ET: EmittableType,
@@ -103,8 +105,9 @@ where
do_st_rf1: bool,
emit_state_new: &dyn Fn() -> <ET as EmittableType>::State,
) -> Result<Self, Error> {
let dtd = series::dbg::dbg_series(series);
debug_init!(dtd, "new {:?} is_polled {}", min_quiets, is_polled);
let state_st = {
// let writer = SeriesWriter::establish_with_sid(sid, stnow)?;
let writer = RateLimitWriter::new(series, min_quiets.st, is_polled, emit_state_new(), "st".into())?;
State { writer }
};
@@ -124,7 +127,7 @@ where
state_mt,
state_lt,
min_quiets,
do_trace_detail: netpod::TRACE_SERIES_ID.contains(&series.id()),
do_trace_detail: dtd,
do_st_rf1,
last_insert_ts: TsNano::from_ns(0),
last_insert_val: None,
@@ -157,6 +160,9 @@ where
) -> Result<WriteRes, Error> {
let det = self.do_trace_detail;
trace_emit!(det, "write {:?}", item.ts());
// TODO
// Optimize for the common case that we only write into one of the stores.
// Make the decision first, based on ref, then clone only as required.
let res_lt;
let res_mt;
let res_st;
@@ -182,31 +188,21 @@ where
.as_ref()
.map(|k| item.has_change(k))
.unwrap_or(true)
== false
{
// TODO filter duplicate values already here
res_lt = Self::write_inner(&mut self.state_lt, item.clone(), ts_net, tsev, &mut iqdqs.lt_rf3_qu)?;
if !res_lt.accept {
res_mt = Self::write_inner(&mut self.state_mt, item.clone(), ts_net, tsev, &mut iqdqs.mt_rf3_qu)?;
if !res_mt.accept {
if self.do_st_rf1 {
res_st =
Self::write_inner(&mut self.state_st, item.clone(), ts_net, tsev, &mut iqdqs.st_rf1_qu)?;
} else {
res_st =
Self::write_inner(&mut self.state_st, item.clone(), ts_net, tsev, &mut iqdqs.st_rf3_qu)?;
}
} else {
res_st = WriteRtRes::default();
}
} else {
res_mt = WriteRtRes::default();
res_st = WriteRtRes::default();
}
} else {
trace_rt_decision!(det, "{} ignore, because value did not change", self.series);
res_lt = WriteRtRes::default();
res_mt = WriteRtRes::default();
res_st = WriteRtRes::default();
} else {
res_lt = Self::write_inner(&mut self.state_lt, item.clone(), ts_net, tsev, &mut iqdqs.lt_rf3_qu)?;
res_mt = Self::write_inner(&mut self.state_mt, item.clone(), ts_net, tsev, &mut iqdqs.mt_rf3_qu)?;
res_st = if self.do_st_rf1 {
// Self::write_inner(&mut self.state_st, item.clone(), ts_net, tsev, &mut iqdqs.st_rf1_qu)?
Self::write_inner(&mut self.state_st, item.clone(), ts_net, tsev, &mut iqdqs.st_rf3_qu)?
} else {
Self::write_inner(&mut self.state_st, item.clone(), ts_net, tsev, &mut iqdqs.st_rf3_qu)?
};
}
let ret = WriteRes {
st: res_st,
+8 -6
View File
@@ -1,13 +1,15 @@
use core::fmt;
use log::*;
use netpod::TsNano;
use scywr::iteminsertqueue::QueryItem;
use serde::Serialize;
use series::SeriesId;
pub use smallvec::SmallVec;
use std::collections::VecDeque;
use std::fmt;
use std::marker::PhantomData;
use std::time::Instant;
pub use smallvec::SmallVec;
macro_rules! trace_emit { ($det:expr, $($arg:tt)*) => ( if $det { trace!($($arg)*); } ) }
autoerr::create_error_v1!(
@@ -32,7 +34,7 @@ pub struct EmitRes {
}
pub trait EmittableType: fmt::Debug + Clone {
type State;
type State: fmt::Debug + Serialize;
fn ts(&self) -> TsNano;
fn has_change(&self, k: &Self) -> bool;
fn byte_size(&self) -> u32;
@@ -57,7 +59,7 @@ pub struct WriteRes {
pub status: u8,
}
#[derive(Debug)]
#[derive(Debug, Serialize)]
pub struct SeriesWriter<ET> {
series: SeriesId,
do_trace_detail: bool,
@@ -71,7 +73,7 @@ where
pub fn new(series: SeriesId) -> Result<Self, Error> {
let res = Self {
series,
do_trace_detail: netpod::TRACE_SERIES_ID.contains(&series.id()),
do_trace_detail: series::dbg::dbg_series(series),
_t1: PhantomData,
};
Ok(res)
@@ -92,7 +94,7 @@ where
let det = self.do_trace_detail;
let ts_main = item.ts();
let res = item.into_query_item(ts_net, tsev, state);
trace_emit!(det, "emit value for ts {:?} items len {}", ts_main, res.items.len());
trace_emit!(det, "emit value for ts {} items len {}", ts_main, res.items.len());
for item in res.items {
deque.push_back(item);
}
+5
View File
@@ -316,6 +316,11 @@ stats_proc::stats_struct!((
channel_with_address,
channel_no_address,
connset_health_lat_ema,
iqtx_len_st_rf1,
iqtx_len_st_rf3,
iqtx_len_mt_rf3,
iqtx_len_lt_rf3,
iqtx_len_lt_rf3_lat5,
),
),
agg(name(DaemonStatsAgg), parent(DaemonStats)),