diff --git a/daqingest/src/daemon.rs b/daqingest/src/daemon.rs index 29392d0..700985c 100644 --- a/daqingest/src/daemon.rs +++ b/daqingest/src/daemon.rs @@ -763,7 +763,7 @@ pub async fn run(opts: CaIngestOpts, channels_config: Option) -> debug!("will configure {} channels", channels_config.len()); let mut thr_msg = ThrottleTrace::new(Duration::from_millis(1000)); let mut i = 0; - let nmax = usize::MAX; + let nmax = 100999777; let nn = channels_config.channels().len(); let mut ixs: Vec = (0..nn).into_iter().collect(); if false { diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index eadc2b2..045d037 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -26,6 +26,7 @@ use hashbrown::HashMap; use log::*; use netpod::timeunits::*; use netpod::ttl::RetentionTime; +use netpod::DtNano; use netpod::ScalarType; use netpod::SeriesKind; use netpod::Shape; @@ -314,6 +315,7 @@ struct CreatingState { struct MakingSeriesWriterState { tsbeg: Instant, channel: CreatedState, + series_status: SeriesId, } #[derive(Debug, Clone)] @@ -514,6 +516,7 @@ enum ChannelState { Init(ChannelStatusSeriesId), Creating(CreatingState), FetchEnumDetails(FetchEnumDetails), + FetchCaStatusSeries(MakingSeriesWriterState), MakingSeriesWriter(MakingSeriesWriterState), Writable(WritableState), Closing(ClosingState), @@ -551,6 +554,7 @@ impl ChannelState { ChannelState::Init(..) => ChannelConnectedInfo::Disconnected, ChannelState::Creating { .. } => ChannelConnectedInfo::Connecting, ChannelState::FetchEnumDetails(_) => ChannelConnectedInfo::Connecting, + ChannelState::FetchCaStatusSeries(_) => ChannelConnectedInfo::Connecting, ChannelState::MakingSeriesWriter(_) => ChannelConnectedInfo::Connecting, ChannelState::Writable(_) => ChannelConnectedInfo::Connected, ChannelState::Error(_) => ChannelConnectedInfo::Error, @@ -637,6 +641,7 @@ impl ChannelState { ChannelState::Init(cssid) => cssid.clone(), ChannelState::Creating(st) => st.cssid.clone(), ChannelState::FetchEnumDetails(st) => st.cssid.clone(), + ChannelState::FetchCaStatusSeries(st) => st.channel.cssid.clone(), ChannelState::MakingSeriesWriter(st) => st.channel.cssid.clone(), ChannelState::Writable(st) => st.channel.cssid.clone(), ChannelState::Error(e) => match e { @@ -1183,20 +1188,51 @@ impl CaConn { trace!("handle_writer_establish_result recv {}", self.remote_addr_dbg); let chinfo = res?; if let Some(ch) = self.channels.get(&cid) { - if let ChannelState::MakingSeriesWriter(st) = &ch.state { - let scalar_type = st.channel.scalar_type.clone(); - let shape = st.channel.shape.clone(); - let writer = RtWriter::new( - chinfo.series.to_series(), - scalar_type, - shape, - ch.conf.min_quiets(), - stnow, - )?; - self.handle_writer_establish_inner(cid, writer)?; - have_progress = true; - } else { - return Err(Error::Error); + match &ch.state { + ChannelState::FetchCaStatusSeries(st) => { + let crst = &st.channel; + let cid = crst.cid.clone(); + let (tx, rx) = async_channel::bounded(8); + let item = ChannelInfoQuery { + backend: self.backend.clone(), + channel: crst.name().into(), + kind: netpod::SeriesKind::ChannelData, + scalar_type: crst.scalar_type.clone(), + shape: crst.shape.clone(), + tx: Box::pin(tx), + }; + self.channel_info_query_qu.push_back(item); + self.channel_info_query_res_rxs.push_back((Box::pin(rx), cid)); + self.channels.get_mut(&cid).unwrap().state = + ChannelState::MakingSeriesWriter(MakingSeriesWriterState { + tsbeg: Instant::now(), + channel: st.channel.clone(), + series_status: chinfo.series.to_series(), + }); + have_progress = true; + } + ChannelState::MakingSeriesWriter(st) => { + let scalar_type = st.channel.scalar_type.clone(); + let shape = st.channel.shape.clone(); + let writer = RtWriter::new( + chinfo.series.to_series(), + scalar_type, + shape, + ch.conf.min_quiets(), + stnow, + &|| CaWriterValueState { + series_data: chinfo.series.to_series(), + series_status: st.series_status, + last_accepted_ts: TsNano::from_ns(0), + last_accepted_val: None, + }, + )?; + self.handle_writer_establish_inner(cid, writer)?; + have_progress = true; + } + _ => { + return Err(Error::Error); + } } } else { return Err(Error::Error); @@ -1418,6 +1454,9 @@ impl CaConn { ChannelState::FetchEnumDetails(st) => { *chst = ChannelState::Ended(st.cssid.clone()); } + ChannelState::FetchCaStatusSeries(st) => { + *chst = ChannelState::Ended(st.channel.cssid.clone()); + } ChannelState::MakingSeriesWriter(st) => { *chst = ChannelState::Ended(st.channel.cssid.clone()); } @@ -1457,6 +1496,9 @@ impl CaConn { ChannelState::Creating(..) => { // TODO need last-save-ts for this state. } + ChannelState::FetchCaStatusSeries(..) => { + // TODO ? + } ChannelState::MakingSeriesWriter(..) => { // TODO ? } @@ -1667,6 +1709,7 @@ impl CaConn { ChannelState::Creating(_) | ChannelState::Init(_) | ChannelState::FetchEnumDetails(_) + | ChannelState::FetchCaStatusSeries(_) | ChannelState::MakingSeriesWriter(_) => { self.stats.recv_read_notify_but_not_init_yet.inc(); } @@ -2090,6 +2133,7 @@ impl CaConn { ChannelState::Init(_) => {} ChannelState::Creating(_) => {} ChannelState::FetchEnumDetails(_) => {} + ChannelState::FetchCaStatusSeries(_) => {} ChannelState::MakingSeriesWriter(_) => {} ChannelState::Writable(st2) => match &mut st2.reading { ReadingState::EnableMonitoring(_) => {} @@ -2493,8 +2537,7 @@ impl CaConn { match &scalar_type { ScalarType::Enum => { // TODO channel created, now fetch enum variants, later make writer - let min_quiets = conf.conf.min_quiets(); - let fut = enumfetch::EnumFetch::new(created_state, self, min_quiets); + let fut = enumfetch::EnumFetch::new(created_state, self); // TODO should always check if the slot is free. let ioid = fut.ioid(); let x = Box::pin(fut); @@ -2503,10 +2546,6 @@ impl CaConn { _ => { let backend = self.backend.clone(); let channel_name = created_state.name().into(); - *chst = ChannelState::MakingSeriesWriter(MakingSeriesWriterState { - tsbeg: tsnow, - channel: created_state, - }); // TODO create a channel for the answer. // Keep only a certain max number of channels in-flight because have to poll on them. // TODO register the channel for the answer. @@ -2514,13 +2553,18 @@ impl CaConn { let item = ChannelInfoQuery { backend, channel: channel_name, - kind: SeriesKind::ChannelData, - scalar_type: scalar_type.clone(), - shape: shape.clone(), + kind: SeriesKind::CaStatus, + scalar_type: ScalarType::I16, + shape: Shape::Scalar, tx: Box::pin(tx), }; self.channel_info_query_qu.push_back(item); self.channel_info_query_res_rxs.push_back((Box::pin(rx), cid)); + *chst = ChannelState::FetchCaStatusSeries(MakingSeriesWriterState { + tsbeg: tsnow, + channel: created_state, + series_status: SeriesId::new(0), + }); } } Ok(()) @@ -3237,6 +3281,13 @@ impl Stream for CaConn { } } +struct CaWriterValueState { + series_data: SeriesId, + series_status: SeriesId, + last_accepted_ts: TsNano, + last_accepted_val: Option, +} + #[derive(Debug, Clone)] struct CaWriterValue(CaEventValue, Option); @@ -3269,6 +3320,8 @@ impl CaWriterValue { } impl EmittableType for CaWriterValue { + type State = CaWriterValueState; + fn ts(&self) -> TsNano { TsNano::from_ns(self.0.ts().unwrap_or(0)) } @@ -3276,6 +3329,8 @@ impl EmittableType for CaWriterValue { fn has_change(&self, k: &Self) -> bool { if self.0.data != k.0.data { true + } else if self.0.meta != k.0.meta { + true } else { false } @@ -3285,47 +3340,104 @@ impl EmittableType for CaWriterValue { self.0.data.byte_size() } - fn into_data_value(mut self) -> DataValue { - // TODO need to pass a ref to channel state to convert enum strings. - // Or do that already when we construct this? - // Also, in general, need to produce a SmallVec of values to emit: value, status, severity, etc.. - // let val = Self::convert_event_data(crst, value.data)?; - use super::proto::CaDataValue; - use scywr::iteminsertqueue::DataValue; - let ret = match self.0.data { - CaDataValue::Scalar(val) => DataValue::Scalar({ - use super::proto::CaDataScalarValue; - use scywr::iteminsertqueue::ScalarValue; - match val { - CaDataScalarValue::I8(x) => ScalarValue::I8(x), - CaDataScalarValue::I16(x) => ScalarValue::I16(x), - CaDataScalarValue::I32(x) => ScalarValue::I32(x), - CaDataScalarValue::F32(x) => ScalarValue::F32(x), - CaDataScalarValue::F64(x) => ScalarValue::F64(x), - CaDataScalarValue::Enum(x) => ScalarValue::Enum( - x, - self.1.take().unwrap_or_else(|| { - warn!("NoEnumStr"); - String::from("NoEnumStr") - }), - ), - CaDataScalarValue::String(x) => ScalarValue::String(x), - CaDataScalarValue::Bool(x) => ScalarValue::Bool(x), - } - }), - CaDataValue::Array(val) => DataValue::Array({ - use super::proto::CaDataArrayValue; - use scywr::iteminsertqueue::ArrayValue; - match val { - CaDataArrayValue::I8(x) => ArrayValue::I8(x), - CaDataArrayValue::I16(x) => ArrayValue::I16(x), - CaDataArrayValue::I32(x) => ArrayValue::I32(x), - CaDataArrayValue::F32(x) => ArrayValue::F32(x), - CaDataArrayValue::F64(x) => ArrayValue::F64(x), - CaDataArrayValue::Bool(x) => ArrayValue::Bool(x), - } - }), + fn into_query_item( + mut self, + ts_msp: TsMs, + ts_msp_changed: bool, + ts_lsp: DtNano, + ts_net: Instant, + state: &mut ::State, + ) -> serieswriter::writer::SmallVec<[QueryItem; 4]> { + let mut ret = serieswriter::writer::SmallVec::new(); + let diff_data = match state.last_accepted_val.as_ref() { + Some(last) => self.0.data != last.0.data, + None => true, }; + let diff_status = match state.last_accepted_val.as_ref() { + Some(last) => match &last.0.meta { + proto::CaMetaValue::CaMetaTime(last_meta) => match &self.0.meta { + proto::CaMetaValue::CaMetaTime(meta) => meta.status != last_meta.status, + _ => false, + }, + _ => false, + }, + None => true, + }; + if let Some(ts) = self.0.ts() { + state.last_accepted_ts = TsNano::from_ns(ts); + } + state.last_accepted_val = Some(self.clone()); + if diff_data { + debug!("diff_data emit {:?}", state.series_data); + let data_value = { + use super::proto::CaDataValue; + use scywr::iteminsertqueue::DataValue; + let ret = match self.0.data { + CaDataValue::Scalar(val) => DataValue::Scalar({ + use super::proto::CaDataScalarValue; + use scywr::iteminsertqueue::ScalarValue; + match val { + CaDataScalarValue::I8(x) => ScalarValue::I8(x), + CaDataScalarValue::I16(x) => ScalarValue::I16(x), + CaDataScalarValue::I32(x) => ScalarValue::I32(x), + CaDataScalarValue::F32(x) => ScalarValue::F32(x), + CaDataScalarValue::F64(x) => ScalarValue::F64(x), + CaDataScalarValue::Enum(x) => ScalarValue::Enum( + x, + self.1.take().unwrap_or_else(|| { + warn!("NoEnumStr"); + String::from("NoEnumStr") + }), + ), + CaDataScalarValue::String(x) => ScalarValue::String(x), + CaDataScalarValue::Bool(x) => ScalarValue::Bool(x), + } + }), + CaDataValue::Array(val) => DataValue::Array({ + use super::proto::CaDataArrayValue; + use scywr::iteminsertqueue::ArrayValue; + match val { + CaDataArrayValue::I8(x) => ArrayValue::I8(x), + CaDataArrayValue::I16(x) => ArrayValue::I16(x), + CaDataArrayValue::I32(x) => ArrayValue::I32(x), + CaDataArrayValue::F32(x) => ArrayValue::F32(x), + CaDataArrayValue::F64(x) => ArrayValue::F64(x), + CaDataArrayValue::Bool(x) => ArrayValue::Bool(x), + } + }), + }; + ret + }; + let item = scywriiq::InsertItem { + series: state.series_data.clone(), + ts_msp, + ts_lsp, + ts_net, + msp_bump: ts_msp_changed, + val: data_value, + }; + ret.push(QueryItem::Insert(item)); + } + if diff_status { + debug!("diff_status emit {:?}", state.series_status); + use scywr::iteminsertqueue::DataValue; + use scywr::iteminsertqueue::ScalarValue; + match self.0.meta { + proto::CaMetaValue::CaMetaTime(meta) => { + let data_value = DataValue::Scalar(ScalarValue::CaStatus(meta.status as i16)); + let item = scywriiq::InsertItem { + series: state.series_status.clone(), + ts_msp, + ts_lsp, + ts_net, + msp_bump: ts_msp_changed, + val: data_value, + }; + ret.push(QueryItem::Insert(item)); + } + _ => {} + }; + } ret } } diff --git a/netfetch/src/ca/conn/enumfetch.rs b/netfetch/src/ca/conn/enumfetch.rs index 46c6f77..33e57df 100644 --- a/netfetch/src/ca/conn/enumfetch.rs +++ b/netfetch/src/ca/conn/enumfetch.rs @@ -7,6 +7,7 @@ use dbpg::seriesbychannel::ChannelInfoQuery; use err::thiserror; use err::ThisError; use log::*; +use series::SeriesId; use std::pin::Pin; use std::time::Instant; @@ -23,11 +24,10 @@ pub trait ConnFuture: Send { pub struct EnumFetch { created_state: CreatedState, ioid: Ioid, - min_quiets: serieswriter::rtwriter::MinQuiets, } impl EnumFetch { - pub fn new(created_state: CreatedState, conn: &mut CaConn, min_quiets: serieswriter::rtwriter::MinQuiets) -> Self { + pub fn new(created_state: CreatedState, conn: &mut CaConn) -> Self { if created_state.cssid.id() == 4705698279895902114 {} let name = created_state.name(); // info!("EnumFetch::new name {name}"); @@ -42,11 +42,7 @@ impl EnumFetch { let ts = Instant::now(); let item = CaMsg::from_ty_ts(ty, ts); conn.proto().unwrap().push_out(item); - Self { - created_state, - ioid, - min_quiets, - } + Self { created_state, ioid } } pub fn ioid(&self) -> Ioid { @@ -76,16 +72,14 @@ impl ConnFuture for EnumFetch { } }; - // TODO create a channel for the answer. - // TODO register the channel for the answer. let cid = crst.cid.clone(); let (tx, rx) = async_channel::bounded(8); let item = ChannelInfoQuery { backend: conn.backend.clone(), channel: crst.name().into(), - kind: netpod::SeriesKind::ChannelData, - scalar_type: crst.scalar_type.clone(), - shape: crst.shape.clone(), + kind: netpod::SeriesKind::CaStatus, + scalar_type: netpod::ScalarType::I16, + shape: netpod::Shape::Scalar, tx: Box::pin(tx), }; conn.channel_info_query_qu.push_back(item); @@ -93,9 +87,10 @@ impl ConnFuture for EnumFetch { // This handler must not exist if the channel gets removed. let conf = conn.channels.get_mut(&crst.cid).ok_or(Error::MissingState)?; - conf.state = super::ChannelState::MakingSeriesWriter(super::MakingSeriesWriterState { + conf.state = super::ChannelState::FetchCaStatusSeries(super::MakingSeriesWriterState { tsbeg: tsnow, channel: crst.clone(), + series_status: SeriesId::new(0), }); conn.handler_by_ioid.remove(&self.ioid); diff --git a/netfetch/src/metrics/ingest.rs b/netfetch/src/metrics/ingest.rs index a04700e..e460279 100644 --- a/netfetch/src/metrics/ingest.rs +++ b/netfetch/src/metrics/ingest.rs @@ -73,6 +73,8 @@ type ValueSeriesWriter = SeriesWriter; struct WritableType(DataValue); impl EmittableType for WritableType { + type State = (); + fn ts(&self) -> TsNano { todo!() } @@ -85,7 +87,14 @@ impl EmittableType for WritableType { todo!() } - fn into_data_value(self) -> DataValue { + fn into_query_item( + self, + ts_msp: netpod::TsMs, + ts_msp_changed: bool, + ts_lsp: netpod::DtNano, + ts_net: Instant, + state: &mut ::State, + ) -> serieswriter::writer::SmallVec<[QueryItem; 4]> { todo!() } } @@ -314,11 +323,12 @@ where let evs: EventsDim0 = evs.into(); trace_input!("see events {:?}", evs); let tsnow = Instant::now(); + let mut emit_state = (); for (i, (&ts, val)) in evs.tss.iter().zip(evs.values.iter()).enumerate() { let val = val.clone(); trace_input!("ev {:6} {:20} {:20?}", i, ts, val); let val = f1(val); - writer.write(WritableType(val), tsnow, deque)?; + writer.write(WritableType(val), &mut emit_state, tsnow, deque)?; } Ok(()) } @@ -341,11 +351,12 @@ where let evs: EventsDim1 = evs.into(); trace_input!("see events {:?}", evs); let tsnow = Instant::now(); + let mut emit_state = (); for (i, (&ts, val)) in evs.tss.iter().zip(evs.values.iter()).enumerate() { let val = val.clone(); trace_input!("ev {:6} {:20} {:20?}", i, ts, val); let val = f1(val); - writer.write(WritableType(val), tsnow, deque)?; + writer.write(WritableType(val), &mut emit_state, tsnow, deque)?; } Ok(()) } diff --git a/scywr/src/iteminsertqueue.rs b/scywr/src/iteminsertqueue.rs index 888242e..88b2e39 100644 --- a/scywr/src/iteminsertqueue.rs +++ b/scywr/src/iteminsertqueue.rs @@ -63,6 +63,7 @@ pub enum ScalarValue { Enum(i16, String), String(String), Bool(bool), + CaStatus(i16), } impl ScalarValue { @@ -81,6 +82,7 @@ impl ScalarValue { ScalarValue::Enum(_, y) => 2 + y.len() as u32, ScalarValue::String(x) => x.len() as u32, ScalarValue::Bool(_) => 1, + ScalarValue::CaStatus(_) => 2, } } @@ -99,6 +101,7 @@ impl ScalarValue { ScalarValue::Enum(x, y) => format!("({}, {})", x, y), ScalarValue::String(x) => x.to_string(), ScalarValue::Bool(x) => x.to_string(), + ScalarValue::CaStatus(x) => x.to_string(), } } } @@ -313,7 +316,7 @@ impl DataValue { } } - pub fn scalar_type(&self) -> ScalarType { + fn unused_scalar_type(&self) -> ScalarType { match self { DataValue::Scalar(x) => match x { ScalarValue::U8(_) => ScalarType::U8, @@ -329,6 +332,7 @@ impl DataValue { ScalarValue::Enum(..) => ScalarType::Enum, ScalarValue::String(_) => ScalarType::STRING, ScalarValue::Bool(_) => ScalarType::BOOL, + ScalarValue::CaStatus(_) => ScalarType::I16, }, DataValue::Array(x) => match x { ArrayValue::U8(_) => ScalarType::U8, @@ -555,7 +559,6 @@ pub struct InsertItem { pub msp_bump: bool, pub val: DataValue, pub ts_net: Instant, - pub ts_alt_1: TsNano, } impl InsertItem { @@ -756,6 +759,7 @@ pub fn insert_item_fut( } String(val) => insert_scalar_gen_fut(par, val, data_store.qu_insert_scalar_string.clone(), scy), Bool(val) => insert_scalar_gen_fut(par, val, data_store.qu_insert_scalar_bool.clone(), scy), + CaStatus(val) => insert_scalar_gen_fut(par, val, data_store.qu_insert_scalar_castatus.clone(), scy), } } Array(val) => { diff --git a/scywr/src/schema.rs b/scywr/src/schema.rs index 5190f95..34b5134 100644 --- a/scywr/src/schema.rs +++ b/scywr/src/schema.rs @@ -468,7 +468,7 @@ async fn check_event_tables(keyspace: &str, rett: RetentionTime, scy: &ScySessio let tab = GenTwcsTab::new( keyspace, rett.table_prefix(), - format!("events_scalar_status"), + format!("events_scalar_castatus"), &[ ("series", "bigint"), ("ts_msp", "bigint"), @@ -485,7 +485,7 @@ async fn check_event_tables(keyspace: &str, rett: RetentionTime, scy: &ScySessio let tab = GenTwcsTab::new( keyspace, rett.table_prefix(), - format!("events_scalar_severity"), + format!("events_scalar_caseverity"), &[ ("series", "bigint"), ("ts_msp", "bigint"), diff --git a/scywr/src/store.rs b/scywr/src/store.rs index 4e015d5..d70a40f 100644 --- a/scywr/src/store.rs +++ b/scywr/src/store.rs @@ -34,6 +34,7 @@ pub struct DataStore { pub qu_insert_scalar_bool: Arc, pub qu_insert_scalar_string: Arc, pub qu_insert_scalar_enum: Arc, + pub qu_insert_scalar_castatus: Arc, pub qu_insert_array_u8: Arc, pub qu_insert_array_u16: Arc, pub qu_insert_array_u32: Arc, @@ -58,8 +59,8 @@ macro_rules! prep_qu_ins_a { ($id1:expr, $rett:expr, $scy:expr) => {{ let cql = format!( concat!( - "insert into {}{} (series, ts_msp, ts_lsp, pulse, value)", - " values (?, ?, ?, 0, ?)" + "insert into {}{} (series, ts_msp, ts_lsp, value)", + " values (?, ?, ?, ?)" ), $rett.table_prefix(), $id1 @@ -73,8 +74,8 @@ macro_rules! prep_qu_ins_b { ($id1:expr, $rett:expr, $scy:expr) => {{ let cql = format!( concat!( - "insert into {}{} (series, ts_msp, ts_lsp, pulse, valueblob)", - " values (?, ?, ?, 0, ?)" + "insert into {}{} (series, ts_msp, ts_lsp, valueblob)", + " values (?, ?, ?, ?)" ), $rett.table_prefix(), $id1 @@ -139,6 +140,7 @@ impl DataStore { let qu_insert_scalar_bool = prep_qu_ins_a!("events_scalar_bool", rett, scy); let qu_insert_scalar_string = prep_qu_ins_a!("events_scalar_string", rett, scy); let qu_insert_scalar_enum = prep_qu_ins_enum!("events_scalar_enum", rett, scy); + let qu_insert_scalar_castatus = prep_qu_ins_a!("events_scalar_castatus", rett, scy); let qu_insert_array_u8 = prep_qu_ins_b!("events_array_u8", rett, scy); let qu_insert_array_u16 = prep_qu_ins_b!("events_array_u16", rett, scy); @@ -227,6 +229,7 @@ impl DataStore { qu_insert_scalar_bool, qu_insert_scalar_string, qu_insert_scalar_enum, + qu_insert_scalar_castatus, qu_insert_array_u8, qu_insert_array_u16, qu_insert_array_u32, diff --git a/serieswriter/Cargo.toml b/serieswriter/Cargo.toml index f6496c0..927db6e 100644 --- a/serieswriter/Cargo.toml +++ b/serieswriter/Cargo.toml @@ -8,6 +8,7 @@ edition = "2021" serde = { version = "1.0", features = ["derive"] } async-channel = "2.1.1" futures-util = "0.3.30" +smallvec = "1.13.2" log = { path = "../log" } err = { path = "../../daqbuffer/crates/err" } netpod = { path = "../../daqbuffer/crates/netpod" } diff --git a/serieswriter/src/ratelimitwriter.rs b/serieswriter/src/ratelimitwriter.rs index 082da6b..cb52372 100644 --- a/serieswriter/src/ratelimitwriter.rs +++ b/serieswriter/src/ratelimitwriter.rs @@ -28,9 +28,13 @@ pub enum Error { SeriesWriter(#[from] crate::writer::Error), } -pub struct RateLimitWriter { +pub struct RateLimitWriter +where + ET: EmittableType, +{ series: SeriesId, min_quiet: Duration, + emit_state: ::State, last_insert_ts: TsNano, last_insert_val: Option, dbgname: String, @@ -42,11 +46,17 @@ impl RateLimitWriter where ET: EmittableType, { - pub fn new(series: SeriesId, min_quiet: Duration, dbgname: String) -> Result { + pub fn new( + series: SeriesId, + min_quiet: Duration, + emit_state: ::State, + dbgname: String, + ) -> Result { let writer = SeriesWriter::new(series)?; let ret = Self { series, min_quiet, + emit_state, last_insert_ts: TsNano::from_ns(0), last_insert_val: None, dbgname, @@ -90,9 +100,7 @@ where } }; if do_write { - self.last_insert_ts = item.ts(); - self.last_insert_val = Some(item.clone()); - self.writer.write(item, ts_net, deque)?; + self.writer.write(item, &mut self.emit_state, ts_net, deque)?; } Ok((do_write,)) } @@ -105,7 +113,7 @@ where impl fmt::Debug for RateLimitWriter where - ET: fmt::Debug, + ET: EmittableType, { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { fmt.debug_struct("RateLimitWriter") diff --git a/serieswriter/src/rtwriter.rs b/serieswriter/src/rtwriter.rs index 8d6d520..498bc88 100644 --- a/serieswriter/src/rtwriter.rs +++ b/serieswriter/src/rtwriter.rs @@ -43,12 +43,18 @@ pub struct MinQuiets { } #[derive(Debug)] -struct State { +struct State +where + ET: EmittableType, +{ writer: RateLimitWriter, } #[derive(Debug)] -pub struct RtWriter { +pub struct RtWriter +where + ET: EmittableType, +{ series: SeriesId, scalar_type: ScalarType, shape: Shape, @@ -68,18 +74,19 @@ where shape: Shape, min_quiets: MinQuiets, stnow: SystemTime, + emit_state_new: &dyn Fn() -> ::State, ) -> Result { let state_st = { // let writer = SeriesWriter::establish_with_sid(sid, stnow)?; - let writer = RateLimitWriter::new(series, min_quiets.st, "st".into())?; + let writer = RateLimitWriter::new(series, min_quiets.st, emit_state_new(), "st".into())?; State { writer } }; let state_mt = { - let writer = RateLimitWriter::new(series, min_quiets.mt, "mt".into())?; + let writer = RateLimitWriter::new(series, min_quiets.mt, emit_state_new(), "mt".into())?; State { writer } }; let state_lt = { - let writer = RateLimitWriter::new(series, min_quiets.lt, "lt".into())?; + let writer = RateLimitWriter::new(series, min_quiets.lt, emit_state_new(), "lt".into())?; State { writer } }; let ret = Self { diff --git a/serieswriter/src/writer.rs b/serieswriter/src/writer.rs index 46a6228..57c85a3 100644 --- a/serieswriter/src/writer.rs +++ b/serieswriter/src/writer.rs @@ -5,6 +5,7 @@ use err::ThisError; use log::*; use netpod::timeunits::HOUR; use netpod::timeunits::SEC; +use netpod::DtNano; use netpod::ScalarType; use netpod::SeriesKind; use netpod::Shape; @@ -21,11 +22,21 @@ use std::marker::PhantomData; use std::time::Instant; use std::time::SystemTime; -pub trait EmittableType: Clone { +pub use smallvec::SmallVec; + +pub trait EmittableType: ::core::fmt::Debug + Clone { + type State; fn ts(&self) -> TsNano; fn has_change(&self, k: &Self) -> bool; fn byte_size(&self) -> u32; - fn into_data_value(self) -> DataValue; + fn into_query_item( + self, + ts_msp: TsMs, + ts_msp_changed: bool, + ts_lsp: DtNano, + ts_net: Instant, + state: &mut ::State, + ) -> SmallVec<[QueryItem; 4]>; } #[derive(Debug, ThisError)] @@ -89,7 +100,13 @@ where self.sid.clone() } - pub fn write(&mut self, item: ET, ts_net: Instant, deque: &mut VecDeque) -> Result<(), Error> { + pub fn write( + &mut self, + item: ET, + state: &mut ::State, + ts_net: Instant, + deque: &mut VecDeque, + ) -> Result<(), Error> { let ts_main = item.ts(); // TODO decide on better msp/lsp: random offset! @@ -128,18 +145,11 @@ where } }; let ts_lsp = ts_main.delta(ts_msp); - let item = InsertItem { - series: self.sid.clone(), - ts_msp: ts_msp.to_ts_ms(), - ts_lsp, - ts_net, - ts_alt_1: ts_main, - msp_bump: ts_msp_changed, - val: item.into_data_value(), - }; - // TODO decide on the path in the new deques struct - trace!("emit value for ts {:?}", ts_main); - deque.push_back(QueryItem::Insert(item)); + let items = item.into_query_item(ts_msp.to_ts_ms(), ts_msp_changed, ts_lsp, ts_net, state); + trace!("emit value for ts {:?} items len {}", ts_main, items.len()); + for item in items { + deque.push_back(item); + } Ok(()) }