diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index 045d037..34f3d97 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -17,7 +17,6 @@ use dbpg::seriesbychannel::ChannelInfoResult; use enumfetch::ConnFuture; use err::thiserror; use err::ThisError; -use futures_util::pin_mut; use futures_util::Future; use futures_util::FutureExt; use futures_util::Stream; @@ -26,7 +25,6 @@ use hashbrown::HashMap; use log::*; use netpod::timeunits::*; use netpod::ttl::RetentionTime; -use netpod::DtNano; use netpod::ScalarType; use netpod::SeriesKind; use netpod::Shape; @@ -45,7 +43,7 @@ use scywr::insertqueues::InsertSenderPolling; use scywr::iteminsertqueue as scywriiq; use scywr::iteminsertqueue::Accounting; use scywr::iteminsertqueue::AccountingRecv; -use scywr::iteminsertqueue::DataValue; +use scywr::iteminsertqueue::MspItem; use scywr::iteminsertqueue::QueryItem; use scywr::iteminsertqueue::ShutdownReason; use scywr::senderpolling::SenderPolling; @@ -58,6 +56,7 @@ use serde::Serialize; use series::ChannelStatusSeriesId; use series::SeriesId; use serieswriter::binwriter::BinWriter; +use serieswriter::msptool::MspSplit; use serieswriter::rtwriter::RtWriter; use serieswriter::writer::EmittableType; use stats::rand_xoshiro::rand_core::RngCore; @@ -220,6 +219,7 @@ pub struct ChannelStateInfo { pub write_st_last: SystemTime, pub write_mt_last: SystemTime, pub write_lt_last: SystemTime, + pub status_emit_count: u64, } mod ser_instant { @@ -461,6 +461,7 @@ struct CreatedState { log_more: bool, name: String, enum_str_table: Option>, + status_emit_count: u64, } impl CreatedState { @@ -503,6 +504,7 @@ impl CreatedState { log_more: false, name: String::new(), enum_str_table: None, + status_emit_count: 0, } } @@ -614,6 +616,10 @@ impl ChannelState { _ => None, }; let interest_score = 1. / item_recv_ivl_ema.unwrap_or(1e10).max(1e-6).min(1e10); + let status_emit_count = match self { + ChannelState::Writable(s) => s.channel.status_emit_count, + _ => 0, + }; ChannelStateInfo { stnow, cssid, @@ -633,6 +639,7 @@ impl ChannelState { write_st_last, write_mt_last, write_lt_last, + status_emit_count, } } @@ -1219,13 +1226,7 @@ impl CaConn { scalar_type, shape, ch.conf.min_quiets(), - stnow, - &|| CaWriterValueState { - series_data: chinfo.series.to_series(), - series_status: st.series_status, - last_accepted_ts: TsNano::from_ns(0), - last_accepted_val: None, - }, + &|| CaWriterValueState::new(st.series_status, chinfo.series.to_series()), )?; self.handle_writer_establish_inner(cid, writer)?; have_progress = true; @@ -1958,20 +1959,21 @@ impl CaConn { Self::check_ev_value_data(&value.data, &writer.scalar_type())?; crst.muted_before = 0; crst.insert_item_ivl_ema.tick(tsnow); - let ts_ioc = TsNano::from_ns(ts); - let ts_local = TsNano::from_ns(ts_local); + // let ts_ioc = TsNano::from_ns(ts); + // let ts_local = TsNano::from_ns(ts_local); // binwriter.ingest(ts_ioc, ts_local, &val, iqdqs)?; { - let ((dwst, dwmt, dwlt),) = writer.write(CaWriterValue::new(value, crst), tsnow, iqdqs)?; - if dwst { + let wres = writer.write(CaWriterValue::new(value, crst), tsnow, iqdqs)?; + crst.status_emit_count += wres.nstatus() as u64; + if wres.st.accept { crst.dw_st_last = stnow; crst.acc_st.push_written(payload_len); } - if dwmt { + if wres.mt.accept { crst.dw_mt_last = stnow; crst.acc_mt.push_written(payload_len); } - if dwlt { + if wres.lt.accept { crst.dw_lt_last = stnow; crst.acc_lt.push_written(payload_len); } @@ -2526,6 +2528,7 @@ impl CaConn { log_more, name: conf.conf.name().into(), enum_str_table: None, + status_emit_count: 0, }; if dbg_chn_name(created_state.name()) { info!( @@ -3286,6 +3289,21 @@ struct CaWriterValueState { series_status: SeriesId, last_accepted_ts: TsNano, last_accepted_val: Option, + msp_split_status: MspSplit, + msp_split_data: MspSplit, +} + +impl CaWriterValueState { + fn new(series_status: SeriesId, series_data: SeriesId) -> Self { + Self { + series_data, + series_status, + last_accepted_ts: TsNano::from_ns(0), + last_accepted_val: None, + msp_split_status: MspSplit::new(1024 * 64, 1024 * 1024 * 10), + msp_split_data: MspSplit::new(1024 * 64, 1024 * 1024 * 10), + } + } } #[derive(Debug, Clone)] @@ -3342,13 +3360,10 @@ impl EmittableType for CaWriterValue { fn into_query_item( mut self, - ts_msp: TsMs, - ts_msp_changed: bool, - ts_lsp: DtNano, ts_net: Instant, state: &mut ::State, - ) -> serieswriter::writer::SmallVec<[QueryItem; 4]> { - let mut ret = serieswriter::writer::SmallVec::new(); + ) -> serieswriter::writer::EmitRes { + let mut items = serieswriter::writer::SmallVec::new(); let diff_data = match state.last_accepted_val.as_ref() { Some(last) => self.0.data != last.0.data, None => true, @@ -3363,12 +3378,15 @@ impl EmittableType for CaWriterValue { }, None => true, }; + let ts = TsNano::from_ns(self.0.ts().unwrap()); if let Some(ts) = self.0.ts() { state.last_accepted_ts = TsNano::from_ns(ts); } state.last_accepted_val = Some(self.clone()); + let byte_size = self.byte_size(); if diff_data { debug!("diff_data emit {:?}", state.series_data); + let (ts_msp, ts_lsp, ts_msp_chg) = state.msp_split_data.split(ts, self.byte_size()); let data_value = { use super::proto::CaDataValue; use scywr::iteminsertqueue::DataValue; @@ -3408,36 +3426,59 @@ impl EmittableType for CaWriterValue { }; ret }; + if ts_msp_chg { + items.push(QueryItem::Msp(MspItem::new( + state.series_data.clone(), + ts_msp.to_ts_ms(), + ts_net, + ))); + } let item = scywriiq::InsertItem { series: state.series_data.clone(), - ts_msp, + ts_msp: ts_msp.to_ts_ms(), ts_lsp, ts_net, - msp_bump: ts_msp_changed, val: data_value, }; - ret.push(QueryItem::Insert(item)); + items.push(QueryItem::Insert(item)); } + let mut n_status = 0; if diff_status { - debug!("diff_status emit {:?}", state.series_status); use scywr::iteminsertqueue::DataValue; use scywr::iteminsertqueue::ScalarValue; match self.0.meta { proto::CaMetaValue::CaMetaTime(meta) => { + let (ts_msp, ts_lsp, ts_msp_chg) = state.msp_split_status.split(ts, 2); + if ts_msp_chg { + items.push(QueryItem::Msp(MspItem::new( + state.series_status.clone(), + ts_msp.to_ts_ms(), + ts_net, + ))); + } let data_value = DataValue::Scalar(ScalarValue::CaStatus(meta.status as i16)); let item = scywriiq::InsertItem { series: state.series_status.clone(), - ts_msp, + ts_msp: ts_msp.to_ts_ms(), ts_lsp, ts_net, - msp_bump: ts_msp_changed, val: data_value, }; - ret.push(QueryItem::Insert(item)); + items.push(QueryItem::Insert(item)); + n_status += 1; + // info!("diff_status emit {:?}", state.series_status); + } + _ => { + // TODO must be able to return error here + warn!("diff_status logic error"); } - _ => {} }; } + let ret = serieswriter::writer::EmitRes { + items, + bytes: byte_size, + status: n_status, + }; ret } } diff --git a/netfetch/src/metrics/ingest.rs b/netfetch/src/metrics/ingest.rs index e460279..e644310 100644 --- a/netfetch/src/metrics/ingest.rs +++ b/netfetch/src/metrics/ingest.rs @@ -89,12 +89,9 @@ impl EmittableType for WritableType { fn into_query_item( self, - ts_msp: netpod::TsMs, - ts_msp_changed: bool, - ts_lsp: netpod::DtNano, ts_net: Instant, state: &mut ::State, - ) -> serieswriter::writer::SmallVec<[QueryItem; 4]> { + ) -> serieswriter::writer::EmitRes { todo!() } } diff --git a/netfetch/src/metrics/status.rs b/netfetch/src/metrics/status.rs index 5fecd76..7aec7d8 100644 --- a/netfetch/src/metrics/status.rs +++ b/netfetch/src/metrics/status.rs @@ -66,6 +66,18 @@ struct ChannelState { write_lt_last: SystemTime, #[serde(with = "humantime_serde", skip_serializing_if = "system_time_epoch")] updated: SystemTime, + private: StatePrivate, +} + +#[derive(Debug, Serialize)] +struct StatePrivate { + status_emit_count: u64, +} + +impl Default for StatePrivate { + fn default() -> Self { + Self { status_emit_count: 0 } + } } fn system_time_epoch(x: &SystemTime) -> bool { @@ -114,6 +126,7 @@ pub async fn channel_states(params: HashMap, tx: Sender, tx: Sender, tx: Sender, tx: Sender, tx: Sender { let recv_count = st6.recv_count.unwrap_or(0); let recv_bytes = st6.recv_bytes.unwrap_or(0); + let private = StatePrivate { + status_emit_count: st6.status_emit_count, + }; use crate::ca::conn::ChannelConnectedInfo; match st6.channel_connected_info { ChannelConnectedInfo::Disconnected => { @@ -205,6 +225,7 @@ pub async fn channel_states(params: HashMap, tx: Sender, tx: Sender, tx: Sender, tx: Sender, tx: Sender, tx: Sender, tx: Sender prepare_query_insert_futs(item, &data_store, &stats, tsnow), + QueryItem::Msp(item) => prepare_msp_insert_futs(item, &data_store, &stats, tsnow), QueryItem::ConnectionStatus(item) => { stats.inserted_connection_status().inc(); let fut = insert_connection_status_fut(item, &data_store, stats.clone()); @@ -314,6 +313,9 @@ fn inspect_items( QueryItem::ChannelStatus(_) => { trace_item_execute!("execute {worker_name} ChannelStatus {item:?}"); } + QueryItem::Msp(item) => { + trace_item_execute!("execute {worker_name} Msp {}", item.string_short()); + } QueryItem::Insert(item) => { trace_item_execute!("execute {worker_name} Insert {}", item.string_short()); } @@ -331,6 +333,31 @@ fn inspect_items( }) } +fn prepare_msp_insert_futs( + item: MspItem, + data_store: &Arc, + stats: &Arc, + tsnow: Instant, +) -> SmallVec<[InsertFut; 4]> { + trace2!("execute MSP bump"); + stats.inserts_msp().inc(); + { + let dt = tsnow.saturating_duration_since(item.ts_net()); + let dt_ms = 1000 * dt.as_secs() as u32 + dt.subsec_millis(); + stats.item_lat_net_worker().ingest(dt_ms); + } + let fut = insert_msp_fut( + item.series(), + item.ts_msp(), + item.ts_net(), + data_store.scy.clone(), + data_store.qu_insert_ts_msp.clone(), + stats.clone(), + ); + let futs = smallvec![fut]; + futs +} + fn prepare_query_insert_futs( item: InsertItem, data_store: &Arc, @@ -342,26 +369,10 @@ fn prepare_query_insert_futs( let dt = tsnow.saturating_duration_since(item_ts_net); let dt_ms = 1000 * dt.as_secs() as u32 + dt.subsec_millis(); stats.item_lat_net_worker().ingest(dt_ms); - let msp_bump = item.msp_bump; - let series = item.series.clone(); - let ts_msp = item.ts_msp; let do_insert = true; let mut futs = smallvec![]; let fut = insert_item_fut(item, &data_store, do_insert, stats); futs.push(fut); - if msp_bump { - trace2!("execute MSP bump"); - stats.inserts_msp().inc(); - let fut = insert_msp_fut( - series, - ts_msp, - item_ts_net, - data_store.scy.clone(), - data_store.qu_insert_ts_msp.clone(), - stats.clone(), - ); - futs.push(fut); - } futs } diff --git a/scywr/src/iteminsertqueue.rs b/scywr/src/iteminsertqueue.rs index 88b2e39..327e757 100644 --- a/scywr/src/iteminsertqueue.rs +++ b/scywr/src/iteminsertqueue.rs @@ -551,12 +551,40 @@ impl ChannelStatusItem { } } +#[derive(Debug, Clone)] +pub struct MspItem { + series: SeriesId, + ts_msp: TsMs, + ts_net: Instant, +} + +impl MspItem { + pub fn new(series: SeriesId, ts_msp: TsMs, ts_net: Instant) -> Self { + Self { series, ts_msp, ts_net } + } + + pub fn string_short(&self) -> String { + format!("{} {}", self.series.id(), self.ts_msp.ms()) + } + + pub fn series(&self) -> SeriesId { + self.series.clone() + } + + pub fn ts_msp(&self) -> TsMs { + self.ts_msp.clone() + } + + pub fn ts_net(&self) -> Instant { + self.ts_net.clone() + } +} + #[derive(Debug, Clone)] pub struct InsertItem { pub series: SeriesId, pub ts_msp: TsMs, pub ts_lsp: DtNano, - pub msp_bump: bool, pub val: DataValue, pub ts_net: Instant, } @@ -590,6 +618,7 @@ pub struct TimeBinSimpleF32 { pub enum QueryItem { ConnectionStatus(ConnectionStatusItem), ChannelStatus(ChannelStatusItem), + Msp(MspItem), Insert(InsertItem), TimeBinSimpleF32(TimeBinSimpleF32), Accounting(Accounting), diff --git a/serieswriter/src/lib.rs b/serieswriter/src/lib.rs index e4d5855..f696bfa 100644 --- a/serieswriter/src/lib.rs +++ b/serieswriter/src/lib.rs @@ -1,5 +1,6 @@ pub mod binwriter; pub mod changewriter; +pub mod msptool; pub mod patchcollect; pub mod ratelimitwriter; pub mod rtwriter; diff --git a/serieswriter/src/msptool.rs b/serieswriter/src/msptool.rs new file mode 100644 index 0000000..e839694 --- /dev/null +++ b/serieswriter/src/msptool.rs @@ -0,0 +1,60 @@ +use netpod::DtNano; +use netpod::TsNano; + +const SEC: u64 = 1000_000_000; +const HOUR: u64 = SEC * 60 * 60 * 24; + +#[derive(Debug)] +pub struct MspSplit { + last: Option, + count: u32, + bytes: u32, + count_max: u32, + bytes_max: u32, +} + +impl MspSplit { + pub fn new(count_max: u32, bytes_max: u32) -> Self { + Self { + last: None, + count: 0, + bytes: 0, + count_max, + bytes_max, + } + } + + pub fn split(&mut self, ts: TsNano, item_bytes: u32) -> (TsNano, DtNano, bool) { + // Maximum resolution of the ts msp: + let msp_res_max = SEC * 2; + let ts_main = ts; + let (ts_msp, changed) = match self.last { + Some(ts_msp_last) => { + if self.count >= self.count_max || self.bytes >= self.bytes_max || ts_msp_last.add_ns(HOUR) <= ts_main { + let ts_msp = ts_main.div(msp_res_max).mul(msp_res_max); + if ts_msp == ts_msp_last { + (ts_msp, false) + } else { + self.last = Some(ts_msp); + self.count = 1; + self.bytes = item_bytes; + (ts_msp, true) + } + } else { + self.count += 1; + self.bytes += item_bytes; + (ts_msp_last, false) + } + } + None => { + let ts_msp = ts_main.div(msp_res_max).mul(msp_res_max); + self.last = Some(ts_msp); + self.count = 1; + self.bytes = item_bytes; + (ts_msp, true) + } + }; + let ts_lsp = ts_main.delta(ts_msp); + (ts_msp, ts_lsp, changed) + } +} diff --git a/serieswriter/src/ratelimitwriter.rs b/serieswriter/src/ratelimitwriter.rs index cb52372..7bb124c 100644 --- a/serieswriter/src/ratelimitwriter.rs +++ b/serieswriter/src/ratelimitwriter.rs @@ -28,6 +28,13 @@ pub enum Error { SeriesWriter(#[from] crate::writer::Error), } +#[derive(Debug)] +pub struct WriteRes { + pub accept: bool, + pub bytes: u32, + pub status: u8, +} + pub struct RateLimitWriter where ET: EmittableType, @@ -66,7 +73,7 @@ where Ok(ret) } - pub fn write(&mut self, item: ET, ts_net: Instant, deque: &mut VecDeque) -> Result<(bool,), Error> { + pub fn write(&mut self, item: ET, ts_net: Instant, deque: &mut VecDeque) -> Result { // Decide whether we want to write. // TODO catch already in CaConn the cases when the IOC-timestamp did not change. let tsl = self.last_insert_ts.clone(); @@ -100,9 +107,21 @@ where } }; if do_write { - self.writer.write(item, &mut self.emit_state, ts_net, deque)?; + let res = self.writer.write(item, &mut self.emit_state, ts_net, deque)?; + let ret = WriteRes { + accept: true, + bytes: res.bytes, + status: res.status, + }; + Ok(ret) + } else { + let ret = WriteRes { + accept: false, + bytes: 0, + status: 0, + }; + Ok(ret) } - Ok((do_write,)) } pub fn tick(&mut self, iqdqs: &mut VecDeque) -> Result<(), Error> { diff --git a/serieswriter/src/rtwriter.rs b/serieswriter/src/rtwriter.rs index 498bc88..35488c7 100644 --- a/serieswriter/src/rtwriter.rs +++ b/serieswriter/src/rtwriter.rs @@ -1,22 +1,16 @@ use crate::ratelimitwriter::RateLimitWriter; use crate::writer::EmittableType; -use async_channel::Sender; -use dbpg::seriesbychannel::ChannelInfoQuery; use err::thiserror; use err::ThisError; use netpod::log::*; use netpod::ScalarType; -use netpod::SeriesKind; use netpod::Shape; -use netpod::TsNano; use scywr::insertqueues::InsertDeques; -use scywr::iteminsertqueue::DataValue; use scywr::iteminsertqueue::QueryItem; use series::SeriesId; use std::collections::VecDeque; use std::time::Duration; use std::time::Instant; -use std::time::SystemTime; #[allow(unused)] macro_rules! trace_ { @@ -50,6 +44,26 @@ where writer: RateLimitWriter, } +#[derive(Debug)] +pub struct WriteRes { + pub st: WriteRtRes, + pub mt: WriteRtRes, + pub lt: WriteRtRes, +} + +impl WriteRes { + pub fn nstatus(&self) -> u8 { + self.st.status + self.mt.status + self.lt.status + } +} + +#[derive(Debug)] +pub struct WriteRtRes { + pub accept: bool, + pub bytes: u32, + pub status: u8, +} + #[derive(Debug)] pub struct RtWriter where @@ -73,7 +87,6 @@ where scalar_type: ScalarType, shape: Shape, min_quiets: MinQuiets, - stnow: SystemTime, emit_state_new: &dyn Fn() -> ::State, ) -> Result { let state_st = { @@ -117,20 +130,32 @@ where self.min_quiets.clone() } - pub fn write( - &mut self, - item: ET, - ts_net: Instant, - iqdqs: &mut InsertDeques, - ) -> Result<((bool, bool, bool),), Error> { + pub fn write(&mut self, item: ET, ts_net: Instant, iqdqs: &mut InsertDeques) -> Result { trace!("write {:?}", item.ts()); // TODO // Optimize for the common case that we only write into one of the stores. // Make the decision first, based on ref, then clone only as required. - let (did_write_st,) = Self::write_inner(&mut self.state_st, item.clone(), ts_net, &mut iqdqs.st_rf3_rx)?; - let (did_write_mt,) = Self::write_inner(&mut self.state_mt, item.clone(), ts_net, &mut iqdqs.mt_rf3_rx)?; - let (did_write_lt,) = Self::write_inner(&mut self.state_lt, item, ts_net, &mut iqdqs.lt_rf3_rx)?; - Ok(((did_write_st, did_write_mt, did_write_lt),)) + let res_st = Self::write_inner(&mut self.state_st, item.clone(), ts_net, &mut iqdqs.st_rf3_rx)?; + let res_mt = Self::write_inner(&mut self.state_mt, item.clone(), ts_net, &mut iqdqs.mt_rf3_rx)?; + let res_lt = Self::write_inner(&mut self.state_lt, item, ts_net, &mut iqdqs.lt_rf3_rx)?; + let ret = WriteRes { + st: WriteRtRes { + accept: res_st.accept, + bytes: res_st.bytes, + status: res_st.status, + }, + mt: WriteRtRes { + accept: res_mt.accept, + bytes: res_mt.bytes, + status: res_mt.status, + }, + lt: WriteRtRes { + accept: res_lt.accept, + bytes: res_lt.bytes, + status: res_lt.status, + }, + }; + Ok(ret) } fn write_inner( @@ -138,7 +163,7 @@ where item: ET, ts_net: Instant, deque: &mut VecDeque, - ) -> Result<(bool,), Error> { + ) -> Result { Ok(state.writer.write(item, ts_net, deque)?) } @@ -149,10 +174,3 @@ where Ok(()) } } - -#[derive(Debug)] -struct LastIns { - ts_local: TsNano, - ts_ioc: TsNano, - val: DataValue, -} diff --git a/serieswriter/src/writer.rs b/serieswriter/src/writer.rs index 57c85a3..f3ff336 100644 --- a/serieswriter/src/writer.rs +++ b/serieswriter/src/writer.rs @@ -1,42 +1,29 @@ -use async_channel::Sender; -use dbpg::seriesbychannel::ChannelInfoQuery; use err::thiserror; use err::ThisError; use log::*; -use netpod::timeunits::HOUR; -use netpod::timeunits::SEC; -use netpod::DtNano; -use netpod::ScalarType; -use netpod::SeriesKind; -use netpod::Shape; -use netpod::TsMs; use netpod::TsNano; -use scywr::insertqueues::InsertDeques; -use scywr::iteminsertqueue::DataValue; -use scywr::iteminsertqueue::InsertItem; use scywr::iteminsertqueue::QueryItem; -use series::ChannelStatusSeriesId; use series::SeriesId; use std::collections::VecDeque; use std::marker::PhantomData; use std::time::Instant; -use std::time::SystemTime; +use core::fmt; pub use smallvec::SmallVec; -pub trait EmittableType: ::core::fmt::Debug + Clone { +#[derive(Debug)] +pub struct EmitRes { + pub items: SmallVec<[QueryItem; 4]>, + pub bytes: u32, + pub status: u8, +} + +pub trait EmittableType: fmt::Debug + Clone { type State; fn ts(&self) -> TsNano; fn has_change(&self, k: &Self) -> bool; fn byte_size(&self) -> u32; - fn into_query_item( - self, - ts_msp: TsMs, - ts_msp_changed: bool, - ts_lsp: DtNano, - ts_net: Instant, - state: &mut ::State, - ) -> SmallVec<[QueryItem; 4]>; + fn into_query_item(self, ts_net: Instant, state: &mut ::State) -> EmitRes; } #[derive(Debug, ThisError)] @@ -65,16 +52,15 @@ impl From for Error { } } +#[derive(Debug)] +pub struct WriteRes { + pub bytes: u32, + pub status: u8, +} + #[derive(Debug)] pub struct SeriesWriter { sid: SeriesId, - ts_msp_last: Option, - inserted_in_current_msp: u32, - bytes_in_current_msp: u32, - msp_max_entries: u32, - msp_max_bytes: u32, - // TODO this should be in an Option: - ts_msp_grid_last: u32, _t1: PhantomData, } @@ -83,16 +69,7 @@ where ET: EmittableType, { pub fn new(sid: SeriesId) -> Result { - let res = Self { - sid, - ts_msp_last: None, - inserted_in_current_msp: 0, - bytes_in_current_msp: 0, - msp_max_entries: 64000, - msp_max_bytes: 1024 * 1024 * 20, - ts_msp_grid_last: 0, - _t1: PhantomData, - }; + let res = Self { sid, _t1: PhantomData }; Ok(res) } @@ -106,51 +83,18 @@ where state: &mut ::State, ts_net: Instant, deque: &mut VecDeque, - ) -> Result<(), Error> { + ) -> Result { let ts_main = item.ts(); - - // TODO decide on better msp/lsp: random offset! - // As long as one writer is active, the msp is arbitrary. - - // Maximum resolution of the ts msp: - let msp_res_max = SEC * 2; - - let (ts_msp, ts_msp_changed) = match self.ts_msp_last { - Some(ts_msp_last) => { - if self.inserted_in_current_msp >= self.msp_max_entries - || self.bytes_in_current_msp >= self.msp_max_bytes - || ts_msp_last.add_ns(HOUR) <= ts_main - { - let ts_msp = ts_main.div(msp_res_max).mul(msp_res_max); - if ts_msp == ts_msp_last { - (ts_msp, false) - } else { - self.ts_msp_last = Some(ts_msp); - self.inserted_in_current_msp = 1; - self.bytes_in_current_msp = item.byte_size(); - (ts_msp, true) - } - } else { - self.inserted_in_current_msp += 1; - self.bytes_in_current_msp += item.byte_size(); - (ts_msp_last, false) - } - } - None => { - let ts_msp = ts_main.div(msp_res_max).mul(msp_res_max); - self.ts_msp_last = Some(ts_msp); - self.inserted_in_current_msp = 1; - self.bytes_in_current_msp = item.byte_size(); - (ts_msp, true) - } - }; - let ts_lsp = ts_main.delta(ts_msp); - let items = item.into_query_item(ts_msp.to_ts_ms(), ts_msp_changed, ts_lsp, ts_net, state); - trace!("emit value for ts {:?} items len {}", ts_main, items.len()); - for item in items { + let res = item.into_query_item(ts_net, state); + trace!("emit value for ts {:?} items len {}", ts_main, res.items.len()); + for item in res.items { deque.push_back(item); } - Ok(()) + let res = WriteRes { + bytes: res.bytes, + status: res.status, + }; + Ok(res) } pub fn tick(&mut self, deque: &mut VecDeque) -> Result<(), Error> {