Show status emit count in channel state

Dominik Werder
2024-07-19 10:19:47 +02:00
parent d0de644317
commit 85db531133
10 changed files with 310 additions and 163 deletions

View File

@@ -17,7 +17,6 @@ use dbpg::seriesbychannel::ChannelInfoResult;
use enumfetch::ConnFuture;
use err::thiserror;
use err::ThisError;
use futures_util::pin_mut;
use futures_util::Future;
use futures_util::FutureExt;
use futures_util::Stream;
@@ -26,7 +25,6 @@ use hashbrown::HashMap;
use log::*;
use netpod::timeunits::*;
use netpod::ttl::RetentionTime;
use netpod::DtNano;
use netpod::ScalarType;
use netpod::SeriesKind;
use netpod::Shape;
@@ -45,7 +43,7 @@ use scywr::insertqueues::InsertSenderPolling;
use scywr::iteminsertqueue as scywriiq;
use scywr::iteminsertqueue::Accounting;
use scywr::iteminsertqueue::AccountingRecv;
use scywr::iteminsertqueue::DataValue;
use scywr::iteminsertqueue::MspItem;
use scywr::iteminsertqueue::QueryItem;
use scywr::iteminsertqueue::ShutdownReason;
use scywr::senderpolling::SenderPolling;
@@ -58,6 +56,7 @@ use serde::Serialize;
use series::ChannelStatusSeriesId;
use series::SeriesId;
use serieswriter::binwriter::BinWriter;
use serieswriter::msptool::MspSplit;
use serieswriter::rtwriter::RtWriter;
use serieswriter::writer::EmittableType;
use stats::rand_xoshiro::rand_core::RngCore;
@@ -220,6 +219,7 @@ pub struct ChannelStateInfo {
pub write_st_last: SystemTime,
pub write_mt_last: SystemTime,
pub write_lt_last: SystemTime,
pub status_emit_count: u64,
}
mod ser_instant {
@@ -461,6 +461,7 @@ struct CreatedState {
log_more: bool,
name: String,
enum_str_table: Option<Vec<String>>,
status_emit_count: u64,
}
impl CreatedState {
@@ -503,6 +504,7 @@ impl CreatedState {
log_more: false,
name: String::new(),
enum_str_table: None,
status_emit_count: 0,
}
}
@@ -614,6 +616,10 @@ impl ChannelState {
_ => None,
};
let interest_score = 1. / item_recv_ivl_ema.unwrap_or(1e10).max(1e-6).min(1e10);
let status_emit_count = match self {
ChannelState::Writable(s) => s.channel.status_emit_count,
_ => 0,
};
ChannelStateInfo {
stnow,
cssid,
@@ -633,6 +639,7 @@ impl ChannelState {
write_st_last,
write_mt_last,
write_lt_last,
status_emit_count,
}
}
@@ -1219,13 +1226,7 @@ impl CaConn {
scalar_type,
shape,
ch.conf.min_quiets(),
stnow,
&|| CaWriterValueState {
series_data: chinfo.series.to_series(),
series_status: st.series_status,
last_accepted_ts: TsNano::from_ns(0),
last_accepted_val: None,
},
&|| CaWriterValueState::new(st.series_status, chinfo.series.to_series()),
)?;
self.handle_writer_establish_inner(cid, writer)?;
have_progress = true;
@@ -1958,20 +1959,21 @@ impl CaConn {
Self::check_ev_value_data(&value.data, &writer.scalar_type())?;
crst.muted_before = 0;
crst.insert_item_ivl_ema.tick(tsnow);
let ts_ioc = TsNano::from_ns(ts);
let ts_local = TsNano::from_ns(ts_local);
// let ts_ioc = TsNano::from_ns(ts);
// let ts_local = TsNano::from_ns(ts_local);
// binwriter.ingest(ts_ioc, ts_local, &val, iqdqs)?;
{
let ((dwst, dwmt, dwlt),) = writer.write(CaWriterValue::new(value, crst), tsnow, iqdqs)?;
if dwst {
let wres = writer.write(CaWriterValue::new(value, crst), tsnow, iqdqs)?;
crst.status_emit_count += wres.nstatus() as u64;
if wres.st.accept {
crst.dw_st_last = stnow;
crst.acc_st.push_written(payload_len);
}
if dwmt {
if wres.mt.accept {
crst.dw_mt_last = stnow;
crst.acc_mt.push_written(payload_len);
}
if dwlt {
if wres.lt.accept {
crst.dw_lt_last = stnow;
crst.acc_lt.push_written(payload_len);
}
@@ -2526,6 +2528,7 @@ impl CaConn {
log_more,
name: conf.conf.name().into(),
enum_str_table: None,
status_emit_count: 0,
};
if dbg_chn_name(created_state.name()) {
info!(
@@ -3286,6 +3289,21 @@ struct CaWriterValueState {
series_status: SeriesId,
last_accepted_ts: TsNano,
last_accepted_val: Option<CaWriterValue>,
msp_split_status: MspSplit,
msp_split_data: MspSplit,
}
impl CaWriterValueState {
fn new(series_status: SeriesId, series_data: SeriesId) -> Self {
Self {
series_data,
series_status,
last_accepted_ts: TsNano::from_ns(0),
last_accepted_val: None,
msp_split_status: MspSplit::new(1024 * 64, 1024 * 1024 * 10),
msp_split_data: MspSplit::new(1024 * 64, 1024 * 1024 * 10),
}
}
}
#[derive(Debug, Clone)]
@@ -3342,13 +3360,10 @@ impl EmittableType for CaWriterValue {
fn into_query_item(
mut self,
ts_msp: TsMs,
ts_msp_changed: bool,
ts_lsp: DtNano,
ts_net: Instant,
state: &mut <Self as EmittableType>::State,
) -> serieswriter::writer::SmallVec<[QueryItem; 4]> {
let mut ret = serieswriter::writer::SmallVec::new();
) -> serieswriter::writer::EmitRes {
let mut items = serieswriter::writer::SmallVec::new();
let diff_data = match state.last_accepted_val.as_ref() {
Some(last) => self.0.data != last.0.data,
None => true,
@@ -3363,12 +3378,15 @@ impl EmittableType for CaWriterValue {
},
None => true,
};
let ts = TsNano::from_ns(self.0.ts().unwrap());
if let Some(ts) = self.0.ts() {
state.last_accepted_ts = TsNano::from_ns(ts);
}
state.last_accepted_val = Some(self.clone());
let byte_size = self.byte_size();
if diff_data {
debug!("diff_data emit {:?}", state.series_data);
let (ts_msp, ts_lsp, ts_msp_chg) = state.msp_split_data.split(ts, self.byte_size());
let data_value = {
use super::proto::CaDataValue;
use scywr::iteminsertqueue::DataValue;
@@ -3408,36 +3426,59 @@ impl EmittableType for CaWriterValue {
};
ret
};
if ts_msp_chg {
items.push(QueryItem::Msp(MspItem::new(
state.series_data.clone(),
ts_msp.to_ts_ms(),
ts_net,
)));
}
let item = scywriiq::InsertItem {
series: state.series_data.clone(),
ts_msp,
ts_msp: ts_msp.to_ts_ms(),
ts_lsp,
ts_net,
msp_bump: ts_msp_changed,
val: data_value,
};
ret.push(QueryItem::Insert(item));
items.push(QueryItem::Insert(item));
}
let mut n_status = 0;
if diff_status {
debug!("diff_status emit {:?}", state.series_status);
use scywr::iteminsertqueue::DataValue;
use scywr::iteminsertqueue::ScalarValue;
match self.0.meta {
proto::CaMetaValue::CaMetaTime(meta) => {
let (ts_msp, ts_lsp, ts_msp_chg) = state.msp_split_status.split(ts, 2);
if ts_msp_chg {
items.push(QueryItem::Msp(MspItem::new(
state.series_status.clone(),
ts_msp.to_ts_ms(),
ts_net,
)));
}
let data_value = DataValue::Scalar(ScalarValue::CaStatus(meta.status as i16));
let item = scywriiq::InsertItem {
series: state.series_status.clone(),
ts_msp,
ts_msp: ts_msp.to_ts_ms(),
ts_lsp,
ts_net,
msp_bump: ts_msp_changed,
val: data_value,
};
ret.push(QueryItem::Insert(item));
items.push(QueryItem::Insert(item));
n_status += 1;
// info!("diff_status emit {:?}", state.series_status);
}
_ => {
// TODO must be able to return error here
warn!("diff_status logic error");
}
_ => {}
};
}
let ret = serieswriter::writer::EmitRes {
items,
bytes: byte_size,
status: n_status,
};
ret
}
}
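
The into_query_item changes above drive two independent streams from one channel-access event: a changed sample value goes to the data series and a changed CA alarm status to the status series, each with its own MspSplit so MSP rows are emitted per series, and the returned EmitRes reports how many status rows were produced so CaConn can add them to status_emit_count. A reduced stand-in sketch of that structure; series ids, the trivial split rule and all type names here are placeholders, not the real types:

    struct Stream { last_msp: Option<u64> }

    impl Stream {
        // Trivial stand-in for MspSplit::split; returns (msp, lsp, msp_changed).
        fn split(&mut self, ts: u64) -> (u64, u64, bool) {
            let msp = ts / 2_000_000_000 * 2_000_000_000;
            let changed = self.last_msp != Some(msp);
            self.last_msp = Some(msp);
            (msp, ts - msp, changed)
        }
    }

    #[derive(Debug)]
    enum Item {
        Msp { series: u64, msp: u64 },
        Insert { series: u64, msp: u64, lsp: u64 },
    }

    fn emit(ts: u64, diff_data: bool, diff_status: bool, data: &mut Stream, status: &mut Stream) -> (Vec<Item>, u8) {
        let mut items = Vec::new();
        let mut n_status = 0;
        if diff_data {
            // Value changed: emit into the data series, with an Msp row when the bucket rotates.
            let (msp, lsp, chg) = data.split(ts);
            if chg { items.push(Item::Msp { series: 1, msp }); }
            items.push(Item::Insert { series: 1, msp, lsp });
        }
        if diff_status {
            // CA status changed: emit into the status series; this is what status_emit_count counts.
            let (msp, lsp, chg) = status.split(ts);
            if chg { items.push(Item::Msp { series: 2, msp }); }
            items.push(Item::Insert { series: 2, msp, lsp });
            n_status += 1;
        }
        (items, n_status)
    }

    fn main() {
        let (mut d, mut s) = (Stream { last_msp: None }, Stream { last_msp: None });
        let (items, n_status) = emit(5_000_000_007, true, true, &mut d, &mut s);
        assert_eq!(items.len(), 4); // Msp + Insert for each of the two series
        assert_eq!(n_status, 1);
    }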

View File

@@ -89,12 +89,9 @@ impl EmittableType for WritableType {
fn into_query_item(
self,
ts_msp: netpod::TsMs,
ts_msp_changed: bool,
ts_lsp: netpod::DtNano,
ts_net: Instant,
state: &mut <Self as EmittableType>::State,
) -> serieswriter::writer::SmallVec<[QueryItem; 4]> {
) -> serieswriter::writer::EmitRes {
todo!()
}
}

View File

@@ -66,6 +66,18 @@ struct ChannelState {
write_lt_last: SystemTime,
#[serde(with = "humantime_serde", skip_serializing_if = "system_time_epoch")]
updated: SystemTime,
private: StatePrivate,
}
#[derive(Debug, Serialize)]
struct StatePrivate {
status_emit_count: u64,
}
impl Default for StatePrivate {
fn default() -> Self {
Self { status_emit_count: 0 }
}
}
fn system_time_epoch(x: &SystemTime) -> bool {
@@ -114,6 +126,7 @@ pub async fn channel_states(params: HashMap<String, String>, tx: Sender<CaConnSe
write_mt_last: SystemTime::UNIX_EPOCH,
write_lt_last: SystemTime::UNIX_EPOCH,
updated: SystemTime::UNIX_EPOCH,
private: StatePrivate::default(),
};
states.channels.insert(k, chst);
}
@@ -129,6 +142,7 @@ pub async fn channel_states(params: HashMap<String, String>, tx: Sender<CaConnSe
write_mt_last: SystemTime::UNIX_EPOCH,
write_lt_last: SystemTime::UNIX_EPOCH,
updated: SystemTime::UNIX_EPOCH,
private: StatePrivate::default(),
};
states.channels.insert(k, chst);
}
@@ -147,6 +161,7 @@ pub async fn channel_states(params: HashMap<String, String>, tx: Sender<CaConnSe
write_mt_last: SystemTime::UNIX_EPOCH,
write_lt_last: SystemTime::UNIX_EPOCH,
updated: SystemTime::UNIX_EPOCH,
private: StatePrivate::default(),
};
states.channels.insert(k, chst);
}
@@ -165,6 +180,7 @@ pub async fn channel_states(params: HashMap<String, String>, tx: Sender<CaConnSe
write_mt_last: SystemTime::UNIX_EPOCH,
write_lt_last: SystemTime::UNIX_EPOCH,
updated: SystemTime::UNIX_EPOCH,
private: StatePrivate::default(),
};
states.channels.insert(k, chst);
}
@@ -183,12 +199,16 @@ pub async fn channel_states(params: HashMap<String, String>, tx: Sender<CaConnSe
write_mt_last: SystemTime::UNIX_EPOCH,
write_lt_last: SystemTime::UNIX_EPOCH,
updated: SystemTime::UNIX_EPOCH,
private: StatePrivate::default(),
};
states.channels.insert(k, chst);
}
ConnectionStateValue::ChannelStateInfo(st6) => {
let recv_count = st6.recv_count.unwrap_or(0);
let recv_bytes = st6.recv_bytes.unwrap_or(0);
let private = StatePrivate {
status_emit_count: st6.status_emit_count,
};
use crate::ca::conn::ChannelConnectedInfo;
match st6.channel_connected_info {
ChannelConnectedInfo::Disconnected => {
@@ -205,6 +225,7 @@ pub async fn channel_states(params: HashMap<String, String>, tx: Sender<CaConnSe
write_mt_last: st6.write_mt_last,
write_lt_last: st6.write_lt_last,
updated: st6.stnow,
private,
};
states.channels.insert(k, chst);
}
@@ -220,6 +241,7 @@ pub async fn channel_states(params: HashMap<String, String>, tx: Sender<CaConnSe
write_mt_last: st6.write_mt_last,
write_lt_last: st6.write_lt_last,
updated: st6.stnow,
private,
};
states.channels.insert(k, chst);
}
@@ -235,6 +257,7 @@ pub async fn channel_states(params: HashMap<String, String>, tx: Sender<CaConnSe
write_mt_last: st6.write_mt_last,
write_lt_last: st6.write_lt_last,
updated: st6.stnow,
private,
};
states.channels.insert(k, chst);
}
@@ -250,6 +273,7 @@ pub async fn channel_states(params: HashMap<String, String>, tx: Sender<CaConnSe
write_mt_last: st6.write_mt_last,
write_lt_last: st6.write_lt_last,
updated: st6.stnow,
private,
};
states.channels.insert(k, chst);
}
@@ -271,6 +295,7 @@ pub async fn channel_states(params: HashMap<String, String>, tx: Sender<CaConnSe
write_mt_last: SystemTime::UNIX_EPOCH,
write_lt_last: SystemTime::UNIX_EPOCH,
updated: SystemTime::UNIX_EPOCH,
private: StatePrivate::default(),
};
states.channels.insert(k, chst);
}
@@ -286,6 +311,7 @@ pub async fn channel_states(params: HashMap<String, String>, tx: Sender<CaConnSe
write_mt_last: SystemTime::UNIX_EPOCH,
write_lt_last: SystemTime::UNIX_EPOCH,
updated: SystemTime::UNIX_EPOCH,
private: StatePrivate::default(),
};
states.channels.insert(k, chst);
}
@@ -301,6 +327,7 @@ pub async fn channel_states(params: HashMap<String, String>, tx: Sender<CaConnSe
write_mt_last: SystemTime::UNIX_EPOCH,
write_lt_last: SystemTime::UNIX_EPOCH,
updated: SystemTime::UNIX_EPOCH,
private: StatePrivate::default(),
};
states.channels.insert(k, chst);
}
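
The new private: StatePrivate field nests the counter inside each serialized channel entry. A minimal sketch of that nesting, assuming the surrounding ChannelState is serialized with serde as its attributes suggest; the field set is reduced to what is visible in this diff and serde_json is used here purely for illustration, not necessarily how the daemon renders the response:

    // Requires the serde (with "derive") and serde_json crates.
    use serde::Serialize;

    #[derive(Serialize)]
    struct StatePrivate { status_emit_count: u64 }

    #[derive(Serialize)]
    struct ChannelState { recv_count: u64, private: StatePrivate }

    fn main() {
        let st = ChannelState { recv_count: 3, private: StatePrivate { status_emit_count: 2 } };
        // Prints: {"recv_count":3,"private":{"status_emit_count":2}}
        println!("{}", serde_json::to_string(&st).unwrap());
    }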

View File

@@ -7,6 +7,7 @@ use crate::iteminsertqueue::Accounting;
use crate::iteminsertqueue::AccountingRecv;
use crate::iteminsertqueue::InsertFut;
use crate::iteminsertqueue::InsertItem;
use crate::iteminsertqueue::MspItem;
use crate::iteminsertqueue::QueryItem;
use crate::iteminsertqueue::TimeBinSimpleF32;
use crate::store::DataStore;
@@ -17,8 +18,6 @@ use futures_util::Stream;
use futures_util::StreamExt;
use log::*;
use netpod::ttl::RetentionTime;
use netpod::TsMs;
use netpod::TsNano;
use smallvec::smallvec;
use smallvec::SmallVec;
use stats::InsertWorkerStats;
@@ -27,7 +26,6 @@ use std::sync::atomic;
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;
use std::time::SystemTime;
use taskrun::tokio;
use tokio::task::JoinHandle;
@@ -276,6 +274,7 @@ where
for item in batch {
let futs = match item {
QueryItem::Insert(item) => prepare_query_insert_futs(item, &data_store, &stats, tsnow),
QueryItem::Msp(item) => prepare_msp_insert_futs(item, &data_store, &stats, tsnow),
QueryItem::ConnectionStatus(item) => {
stats.inserted_connection_status().inc();
let fut = insert_connection_status_fut(item, &data_store, stats.clone());
@@ -314,6 +313,9 @@ fn inspect_items(
QueryItem::ChannelStatus(_) => {
trace_item_execute!("execute {worker_name} ChannelStatus {item:?}");
}
QueryItem::Msp(item) => {
trace_item_execute!("execute {worker_name} Msp {}", item.string_short());
}
QueryItem::Insert(item) => {
trace_item_execute!("execute {worker_name} Insert {}", item.string_short());
}
@@ -331,6 +333,31 @@ fn inspect_items(
})
}
fn prepare_msp_insert_futs(
item: MspItem,
data_store: &Arc<DataStore>,
stats: &Arc<InsertWorkerStats>,
tsnow: Instant,
) -> SmallVec<[InsertFut; 4]> {
trace2!("execute MSP bump");
stats.inserts_msp().inc();
{
let dt = tsnow.saturating_duration_since(item.ts_net());
let dt_ms = 1000 * dt.as_secs() as u32 + dt.subsec_millis();
stats.item_lat_net_worker().ingest(dt_ms);
}
let fut = insert_msp_fut(
item.series(),
item.ts_msp(),
item.ts_net(),
data_store.scy.clone(),
data_store.qu_insert_ts_msp.clone(),
stats.clone(),
);
let futs = smallvec![fut];
futs
}
fn prepare_query_insert_futs(
item: InsertItem,
data_store: &Arc<DataStore>,
@@ -342,26 +369,10 @@ fn prepare_query_insert_futs(
let dt = tsnow.saturating_duration_since(item_ts_net);
let dt_ms = 1000 * dt.as_secs() as u32 + dt.subsec_millis();
stats.item_lat_net_worker().ingest(dt_ms);
let msp_bump = item.msp_bump;
let series = item.series.clone();
let ts_msp = item.ts_msp;
let do_insert = true;
let mut futs = smallvec![];
let fut = insert_item_fut(item, &data_store, do_insert, stats);
futs.push(fut);
if msp_bump {
trace2!("execute MSP bump");
stats.inserts_msp().inc();
let fut = insert_msp_fut(
series,
ts_msp,
item_ts_net,
data_store.scy.clone(),
data_store.qu_insert_ts_msp.clone(),
stats.clone(),
);
futs.push(fut);
}
futs
}
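
On the worker side, QueryItem::Msp gets its own prepare path (prepare_msp_insert_futs above), and the old msp_bump branch inside prepare_query_insert_futs is gone, so value inserts no longer carry the bump flag or the extra series/ts_msp clones. A stand-in sketch of the resulting dispatch, with placeholder types and strings in place of the real scywr insert futures:

    use std::time::Instant;

    // Simplified stand-ins for the scywr queue items; not the real definitions.
    struct MspItem { series: u64, ts_msp: u64, ts_net: Instant }
    struct InsertItem { series: u64, ts_msp: u64, ts_lsp: u64, ts_net: Instant }
    enum QueryItem { Msp(MspItem), Insert(InsertItem) }

    // Stand-in "futures": the real code builds scylla insert futures per item.
    fn prepare_batch(batch: Vec<QueryItem>, tsnow: Instant) -> Vec<String> {
        let mut futs = Vec::new();
        for item in batch {
            match item {
                // MSP bumps are now their own queue item with one dedicated insert,
                // instead of a msp_bump flag piggy-backed on every value insert.
                QueryItem::Msp(m) => {
                    let lat = tsnow.saturating_duration_since(m.ts_net);
                    futs.push(format!("ts_msp row: series={} ts_msp={} lat={:?}", m.series, m.ts_msp, lat));
                }
                QueryItem::Insert(i) => {
                    let _ = i.ts_net;
                    futs.push(format!("value row: series={} msp={} lsp={}", i.series, i.ts_msp, i.ts_lsp));
                }
            }
        }
        futs
    }

    fn main() {
        let now = Instant::now();
        let batch = vec![
            QueryItem::Msp(MspItem { series: 7, ts_msp: 4_000_000_000, ts_net: now }),
            QueryItem::Insert(InsertItem { series: 7, ts_msp: 4_000_000_000, ts_lsp: 123, ts_net: now }),
        ];
        assert_eq!(prepare_batch(batch, now).len(), 2);
    }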

View File

@@ -551,12 +551,40 @@ impl ChannelStatusItem {
}
}
#[derive(Debug, Clone)]
pub struct MspItem {
series: SeriesId,
ts_msp: TsMs,
ts_net: Instant,
}
impl MspItem {
pub fn new(series: SeriesId, ts_msp: TsMs, ts_net: Instant) -> Self {
Self { series, ts_msp, ts_net }
}
pub fn string_short(&self) -> String {
format!("{} {}", self.series.id(), self.ts_msp.ms())
}
pub fn series(&self) -> SeriesId {
self.series.clone()
}
pub fn ts_msp(&self) -> TsMs {
self.ts_msp.clone()
}
pub fn ts_net(&self) -> Instant {
self.ts_net.clone()
}
}
#[derive(Debug, Clone)]
pub struct InsertItem {
pub series: SeriesId,
pub ts_msp: TsMs,
pub ts_lsp: DtNano,
pub msp_bump: bool,
pub val: DataValue,
pub ts_net: Instant,
}
@@ -590,6 +618,7 @@ pub struct TimeBinSimpleF32 {
pub enum QueryItem {
ConnectionStatus(ConnectionStatusItem),
ChannelStatus(ChannelStatusItem),
Msp(MspItem),
Insert(InsertItem),
TimeBinSimpleF32(TimeBinSimpleF32),
Accounting(Accounting),

View File

@@ -1,5 +1,6 @@
pub mod binwriter;
pub mod changewriter;
pub mod msptool;
pub mod patchcollect;
pub mod ratelimitwriter;
pub mod rtwriter;

View File

@@ -0,0 +1,60 @@
use netpod::DtNano;
use netpod::TsNano;
const SEC: u64 = 1_000_000_000;
const HOUR: u64 = SEC * 60 * 60;
#[derive(Debug)]
pub struct MspSplit {
last: Option<TsNano>,
count: u32,
bytes: u32,
count_max: u32,
bytes_max: u32,
}
impl MspSplit {
pub fn new(count_max: u32, bytes_max: u32) -> Self {
Self {
last: None,
count: 0,
bytes: 0,
count_max,
bytes_max,
}
}
pub fn split(&mut self, ts: TsNano, item_bytes: u32) -> (TsNano, DtNano, bool) {
// Maximum resolution of the ts msp:
let msp_res_max = SEC * 2;
let ts_main = ts;
let (ts_msp, changed) = match self.last {
Some(ts_msp_last) => {
if self.count >= self.count_max || self.bytes >= self.bytes_max || ts_msp_last.add_ns(HOUR) <= ts_main {
let ts_msp = ts_main.div(msp_res_max).mul(msp_res_max);
if ts_msp == ts_msp_last {
(ts_msp, false)
} else {
self.last = Some(ts_msp);
self.count = 1;
self.bytes = item_bytes;
(ts_msp, true)
}
} else {
self.count += 1;
self.bytes += item_bytes;
(ts_msp_last, false)
}
}
None => {
let ts_msp = ts_main.div(msp_res_max).mul(msp_res_max);
self.last = Some(ts_msp);
self.count = 1;
self.bytes = item_bytes;
(ts_msp, true)
}
};
let ts_lsp = ts_main.delta(ts_msp);
(ts_msp, ts_lsp, changed)
}
}
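
MspSplit factors the MSP rotation rule out of SeriesWriter (removed from writer.rs further below) so that each target series in CaWriterValueState can carry its own split state: a bucket is kept until an item-count limit, a byte budget, or the hour span is exceeded, and a new bucket is aligned to a 2-second grid. A standalone sketch of the same rule on plain u64 nanoseconds; netpod's TsNano/DtNano and the real limits are not reproduced, and the thresholds below are illustrative only:

    const SEC: u64 = 1_000_000_000;
    const HOUR: u64 = 60 * 60 * SEC;
    const MSP_RES_MAX: u64 = 2 * SEC; // coarsest MSP resolution

    struct Split { last: Option<u64>, count: u32, bytes: u32, count_max: u32, bytes_max: u32 }

    impl Split {
        fn split(&mut self, ts: u64, item_bytes: u32) -> (u64, u64, bool) {
            let (msp, changed) = match self.last {
                // Keep the current bucket while item count, byte budget and time span allow.
                Some(last) if self.count < self.count_max && self.bytes < self.bytes_max && ts < last + HOUR => {
                    self.count += 1;
                    self.bytes += item_bytes;
                    (last, false)
                }
                _ => {
                    // Open a new bucket aligned to the 2 s grid
                    // (the real code keeps the old counters if the grid value is unchanged).
                    let msp = ts / MSP_RES_MAX * MSP_RES_MAX;
                    let changed = self.last != Some(msp);
                    self.last = Some(msp);
                    self.count = 1;
                    self.bytes = item_bytes;
                    (msp, changed)
                }
            };
            (msp, ts - msp, changed)
        }
    }

    fn main() {
        let mut s = Split { last: None, count: 0, bytes: 0, count_max: 3, bytes_max: 1024 };
        // First item opens a bucket: msp aligned down to 4 s, lsp is the remainder.
        assert_eq!(s.split(5 * SEC + 7, 16), (4 * SEC, SEC + 7, true));
        // Second item stays within the limits and reuses the bucket.
        assert_eq!(s.split(6 * SEC, 16), (4 * SEC, 2 * SEC, false));
    }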

View File

@@ -28,6 +28,13 @@ pub enum Error {
SeriesWriter(#[from] crate::writer::Error),
}
#[derive(Debug)]
pub struct WriteRes {
pub accept: bool,
pub bytes: u32,
pub status: u8,
}
pub struct RateLimitWriter<ET>
where
ET: EmittableType,
@@ -66,7 +73,7 @@ where
Ok(ret)
}
pub fn write(&mut self, item: ET, ts_net: Instant, deque: &mut VecDeque<QueryItem>) -> Result<(bool,), Error> {
pub fn write(&mut self, item: ET, ts_net: Instant, deque: &mut VecDeque<QueryItem>) -> Result<WriteRes, Error> {
// Decide whether we want to write.
// TODO catch already in CaConn the cases when the IOC-timestamp did not change.
let tsl = self.last_insert_ts.clone();
@@ -100,9 +107,21 @@ where
}
};
if do_write {
self.writer.write(item, &mut self.emit_state, ts_net, deque)?;
let res = self.writer.write(item, &mut self.emit_state, ts_net, deque)?;
let ret = WriteRes {
accept: true,
bytes: res.bytes,
status: res.status,
};
Ok(ret)
} else {
let ret = WriteRes {
accept: false,
bytes: 0,
status: 0,
};
Ok(ret)
}
Ok((do_write,))
}
pub fn tick(&mut self, iqdqs: &mut VecDeque<QueryItem>) -> Result<(), Error> {
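
RateLimitWriter::write now reports what happened instead of a bare (bool,): accept says whether the rate limiter decided to write at all, and bytes/status forward the inner writer's counters. A toy throttle illustrating the accept/reject shape of WriteRes; the quiet-interval decision below is a stand-in, not the real min_quiets logic:

    #[derive(Debug)]
    struct WriteRes { accept: bool, bytes: u32, status: u8 }

    struct Throttle { last_ts: Option<u64>, min_quiet_ns: u64 }

    impl Throttle {
        fn write(&mut self, ts: u64, bytes: u32) -> WriteRes {
            let ok = match self.last_ts {
                Some(last) => ts >= last + self.min_quiet_ns,
                None => true,
            };
            if ok {
                self.last_ts = Some(ts);
                // In the real writer, bytes/status come from SeriesWriter's result.
                WriteRes { accept: true, bytes, status: 0 }
            } else {
                // Rejected items report zero work done.
                WriteRes { accept: false, bytes: 0, status: 0 }
            }
        }
    }

    fn main() {
        let mut t = Throttle { last_ts: None, min_quiet_ns: 1_000_000_000 };
        assert!(t.write(0, 8).accept);
        assert!(!t.write(500_000_000, 8).accept); // too soon, throttled
        assert!(t.write(2_000_000_000, 8).accept);
    }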

View File

@@ -1,22 +1,16 @@
use crate::ratelimitwriter::RateLimitWriter;
use crate::writer::EmittableType;
use async_channel::Sender;
use dbpg::seriesbychannel::ChannelInfoQuery;
use err::thiserror;
use err::ThisError;
use netpod::log::*;
use netpod::ScalarType;
use netpod::SeriesKind;
use netpod::Shape;
use netpod::TsNano;
use scywr::insertqueues::InsertDeques;
use scywr::iteminsertqueue::DataValue;
use scywr::iteminsertqueue::QueryItem;
use series::SeriesId;
use std::collections::VecDeque;
use std::time::Duration;
use std::time::Instant;
use std::time::SystemTime;
#[allow(unused)]
macro_rules! trace_ {
@@ -50,6 +44,26 @@ where
writer: RateLimitWriter<ET>,
}
#[derive(Debug)]
pub struct WriteRes {
pub st: WriteRtRes,
pub mt: WriteRtRes,
pub lt: WriteRtRes,
}
impl WriteRes {
pub fn nstatus(&self) -> u8 {
self.st.status + self.mt.status + self.lt.status
}
}
#[derive(Debug)]
pub struct WriteRtRes {
pub accept: bool,
pub bytes: u32,
pub status: u8,
}
#[derive(Debug)]
pub struct RtWriter<ET>
where
@@ -73,7 +87,6 @@ where
scalar_type: ScalarType,
shape: Shape,
min_quiets: MinQuiets,
stnow: SystemTime,
emit_state_new: &dyn Fn() -> <ET as EmittableType>::State,
) -> Result<Self, Error> {
let state_st = {
@@ -117,20 +130,32 @@ where
self.min_quiets.clone()
}
pub fn write(
&mut self,
item: ET,
ts_net: Instant,
iqdqs: &mut InsertDeques,
) -> Result<((bool, bool, bool),), Error> {
pub fn write(&mut self, item: ET, ts_net: Instant, iqdqs: &mut InsertDeques) -> Result<WriteRes, Error> {
trace!("write {:?}", item.ts());
// TODO
// Optimize for the common case that we only write into one of the stores.
// Make the decision first, based on ref, then clone only as required.
let (did_write_st,) = Self::write_inner(&mut self.state_st, item.clone(), ts_net, &mut iqdqs.st_rf3_rx)?;
let (did_write_mt,) = Self::write_inner(&mut self.state_mt, item.clone(), ts_net, &mut iqdqs.mt_rf3_rx)?;
let (did_write_lt,) = Self::write_inner(&mut self.state_lt, item, ts_net, &mut iqdqs.lt_rf3_rx)?;
Ok(((did_write_st, did_write_mt, did_write_lt),))
let res_st = Self::write_inner(&mut self.state_st, item.clone(), ts_net, &mut iqdqs.st_rf3_rx)?;
let res_mt = Self::write_inner(&mut self.state_mt, item.clone(), ts_net, &mut iqdqs.mt_rf3_rx)?;
let res_lt = Self::write_inner(&mut self.state_lt, item, ts_net, &mut iqdqs.lt_rf3_rx)?;
let ret = WriteRes {
st: WriteRtRes {
accept: res_st.accept,
bytes: res_st.bytes,
status: res_st.status,
},
mt: WriteRtRes {
accept: res_mt.accept,
bytes: res_mt.bytes,
status: res_mt.status,
},
lt: WriteRtRes {
accept: res_lt.accept,
bytes: res_lt.bytes,
status: res_lt.status,
},
};
Ok(ret)
}
fn write_inner(
@@ -138,7 +163,7 @@ where
item: ET,
ts_net: Instant,
deque: &mut VecDeque<QueryItem>,
) -> Result<(bool,), Error> {
) -> Result<crate::ratelimitwriter::WriteRes, Error> {
Ok(state.writer.write(item, ts_net, deque)?)
}
@@ -149,10 +174,3 @@ where
Ok(())
}
}
#[derive(Debug)]
struct LastIns {
ts_local: TsNano,
ts_ioc: TsNano,
val: DataValue,
}
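
RtWriter::write fans the item out to the short-, mid- and long-term writers and now returns one WriteRtRes per retention tier; nstatus() sums the status emits so the caller can bump status_emit_count in one place, while the per-tier accept flags keep driving the existing dw_*_last and acc_* bookkeeping in CaConn. A reduced sketch of that caller pattern, with field and type names trimmed down from the diff:

    #[derive(Debug)]
    struct WriteRtRes { accept: bool, bytes: u32, status: u8 }

    #[derive(Debug)]
    struct WriteRes { st: WriteRtRes, mt: WriteRtRes, lt: WriteRtRes }

    impl WriteRes {
        fn nstatus(&self) -> u8 { self.st.status + self.mt.status + self.lt.status }
    }

    #[derive(Default, Debug)]
    struct ChannelCounters { status_emit_count: u64, written_st: u64, written_mt: u64, written_lt: u64 }

    fn account(c: &mut ChannelCounters, wres: &WriteRes, payload_len: u64) {
        // One counter for status emits across all three retention tiers.
        c.status_emit_count += wres.nstatus() as u64;
        // Per-tier accounting mirrors the crst.acc_* bookkeeping in CaConn.
        if wres.st.accept { c.written_st += payload_len; }
        if wres.mt.accept { c.written_mt += payload_len; }
        if wres.lt.accept { c.written_lt += payload_len; }
    }

    fn main() {
        let mut c = ChannelCounters::default();
        let wres = WriteRes {
            st: WriteRtRes { accept: true, bytes: 16, status: 1 },
            mt: WriteRtRes { accept: false, bytes: 0, status: 0 },
            lt: WriteRtRes { accept: true, bytes: 16, status: 1 },
        };
        account(&mut c, &wres, 16);
        assert_eq!(c.status_emit_count, 2);
    }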

View File

@@ -1,42 +1,29 @@
use async_channel::Sender;
use dbpg::seriesbychannel::ChannelInfoQuery;
use err::thiserror;
use err::ThisError;
use log::*;
use netpod::timeunits::HOUR;
use netpod::timeunits::SEC;
use netpod::DtNano;
use netpod::ScalarType;
use netpod::SeriesKind;
use netpod::Shape;
use netpod::TsMs;
use netpod::TsNano;
use scywr::insertqueues::InsertDeques;
use scywr::iteminsertqueue::DataValue;
use scywr::iteminsertqueue::InsertItem;
use scywr::iteminsertqueue::QueryItem;
use series::ChannelStatusSeriesId;
use series::SeriesId;
use std::collections::VecDeque;
use std::marker::PhantomData;
use std::time::Instant;
use std::time::SystemTime;
use core::fmt;
pub use smallvec::SmallVec;
pub trait EmittableType: ::core::fmt::Debug + Clone {
#[derive(Debug)]
pub struct EmitRes {
pub items: SmallVec<[QueryItem; 4]>,
pub bytes: u32,
pub status: u8,
}
pub trait EmittableType: fmt::Debug + Clone {
type State;
fn ts(&self) -> TsNano;
fn has_change(&self, k: &Self) -> bool;
fn byte_size(&self) -> u32;
fn into_query_item(
self,
ts_msp: TsMs,
ts_msp_changed: bool,
ts_lsp: DtNano,
ts_net: Instant,
state: &mut <Self as EmittableType>::State,
) -> SmallVec<[QueryItem; 4]>;
fn into_query_item(self, ts_net: Instant, state: &mut <Self as EmittableType>::State) -> EmitRes;
}
#[derive(Debug, ThisError)]
@@ -65,16 +52,15 @@ impl From<async_channel::RecvError> for Error {
}
}
#[derive(Debug)]
pub struct WriteRes {
pub bytes: u32,
pub status: u8,
}
#[derive(Debug)]
pub struct SeriesWriter<ET> {
sid: SeriesId,
ts_msp_last: Option<TsNano>,
inserted_in_current_msp: u32,
bytes_in_current_msp: u32,
msp_max_entries: u32,
msp_max_bytes: u32,
// TODO this should be in an Option:
ts_msp_grid_last: u32,
_t1: PhantomData<ET>,
}
@@ -83,16 +69,7 @@ where
ET: EmittableType,
{
pub fn new(sid: SeriesId) -> Result<Self, Error> {
let res = Self {
sid,
ts_msp_last: None,
inserted_in_current_msp: 0,
bytes_in_current_msp: 0,
msp_max_entries: 64000,
msp_max_bytes: 1024 * 1024 * 20,
ts_msp_grid_last: 0,
_t1: PhantomData,
};
let res = Self { sid, _t1: PhantomData };
Ok(res)
}
@@ -106,51 +83,18 @@ where
state: &mut <ET as EmittableType>::State,
ts_net: Instant,
deque: &mut VecDeque<QueryItem>,
) -> Result<(), Error> {
) -> Result<WriteRes, Error> {
let ts_main = item.ts();
// TODO decide on better msp/lsp: random offset!
// As long as one writer is active, the msp is arbitrary.
// Maximum resolution of the ts msp:
let msp_res_max = SEC * 2;
let (ts_msp, ts_msp_changed) = match self.ts_msp_last {
Some(ts_msp_last) => {
if self.inserted_in_current_msp >= self.msp_max_entries
|| self.bytes_in_current_msp >= self.msp_max_bytes
|| ts_msp_last.add_ns(HOUR) <= ts_main
{
let ts_msp = ts_main.div(msp_res_max).mul(msp_res_max);
if ts_msp == ts_msp_last {
(ts_msp, false)
} else {
self.ts_msp_last = Some(ts_msp);
self.inserted_in_current_msp = 1;
self.bytes_in_current_msp = item.byte_size();
(ts_msp, true)
}
} else {
self.inserted_in_current_msp += 1;
self.bytes_in_current_msp += item.byte_size();
(ts_msp_last, false)
}
}
None => {
let ts_msp = ts_main.div(msp_res_max).mul(msp_res_max);
self.ts_msp_last = Some(ts_msp);
self.inserted_in_current_msp = 1;
self.bytes_in_current_msp = item.byte_size();
(ts_msp, true)
}
};
let ts_lsp = ts_main.delta(ts_msp);
let items = item.into_query_item(ts_msp.to_ts_ms(), ts_msp_changed, ts_lsp, ts_net, state);
trace!("emit value for ts {:?} items len {}", ts_main, items.len());
for item in items {
let res = item.into_query_item(ts_net, state);
trace!("emit value for ts {:?} items len {}", ts_main, res.items.len());
for item in res.items {
deque.push_back(item);
}
Ok(())
let res = WriteRes {
bytes: res.bytes,
status: res.status,
};
Ok(res)
}
pub fn tick(&mut self, deque: &mut VecDeque<QueryItem>) -> Result<(), Error> {
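
With this change, EmittableType::into_query_item no longer receives a precomputed ts_msp/ts_msp_changed/ts_lsp from SeriesWriter; the implementor performs the split itself through its State and returns an EmitRes carrying the generated items plus byte and status counters. A toy implementation of the new shape; everything besides the overall signature structure is a placeholder, the real trait and item types live in serieswriter::writer and scywr:

    use std::time::Instant;

    // Stand-ins for the real QueryItem / SmallVec / EmitRes types.
    #[derive(Debug)]
    enum QueryItem { Insert { ts_msp: u64, ts_lsp: u64 } }

    #[derive(Debug)]
    struct EmitRes { items: Vec<QueryItem>, bytes: u32, status: u8 }

    trait EmittableType: std::fmt::Debug + Clone {
        type State;
        fn ts(&self) -> u64;
        fn byte_size(&self) -> u32;
        // New shape: only the network arrival time comes from the caller;
        // MSP handling happens inside, via the per-series State.
        fn into_query_item(self, ts_net: Instant, state: &mut Self::State) -> EmitRes;
    }

    #[derive(Debug, Clone)]
    struct Sample { ts: u64 }

    struct SampleState { last_msp: Option<u64> }

    impl EmittableType for Sample {
        type State = SampleState;
        fn ts(&self) -> u64 { self.ts }
        fn byte_size(&self) -> u32 { 4 }
        fn into_query_item(self, _ts_net: Instant, state: &mut SampleState) -> EmitRes {
            // Trivial per-state split; the real code delegates to MspSplit.
            let msp = self.ts / 2_000_000_000 * 2_000_000_000;
            state.last_msp = Some(msp);
            let bytes = self.byte_size();
            EmitRes {
                items: vec![QueryItem::Insert { ts_msp: msp, ts_lsp: self.ts - msp }],
                bytes,
                status: 0,
            }
        }
    }

    fn main() {
        let mut st = SampleState { last_msp: None };
        let res = Sample { ts: 5_000_000_000 }.into_query_item(Instant::now(), &mut st);
        assert_eq!(res.items.len(), 1);
    }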