Refactor series write

This commit is contained in:
Dominik Werder
2024-07-17 16:29:47 +02:00
parent 9ac197e755
commit d0de644317
11 changed files with 265 additions and 114 deletions

View File

@@ -763,7 +763,7 @@ pub async fn run(opts: CaIngestOpts, channels_config: Option<ChannelsConfig>) ->
debug!("will configure {} channels", channels_config.len());
let mut thr_msg = ThrottleTrace::new(Duration::from_millis(1000));
let mut i = 0;
let nmax = usize::MAX;
let nmax = 100999777;
let nn = channels_config.channels().len();
let mut ixs: Vec<usize> = (0..nn).into_iter().collect();
if false {

View File

@@ -26,6 +26,7 @@ use hashbrown::HashMap;
use log::*;
use netpod::timeunits::*;
use netpod::ttl::RetentionTime;
use netpod::DtNano;
use netpod::ScalarType;
use netpod::SeriesKind;
use netpod::Shape;
@@ -314,6 +315,7 @@ struct CreatingState {
struct MakingSeriesWriterState {
tsbeg: Instant,
channel: CreatedState,
series_status: SeriesId,
}
#[derive(Debug, Clone)]
@@ -514,6 +516,7 @@ enum ChannelState {
Init(ChannelStatusSeriesId),
Creating(CreatingState),
FetchEnumDetails(FetchEnumDetails),
FetchCaStatusSeries(MakingSeriesWriterState),
MakingSeriesWriter(MakingSeriesWriterState),
Writable(WritableState),
Closing(ClosingState),
@@ -551,6 +554,7 @@ impl ChannelState {
ChannelState::Init(..) => ChannelConnectedInfo::Disconnected,
ChannelState::Creating { .. } => ChannelConnectedInfo::Connecting,
ChannelState::FetchEnumDetails(_) => ChannelConnectedInfo::Connecting,
ChannelState::FetchCaStatusSeries(_) => ChannelConnectedInfo::Connecting,
ChannelState::MakingSeriesWriter(_) => ChannelConnectedInfo::Connecting,
ChannelState::Writable(_) => ChannelConnectedInfo::Connected,
ChannelState::Error(_) => ChannelConnectedInfo::Error,
@@ -637,6 +641,7 @@ impl ChannelState {
ChannelState::Init(cssid) => cssid.clone(),
ChannelState::Creating(st) => st.cssid.clone(),
ChannelState::FetchEnumDetails(st) => st.cssid.clone(),
ChannelState::FetchCaStatusSeries(st) => st.channel.cssid.clone(),
ChannelState::MakingSeriesWriter(st) => st.channel.cssid.clone(),
ChannelState::Writable(st) => st.channel.cssid.clone(),
ChannelState::Error(e) => match e {
@@ -1183,20 +1188,51 @@ impl CaConn {
trace!("handle_writer_establish_result recv {}", self.remote_addr_dbg);
let chinfo = res?;
if let Some(ch) = self.channels.get(&cid) {
if let ChannelState::MakingSeriesWriter(st) = &ch.state {
let scalar_type = st.channel.scalar_type.clone();
let shape = st.channel.shape.clone();
let writer = RtWriter::new(
chinfo.series.to_series(),
scalar_type,
shape,
ch.conf.min_quiets(),
stnow,
)?;
self.handle_writer_establish_inner(cid, writer)?;
have_progress = true;
} else {
return Err(Error::Error);
match &ch.state {
ChannelState::FetchCaStatusSeries(st) => {
let crst = &st.channel;
let cid = crst.cid.clone();
let (tx, rx) = async_channel::bounded(8);
let item = ChannelInfoQuery {
backend: self.backend.clone(),
channel: crst.name().into(),
kind: netpod::SeriesKind::ChannelData,
scalar_type: crst.scalar_type.clone(),
shape: crst.shape.clone(),
tx: Box::pin(tx),
};
self.channel_info_query_qu.push_back(item);
self.channel_info_query_res_rxs.push_back((Box::pin(rx), cid));
self.channels.get_mut(&cid).unwrap().state =
ChannelState::MakingSeriesWriter(MakingSeriesWriterState {
tsbeg: Instant::now(),
channel: st.channel.clone(),
series_status: chinfo.series.to_series(),
});
have_progress = true;
}
ChannelState::MakingSeriesWriter(st) => {
let scalar_type = st.channel.scalar_type.clone();
let shape = st.channel.shape.clone();
let writer = RtWriter::new(
chinfo.series.to_series(),
scalar_type,
shape,
ch.conf.min_quiets(),
stnow,
&|| CaWriterValueState {
series_data: chinfo.series.to_series(),
series_status: st.series_status,
last_accepted_ts: TsNano::from_ns(0),
last_accepted_val: None,
},
)?;
self.handle_writer_establish_inner(cid, writer)?;
have_progress = true;
}
_ => {
return Err(Error::Error);
}
}
} else {
return Err(Error::Error);
@@ -1418,6 +1454,9 @@ impl CaConn {
ChannelState::FetchEnumDetails(st) => {
*chst = ChannelState::Ended(st.cssid.clone());
}
ChannelState::FetchCaStatusSeries(st) => {
*chst = ChannelState::Ended(st.channel.cssid.clone());
}
ChannelState::MakingSeriesWriter(st) => {
*chst = ChannelState::Ended(st.channel.cssid.clone());
}
@@ -1457,6 +1496,9 @@ impl CaConn {
ChannelState::Creating(..) => {
// TODO need last-save-ts for this state.
}
ChannelState::FetchCaStatusSeries(..) => {
// TODO ?
}
ChannelState::MakingSeriesWriter(..) => {
// TODO ?
}
@@ -1667,6 +1709,7 @@ impl CaConn {
ChannelState::Creating(_)
| ChannelState::Init(_)
| ChannelState::FetchEnumDetails(_)
| ChannelState::FetchCaStatusSeries(_)
| ChannelState::MakingSeriesWriter(_) => {
self.stats.recv_read_notify_but_not_init_yet.inc();
}
@@ -2090,6 +2133,7 @@ impl CaConn {
ChannelState::Init(_) => {}
ChannelState::Creating(_) => {}
ChannelState::FetchEnumDetails(_) => {}
ChannelState::FetchCaStatusSeries(_) => {}
ChannelState::MakingSeriesWriter(_) => {}
ChannelState::Writable(st2) => match &mut st2.reading {
ReadingState::EnableMonitoring(_) => {}
@@ -2493,8 +2537,7 @@ impl CaConn {
match &scalar_type {
ScalarType::Enum => {
// TODO channel created, now fetch enum variants, later make writer
let min_quiets = conf.conf.min_quiets();
let fut = enumfetch::EnumFetch::new(created_state, self, min_quiets);
let fut = enumfetch::EnumFetch::new(created_state, self);
// TODO should always check if the slot is free.
let ioid = fut.ioid();
let x = Box::pin(fut);
@@ -2503,10 +2546,6 @@ impl CaConn {
_ => {
let backend = self.backend.clone();
let channel_name = created_state.name().into();
*chst = ChannelState::MakingSeriesWriter(MakingSeriesWriterState {
tsbeg: tsnow,
channel: created_state,
});
// TODO create a channel for the answer.
// Keep only a certain max number of channels in-flight because have to poll on them.
// TODO register the channel for the answer.
@@ -2514,13 +2553,18 @@ impl CaConn {
let item = ChannelInfoQuery {
backend,
channel: channel_name,
kind: SeriesKind::ChannelData,
scalar_type: scalar_type.clone(),
shape: shape.clone(),
kind: SeriesKind::CaStatus,
scalar_type: ScalarType::I16,
shape: Shape::Scalar,
tx: Box::pin(tx),
};
self.channel_info_query_qu.push_back(item);
self.channel_info_query_res_rxs.push_back((Box::pin(rx), cid));
*chst = ChannelState::FetchCaStatusSeries(MakingSeriesWriterState {
tsbeg: tsnow,
channel: created_state,
series_status: SeriesId::new(0),
});
}
}
Ok(())
@@ -3237,6 +3281,13 @@ impl Stream for CaConn {
}
}
struct CaWriterValueState {
series_data: SeriesId,
series_status: SeriesId,
last_accepted_ts: TsNano,
last_accepted_val: Option<CaWriterValue>,
}
#[derive(Debug, Clone)]
struct CaWriterValue(CaEventValue, Option<String>);
@@ -3269,6 +3320,8 @@ impl CaWriterValue {
}
impl EmittableType for CaWriterValue {
type State = CaWriterValueState;
fn ts(&self) -> TsNano {
TsNano::from_ns(self.0.ts().unwrap_or(0))
}
@@ -3276,6 +3329,8 @@ impl EmittableType for CaWriterValue {
fn has_change(&self, k: &Self) -> bool {
if self.0.data != k.0.data {
true
} else if self.0.meta != k.0.meta {
true
} else {
false
}
@@ -3285,47 +3340,104 @@ impl EmittableType for CaWriterValue {
self.0.data.byte_size()
}
fn into_data_value(mut self) -> DataValue {
// TODO need to pass a ref to channel state to convert enum strings.
// Or do that already when we construct this?
// Also, in general, need to produce a SmallVec of values to emit: value, status, severity, etc..
// let val = Self::convert_event_data(crst, value.data)?;
use super::proto::CaDataValue;
use scywr::iteminsertqueue::DataValue;
let ret = match self.0.data {
CaDataValue::Scalar(val) => DataValue::Scalar({
use super::proto::CaDataScalarValue;
use scywr::iteminsertqueue::ScalarValue;
match val {
CaDataScalarValue::I8(x) => ScalarValue::I8(x),
CaDataScalarValue::I16(x) => ScalarValue::I16(x),
CaDataScalarValue::I32(x) => ScalarValue::I32(x),
CaDataScalarValue::F32(x) => ScalarValue::F32(x),
CaDataScalarValue::F64(x) => ScalarValue::F64(x),
CaDataScalarValue::Enum(x) => ScalarValue::Enum(
x,
self.1.take().unwrap_or_else(|| {
warn!("NoEnumStr");
String::from("NoEnumStr")
}),
),
CaDataScalarValue::String(x) => ScalarValue::String(x),
CaDataScalarValue::Bool(x) => ScalarValue::Bool(x),
}
}),
CaDataValue::Array(val) => DataValue::Array({
use super::proto::CaDataArrayValue;
use scywr::iteminsertqueue::ArrayValue;
match val {
CaDataArrayValue::I8(x) => ArrayValue::I8(x),
CaDataArrayValue::I16(x) => ArrayValue::I16(x),
CaDataArrayValue::I32(x) => ArrayValue::I32(x),
CaDataArrayValue::F32(x) => ArrayValue::F32(x),
CaDataArrayValue::F64(x) => ArrayValue::F64(x),
CaDataArrayValue::Bool(x) => ArrayValue::Bool(x),
}
}),
fn into_query_item(
mut self,
ts_msp: TsMs,
ts_msp_changed: bool,
ts_lsp: DtNano,
ts_net: Instant,
state: &mut <Self as EmittableType>::State,
) -> serieswriter::writer::SmallVec<[QueryItem; 4]> {
let mut ret = serieswriter::writer::SmallVec::new();
let diff_data = match state.last_accepted_val.as_ref() {
Some(last) => self.0.data != last.0.data,
None => true,
};
let diff_status = match state.last_accepted_val.as_ref() {
Some(last) => match &last.0.meta {
proto::CaMetaValue::CaMetaTime(last_meta) => match &self.0.meta {
proto::CaMetaValue::CaMetaTime(meta) => meta.status != last_meta.status,
_ => false,
},
_ => false,
},
None => true,
};
if let Some(ts) = self.0.ts() {
state.last_accepted_ts = TsNano::from_ns(ts);
}
state.last_accepted_val = Some(self.clone());
if diff_data {
debug!("diff_data emit {:?}", state.series_data);
let data_value = {
use super::proto::CaDataValue;
use scywr::iteminsertqueue::DataValue;
let ret = match self.0.data {
CaDataValue::Scalar(val) => DataValue::Scalar({
use super::proto::CaDataScalarValue;
use scywr::iteminsertqueue::ScalarValue;
match val {
CaDataScalarValue::I8(x) => ScalarValue::I8(x),
CaDataScalarValue::I16(x) => ScalarValue::I16(x),
CaDataScalarValue::I32(x) => ScalarValue::I32(x),
CaDataScalarValue::F32(x) => ScalarValue::F32(x),
CaDataScalarValue::F64(x) => ScalarValue::F64(x),
CaDataScalarValue::Enum(x) => ScalarValue::Enum(
x,
self.1.take().unwrap_or_else(|| {
warn!("NoEnumStr");
String::from("NoEnumStr")
}),
),
CaDataScalarValue::String(x) => ScalarValue::String(x),
CaDataScalarValue::Bool(x) => ScalarValue::Bool(x),
}
}),
CaDataValue::Array(val) => DataValue::Array({
use super::proto::CaDataArrayValue;
use scywr::iteminsertqueue::ArrayValue;
match val {
CaDataArrayValue::I8(x) => ArrayValue::I8(x),
CaDataArrayValue::I16(x) => ArrayValue::I16(x),
CaDataArrayValue::I32(x) => ArrayValue::I32(x),
CaDataArrayValue::F32(x) => ArrayValue::F32(x),
CaDataArrayValue::F64(x) => ArrayValue::F64(x),
CaDataArrayValue::Bool(x) => ArrayValue::Bool(x),
}
}),
};
ret
};
let item = scywriiq::InsertItem {
series: state.series_data.clone(),
ts_msp,
ts_lsp,
ts_net,
msp_bump: ts_msp_changed,
val: data_value,
};
ret.push(QueryItem::Insert(item));
}
if diff_status {
debug!("diff_status emit {:?}", state.series_status);
use scywr::iteminsertqueue::DataValue;
use scywr::iteminsertqueue::ScalarValue;
match self.0.meta {
proto::CaMetaValue::CaMetaTime(meta) => {
let data_value = DataValue::Scalar(ScalarValue::CaStatus(meta.status as i16));
let item = scywriiq::InsertItem {
series: state.series_status.clone(),
ts_msp,
ts_lsp,
ts_net,
msp_bump: ts_msp_changed,
val: data_value,
};
ret.push(QueryItem::Insert(item));
}
_ => {}
};
}
ret
}
}

View File

@@ -7,6 +7,7 @@ use dbpg::seriesbychannel::ChannelInfoQuery;
use err::thiserror;
use err::ThisError;
use log::*;
use series::SeriesId;
use std::pin::Pin;
use std::time::Instant;
@@ -23,11 +24,10 @@ pub trait ConnFuture: Send {
pub struct EnumFetch {
created_state: CreatedState,
ioid: Ioid,
min_quiets: serieswriter::rtwriter::MinQuiets,
}
impl EnumFetch {
pub fn new(created_state: CreatedState, conn: &mut CaConn, min_quiets: serieswriter::rtwriter::MinQuiets) -> Self {
pub fn new(created_state: CreatedState, conn: &mut CaConn) -> Self {
if created_state.cssid.id() == 4705698279895902114 {}
let name = created_state.name();
// info!("EnumFetch::new name {name}");
@@ -42,11 +42,7 @@ impl EnumFetch {
let ts = Instant::now();
let item = CaMsg::from_ty_ts(ty, ts);
conn.proto().unwrap().push_out(item);
Self {
created_state,
ioid,
min_quiets,
}
Self { created_state, ioid }
}
pub fn ioid(&self) -> Ioid {
@@ -76,16 +72,14 @@ impl ConnFuture for EnumFetch {
}
};
// TODO create a channel for the answer.
// TODO register the channel for the answer.
let cid = crst.cid.clone();
let (tx, rx) = async_channel::bounded(8);
let item = ChannelInfoQuery {
backend: conn.backend.clone(),
channel: crst.name().into(),
kind: netpod::SeriesKind::ChannelData,
scalar_type: crst.scalar_type.clone(),
shape: crst.shape.clone(),
kind: netpod::SeriesKind::CaStatus,
scalar_type: netpod::ScalarType::I16,
shape: netpod::Shape::Scalar,
tx: Box::pin(tx),
};
conn.channel_info_query_qu.push_back(item);
@@ -93,9 +87,10 @@ impl ConnFuture for EnumFetch {
// This handler must not exist if the channel gets removed.
let conf = conn.channels.get_mut(&crst.cid).ok_or(Error::MissingState)?;
conf.state = super::ChannelState::MakingSeriesWriter(super::MakingSeriesWriterState {
conf.state = super::ChannelState::FetchCaStatusSeries(super::MakingSeriesWriterState {
tsbeg: tsnow,
channel: crst.clone(),
series_status: SeriesId::new(0),
});
conn.handler_by_ioid.remove(&self.ioid);

View File

@@ -73,6 +73,8 @@ type ValueSeriesWriter = SeriesWriter<WritableType>;
struct WritableType(DataValue);
impl EmittableType for WritableType {
type State = ();
fn ts(&self) -> TsNano {
todo!()
}
@@ -85,7 +87,14 @@ impl EmittableType for WritableType {
todo!()
}
fn into_data_value(self) -> DataValue {
fn into_query_item(
self,
ts_msp: netpod::TsMs,
ts_msp_changed: bool,
ts_lsp: netpod::DtNano,
ts_net: Instant,
state: &mut <Self as EmittableType>::State,
) -> serieswriter::writer::SmallVec<[QueryItem; 4]> {
todo!()
}
}
@@ -314,11 +323,12 @@ where
let evs: EventsDim0<T> = evs.into();
trace_input!("see events {:?}", evs);
let tsnow = Instant::now();
let mut emit_state = ();
for (i, (&ts, val)) in evs.tss.iter().zip(evs.values.iter()).enumerate() {
let val = val.clone();
trace_input!("ev {:6} {:20} {:20?}", i, ts, val);
let val = f1(val);
writer.write(WritableType(val), tsnow, deque)?;
writer.write(WritableType(val), &mut emit_state, tsnow, deque)?;
}
Ok(())
}
@@ -341,11 +351,12 @@ where
let evs: EventsDim1<T> = evs.into();
trace_input!("see events {:?}", evs);
let tsnow = Instant::now();
let mut emit_state = ();
for (i, (&ts, val)) in evs.tss.iter().zip(evs.values.iter()).enumerate() {
let val = val.clone();
trace_input!("ev {:6} {:20} {:20?}", i, ts, val);
let val = f1(val);
writer.write(WritableType(val), tsnow, deque)?;
writer.write(WritableType(val), &mut emit_state, tsnow, deque)?;
}
Ok(())
}

View File

@@ -63,6 +63,7 @@ pub enum ScalarValue {
Enum(i16, String),
String(String),
Bool(bool),
CaStatus(i16),
}
impl ScalarValue {
@@ -81,6 +82,7 @@ impl ScalarValue {
ScalarValue::Enum(_, y) => 2 + y.len() as u32,
ScalarValue::String(x) => x.len() as u32,
ScalarValue::Bool(_) => 1,
ScalarValue::CaStatus(_) => 2,
}
}
@@ -99,6 +101,7 @@ impl ScalarValue {
ScalarValue::Enum(x, y) => format!("({}, {})", x, y),
ScalarValue::String(x) => x.to_string(),
ScalarValue::Bool(x) => x.to_string(),
ScalarValue::CaStatus(x) => x.to_string(),
}
}
}
@@ -313,7 +316,7 @@ impl DataValue {
}
}
pub fn scalar_type(&self) -> ScalarType {
fn unused_scalar_type(&self) -> ScalarType {
match self {
DataValue::Scalar(x) => match x {
ScalarValue::U8(_) => ScalarType::U8,
@@ -329,6 +332,7 @@ impl DataValue {
ScalarValue::Enum(..) => ScalarType::Enum,
ScalarValue::String(_) => ScalarType::STRING,
ScalarValue::Bool(_) => ScalarType::BOOL,
ScalarValue::CaStatus(_) => ScalarType::I16,
},
DataValue::Array(x) => match x {
ArrayValue::U8(_) => ScalarType::U8,
@@ -555,7 +559,6 @@ pub struct InsertItem {
pub msp_bump: bool,
pub val: DataValue,
pub ts_net: Instant,
pub ts_alt_1: TsNano,
}
impl InsertItem {
@@ -756,6 +759,7 @@ pub fn insert_item_fut(
}
String(val) => insert_scalar_gen_fut(par, val, data_store.qu_insert_scalar_string.clone(), scy),
Bool(val) => insert_scalar_gen_fut(par, val, data_store.qu_insert_scalar_bool.clone(), scy),
CaStatus(val) => insert_scalar_gen_fut(par, val, data_store.qu_insert_scalar_castatus.clone(), scy),
}
}
Array(val) => {

View File

@@ -468,7 +468,7 @@ async fn check_event_tables(keyspace: &str, rett: RetentionTime, scy: &ScySessio
let tab = GenTwcsTab::new(
keyspace,
rett.table_prefix(),
format!("events_scalar_status"),
format!("events_scalar_castatus"),
&[
("series", "bigint"),
("ts_msp", "bigint"),
@@ -485,7 +485,7 @@ async fn check_event_tables(keyspace: &str, rett: RetentionTime, scy: &ScySessio
let tab = GenTwcsTab::new(
keyspace,
rett.table_prefix(),
format!("events_scalar_severity"),
format!("events_scalar_caseverity"),
&[
("series", "bigint"),
("ts_msp", "bigint"),

View File

@@ -34,6 +34,7 @@ pub struct DataStore {
pub qu_insert_scalar_bool: Arc<PreparedStatement>,
pub qu_insert_scalar_string: Arc<PreparedStatement>,
pub qu_insert_scalar_enum: Arc<PreparedStatement>,
pub qu_insert_scalar_castatus: Arc<PreparedStatement>,
pub qu_insert_array_u8: Arc<PreparedStatement>,
pub qu_insert_array_u16: Arc<PreparedStatement>,
pub qu_insert_array_u32: Arc<PreparedStatement>,
@@ -58,8 +59,8 @@ macro_rules! prep_qu_ins_a {
($id1:expr, $rett:expr, $scy:expr) => {{
let cql = format!(
concat!(
"insert into {}{} (series, ts_msp, ts_lsp, pulse, value)",
" values (?, ?, ?, 0, ?)"
"insert into {}{} (series, ts_msp, ts_lsp, value)",
" values (?, ?, ?, ?)"
),
$rett.table_prefix(),
$id1
@@ -73,8 +74,8 @@ macro_rules! prep_qu_ins_b {
($id1:expr, $rett:expr, $scy:expr) => {{
let cql = format!(
concat!(
"insert into {}{} (series, ts_msp, ts_lsp, pulse, valueblob)",
" values (?, ?, ?, 0, ?)"
"insert into {}{} (series, ts_msp, ts_lsp, valueblob)",
" values (?, ?, ?, ?)"
),
$rett.table_prefix(),
$id1
@@ -139,6 +140,7 @@ impl DataStore {
let qu_insert_scalar_bool = prep_qu_ins_a!("events_scalar_bool", rett, scy);
let qu_insert_scalar_string = prep_qu_ins_a!("events_scalar_string", rett, scy);
let qu_insert_scalar_enum = prep_qu_ins_enum!("events_scalar_enum", rett, scy);
let qu_insert_scalar_castatus = prep_qu_ins_a!("events_scalar_castatus", rett, scy);
let qu_insert_array_u8 = prep_qu_ins_b!("events_array_u8", rett, scy);
let qu_insert_array_u16 = prep_qu_ins_b!("events_array_u16", rett, scy);
@@ -227,6 +229,7 @@ impl DataStore {
qu_insert_scalar_bool,
qu_insert_scalar_string,
qu_insert_scalar_enum,
qu_insert_scalar_castatus,
qu_insert_array_u8,
qu_insert_array_u16,
qu_insert_array_u32,

View File

@@ -8,6 +8,7 @@ edition = "2021"
serde = { version = "1.0", features = ["derive"] }
async-channel = "2.1.1"
futures-util = "0.3.30"
smallvec = "1.13.2"
log = { path = "../log" }
err = { path = "../../daqbuffer/crates/err" }
netpod = { path = "../../daqbuffer/crates/netpod" }

View File

@@ -28,9 +28,13 @@ pub enum Error {
SeriesWriter(#[from] crate::writer::Error),
}
pub struct RateLimitWriter<ET> {
pub struct RateLimitWriter<ET>
where
ET: EmittableType,
{
series: SeriesId,
min_quiet: Duration,
emit_state: <ET as EmittableType>::State,
last_insert_ts: TsNano,
last_insert_val: Option<ET>,
dbgname: String,
@@ -42,11 +46,17 @@ impl<ET> RateLimitWriter<ET>
where
ET: EmittableType,
{
pub fn new(series: SeriesId, min_quiet: Duration, dbgname: String) -> Result<Self, Error> {
pub fn new(
series: SeriesId,
min_quiet: Duration,
emit_state: <ET as EmittableType>::State,
dbgname: String,
) -> Result<Self, Error> {
let writer = SeriesWriter::new(series)?;
let ret = Self {
series,
min_quiet,
emit_state,
last_insert_ts: TsNano::from_ns(0),
last_insert_val: None,
dbgname,
@@ -90,9 +100,7 @@ where
}
};
if do_write {
self.last_insert_ts = item.ts();
self.last_insert_val = Some(item.clone());
self.writer.write(item, ts_net, deque)?;
self.writer.write(item, &mut self.emit_state, ts_net, deque)?;
}
Ok((do_write,))
}
@@ -105,7 +113,7 @@ where
impl<ET> fmt::Debug for RateLimitWriter<ET>
where
ET: fmt::Debug,
ET: EmittableType,
{
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("RateLimitWriter")

View File

@@ -43,12 +43,18 @@ pub struct MinQuiets {
}
#[derive(Debug)]
struct State<ET> {
struct State<ET>
where
ET: EmittableType,
{
writer: RateLimitWriter<ET>,
}
#[derive(Debug)]
pub struct RtWriter<ET> {
pub struct RtWriter<ET>
where
ET: EmittableType,
{
series: SeriesId,
scalar_type: ScalarType,
shape: Shape,
@@ -68,18 +74,19 @@ where
shape: Shape,
min_quiets: MinQuiets,
stnow: SystemTime,
emit_state_new: &dyn Fn() -> <ET as EmittableType>::State,
) -> Result<Self, Error> {
let state_st = {
// let writer = SeriesWriter::establish_with_sid(sid, stnow)?;
let writer = RateLimitWriter::new(series, min_quiets.st, "st".into())?;
let writer = RateLimitWriter::new(series, min_quiets.st, emit_state_new(), "st".into())?;
State { writer }
};
let state_mt = {
let writer = RateLimitWriter::new(series, min_quiets.mt, "mt".into())?;
let writer = RateLimitWriter::new(series, min_quiets.mt, emit_state_new(), "mt".into())?;
State { writer }
};
let state_lt = {
let writer = RateLimitWriter::new(series, min_quiets.lt, "lt".into())?;
let writer = RateLimitWriter::new(series, min_quiets.lt, emit_state_new(), "lt".into())?;
State { writer }
};
let ret = Self {

View File

@@ -5,6 +5,7 @@ use err::ThisError;
use log::*;
use netpod::timeunits::HOUR;
use netpod::timeunits::SEC;
use netpod::DtNano;
use netpod::ScalarType;
use netpod::SeriesKind;
use netpod::Shape;
@@ -21,11 +22,21 @@ use std::marker::PhantomData;
use std::time::Instant;
use std::time::SystemTime;
pub trait EmittableType: Clone {
pub use smallvec::SmallVec;
pub trait EmittableType: ::core::fmt::Debug + Clone {
type State;
fn ts(&self) -> TsNano;
fn has_change(&self, k: &Self) -> bool;
fn byte_size(&self) -> u32;
fn into_data_value(self) -> DataValue;
fn into_query_item(
self,
ts_msp: TsMs,
ts_msp_changed: bool,
ts_lsp: DtNano,
ts_net: Instant,
state: &mut <Self as EmittableType>::State,
) -> SmallVec<[QueryItem; 4]>;
}
#[derive(Debug, ThisError)]
@@ -89,7 +100,13 @@ where
self.sid.clone()
}
pub fn write(&mut self, item: ET, ts_net: Instant, deque: &mut VecDeque<QueryItem>) -> Result<(), Error> {
pub fn write(
&mut self,
item: ET,
state: &mut <ET as EmittableType>::State,
ts_net: Instant,
deque: &mut VecDeque<QueryItem>,
) -> Result<(), Error> {
let ts_main = item.ts();
// TODO decide on better msp/lsp: random offset!
@@ -128,18 +145,11 @@ where
}
};
let ts_lsp = ts_main.delta(ts_msp);
let item = InsertItem {
series: self.sid.clone(),
ts_msp: ts_msp.to_ts_ms(),
ts_lsp,
ts_net,
ts_alt_1: ts_main,
msp_bump: ts_msp_changed,
val: item.into_data_value(),
};
// TODO decide on the path in the new deques struct
trace!("emit value for ts {:?}", ts_main);
deque.push_back(QueryItem::Insert(item));
let items = item.into_query_item(ts_msp.to_ts_ms(), ts_msp_changed, ts_lsp, ts_net, state);
trace!("emit value for ts {:?} items len {}", ts_main, items.len());
for item in items {
deque.push_back(item);
}
Ok(())
}