From 2f20b491933c2dc823ddc1041c1c593c6bbf1f39 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Tue, 1 Jul 2025 17:00:35 +0200 Subject: [PATCH] WIP begin status channel changes --- daqingest/Cargo.toml | 2 +- netfetch/src/ca/conn.rs | 264 ++++++++++++----------- netfetch/src/metrics/ingest.rs | 83 +++---- netfetch/src/metrics/ingest/write_v02.rs | 41 ++-- serieswriter/src/fixgridwriter.rs | 46 ++-- serieswriter/src/ratelimitwriter.rs | 15 +- serieswriter/src/rtwriter.rs | 19 +- serieswriter/src/writer.rs | 43 ++-- 8 files changed, 258 insertions(+), 255 deletions(-) diff --git a/daqingest/Cargo.toml b/daqingest/Cargo.toml index 749e8e2..07b38f3 100644 --- a/daqingest/Cargo.toml +++ b/daqingest/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "daqingest" -version = "0.3.0-aa.8" +version = "0.3.0-aa.9" authors = ["Dominik Werder "] edition = "2024" diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index 8db0f3c..ba6b6ca 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -16,6 +16,7 @@ use futures_util::Stream; use futures_util::StreamExt; use hashbrown::HashMap; use log::*; +use netpod::ByteSize; use netpod::EMIT_ACCOUNTING_SNAP; use netpod::ScalarType; use netpod::SeriesKind; @@ -478,7 +479,6 @@ struct CreatedState { shape: Shape, name: String, enum_str_table: Option>, - status_emit_count: u64, #[serde(with = "serde_Instant_elapsed_ms")] ts_recv_value_status_emit_next: Instant, } @@ -518,7 +518,6 @@ impl CreatedState { shape: Shape::Scalar, name: String::new(), enum_str_table: None, - status_emit_count: 0, ts_recv_value_status_emit_next: Instant::now(), } } @@ -647,7 +646,8 @@ impl ChannelState { }; let interest_score = 1. / item_recv_ivl_ema.unwrap_or(1e10).max(1e-6).min(1e10); let status_emit_count = match self { - ChannelState::Writable(s) => s.channel.status_emit_count, + // TODO + // ChannelState::Writable(s) => s.channel.status_emit_count, _ => 0, }; let last_comparisons = match self { @@ -1318,6 +1318,17 @@ impl CaConn { if self.emit_connection_status_item(item).is_err() { self.mett.logic_error().inc(); } + let cids: Vec<_> = self.channels.keys().map(Clone::clone).collect(); + for cid in cids { + match self.channel_close_by_cid(cid) { + Err(e) => { + // TODO + self.mett.logic_error().inc(); + } + Ok(()) => {} + } + } + // TODO should let protocol shut down properly self.proto = None; } @@ -1328,7 +1339,7 @@ impl CaConn { } fn cmd_channel_close(&mut self, name: String) { - self.channel_close(name); + self.channel_close_by_name(name); // TODO return the result } @@ -1629,18 +1640,32 @@ impl CaConn { } } - pub fn channel_close(&mut self, name: String) { - debug!("channel_close {}", name); + fn channel_close_by_name(&mut self, name: String) -> Result<(), Error> { + let selfname = "channel_close_by_name"; + debug!("{selfname} {}", name); + if let Some(x) = self.cid_by_name.get(&name).map(Clone::clone) { + self.cid_by_name.remove(&name); + self.channel_close_by_cid(x.clone()) + } else { + warn!("{selfname} {} can not find channel", name); + // TODO should return error? + Ok(()) + } + } + + fn channel_close_by_cid(&mut self, cid: Cid) -> Result<(), Error> { + let selfname = "channel_close_by_cid"; let tsnow = Instant::now(); let stnow = SystemTime::now(); - let cid = if let Some(x) = self.cid_by_name.get(&name) { - x.clone() - } else { - debug!("channel_close {} can not find channel", name); - return; - }; - self.cid_by_name.remove(&name); if let Some(conf) = self.channels.get_mut(&cid) { + let name = conf.conf.name(); + { + // TODO emit CaConn item to let CaConnSet know that we have closed the channel. + // TODO may be too full + let value = CaConnEventValue::ChannelRemoved(name.into()); + let item = CaConnEvent::new_now(value); + self.ca_conn_event_out_queue.push_back(item); + } let item = ChannelStatusItem { ts: stnow, cssid: conf.state.cssid(), @@ -1651,6 +1676,14 @@ impl CaConn { self.mett.logic_error().inc(); } // TODO shutdown the internal writer structures. + match &mut conf.state { + ChannelState::Writable(st2) => { + if st2.writer.on_close(&mut self.iqdqs).is_err() { + self.mett.logic_error().inc(); + } + } + _ => {} + } if let Some(cst) = conf.state.created_state() { if let Some(proto) = self.proto.as_mut() { let ty = CaMsgTy::ChannelClose(ChannelClose { @@ -1680,25 +1713,22 @@ impl CaConn { }; } } else { - debug!("channel_close {} no channel block", name); - }; + debug!("{selfname} {} not found", cid); + } { let it = self.cid_by_sid.extract_if(|_, v| *v == cid); it.count(); } self.channels.remove(&cid); - // TODO emit CaConn item to let CaConnSet know that we have closed the channel. - // TODO may be too full - let value = CaConnEventValue::ChannelRemoved(name); - let item = CaConnEvent::new_now(value); - self.ca_conn_event_out_queue.push_back(item); + Ok(()) } fn channel_remove_by_name(&mut self, name: String) { + let selfname = "channel_remove_by_name"; if let Some(cid) = self.cid_by_name(&name) { self.channel_remove_by_cid(cid); } else { - warn!("channel_remove does not exist {}", name); + warn!("{selfname} does not exist {}", name); } } @@ -2396,7 +2426,6 @@ impl CaConn { crst.insert_item_ivl_ema.tick(tsnow); let val_for_agg = value.f32_for_binning(); let wres = writer.write(CaWriterValue::new(value, crst), tscaproto, tsev, iqdqs)?; - crst.status_emit_count += wres.nstatus() as u64; if wres.st.accept { crst.dw_st_last = stnow; crst.acc_st.push_written(payload_len); @@ -3036,7 +3065,6 @@ impl CaConn { shape: shape.clone(), name: conf.conf.name().into(), enum_str_table: None, - status_emit_count: 0, ts_recv_value_status_emit_next: Instant::now(), }; if series::dbg::dbg_chn(created_state.name()) { @@ -3977,118 +4005,92 @@ impl EmittableType for CaWriterValue { tsev: TsNano, state: &mut ::State, ) -> serieswriter::writer::EmitRes { - let mut items = serieswriter::writer::SmallVec::new(); - let diff_data = match state.last_accepted_val.as_ref() { - Some(last) => self.0.data != last.0.data, - None => true, - }; - let diff_status = match state.last_accepted_val.as_ref() { - Some(last) => match &last.0.meta { - proto::CaMetaValue::CaMetaTime(last_meta) => match &self.0.meta { - proto::CaMetaValue::CaMetaTime(meta) => meta.status != last_meta.status, - _ => false, - }, - _ => false, - }, - None => true, - }; - let ts = tsev; - state.last_accepted_val = Some(self.clone()); let byte_size = self.byte_size(); - if diff_data { - // debug!("diff_data emit {:?}", state.series_data); - let (ts_msp, ts_lsp, ts_msp_chg) = state.msp_split_data.split(ts, self.byte_size()); - let data_value = { - use ca_proto::ca::proto::CaDataValue; - use scywr::iteminsertqueue::DataValue; - let ret = match self.0.data { - CaDataValue::Scalar(val) => DataValue::Scalar({ - use ca_proto::ca::proto::CaDataScalarValue; - use scywr::iteminsertqueue::ScalarValue; - match val { - CaDataScalarValue::I8(x) => ScalarValue::I8(x), - CaDataScalarValue::I16(x) => ScalarValue::I16(x), - CaDataScalarValue::I32(x) => ScalarValue::I32(x), - CaDataScalarValue::F32(x) => ScalarValue::F32(x), - CaDataScalarValue::F64(x) => ScalarValue::F64(x), - CaDataScalarValue::Enum(x) => ScalarValue::Enum( - x, - self.1.take().unwrap_or_else(|| { - warn!("NoEnumStr"); - String::from("NoEnumStr") - }), - ), - CaDataScalarValue::String(x) => ScalarValue::String(x), - CaDataScalarValue::Bool(x) => ScalarValue::Bool(x), - } - }), - CaDataValue::Array(val) => DataValue::Array({ - use ca_proto::ca::proto::CaDataArrayValue; - use scywr::iteminsertqueue::ArrayValue; - match val { - CaDataArrayValue::I8(x) => ArrayValue::I8(x), - CaDataArrayValue::I16(x) => ArrayValue::I16(x), - CaDataArrayValue::I32(x) => ArrayValue::I32(x), - CaDataArrayValue::F32(x) => ArrayValue::F32(x), - CaDataArrayValue::F64(x) => ArrayValue::F64(x), - CaDataArrayValue::Bool(x) => ArrayValue::Bool(x), - } - }), - }; - ret - }; - if ts_msp_chg { - items.push(QueryItem::Msp(MspItem::new( - state.series_data.clone(), - ts_msp.to_ts_ms(), - ts_net, - ))); - } - let item = scywriiq::InsertItem { - series: state.series_data.clone(), - ts_msp: ts_msp.to_ts_ms(), - ts_lsp, - ts_net, - val: data_value, - }; - items.push(QueryItem::Insert(item)); - } - let mut n_status = 0; - if diff_status { + let data_item = { + use ca_proto::ca::proto::CaDataValue; use scywr::iteminsertqueue::DataValue; - use scywr::iteminsertqueue::ScalarValue; - match self.0.meta { - proto::CaMetaValue::CaMetaTime(meta) => { - let (ts_msp, ts_lsp, ts_msp_chg) = state.msp_split_status.split(ts, 2); - if ts_msp_chg { - items.push(QueryItem::Msp(MspItem::new( - state.series_status.clone(), - ts_msp.to_ts_ms(), - ts_net, - ))); + match self.0.data { + CaDataValue::Scalar(val) => DataValue::Scalar({ + use ca_proto::ca::proto::CaDataScalarValue; + use scywr::iteminsertqueue::ScalarValue; + match val { + CaDataScalarValue::I8(x) => ScalarValue::I8(x), + CaDataScalarValue::I16(x) => ScalarValue::I16(x), + CaDataScalarValue::I32(x) => ScalarValue::I32(x), + CaDataScalarValue::F32(x) => ScalarValue::F32(x), + CaDataScalarValue::F64(x) => ScalarValue::F64(x), + CaDataScalarValue::Enum(x) => ScalarValue::Enum( + x, + self.1.take().unwrap_or_else(|| { + warn!("NoEnumStr"); + String::from("NoEnumStr") + }), + ), + CaDataScalarValue::String(x) => ScalarValue::String(x), + CaDataScalarValue::Bool(x) => ScalarValue::Bool(x), } - let data_value = DataValue::Scalar(ScalarValue::I16(meta.status as i16)); - let item = scywriiq::InsertItem { - series: state.series_status.clone(), - ts_msp: ts_msp.to_ts_ms(), - ts_lsp, - ts_net, - val: data_value, - }; - items.push(QueryItem::Insert(item)); - n_status += 1; - // info!("diff_status emit {:?}", state.series_status); - } - _ => { - // TODO must be able to return error here - warn!("diff_status logic error"); - } - }; - } + }), + CaDataValue::Array(val) => DataValue::Array({ + use ca_proto::ca::proto::CaDataArrayValue; + use scywr::iteminsertqueue::ArrayValue; + match val { + CaDataArrayValue::I8(x) => ArrayValue::I8(x), + CaDataArrayValue::I16(x) => ArrayValue::I16(x), + CaDataArrayValue::I32(x) => ArrayValue::I32(x), + CaDataArrayValue::F32(x) => ArrayValue::F32(x), + CaDataArrayValue::F64(x) => ArrayValue::F64(x), + CaDataArrayValue::Bool(x) => ArrayValue::Bool(x), + } + }), + } + }; + + // TODO move to separate impl + // let diff_status = match state.last_accepted_val.as_ref() { + // Some(last) => match &last.0.meta { + // proto::CaMetaValue::CaMetaTime(last_meta) => match &self.0.meta { + // proto::CaMetaValue::CaMetaTime(meta) => meta.status != last_meta.status, + // _ => false, + // }, + // _ => false, + // }, + // None => true, + // }; + // let mut n_status = 0; + // if diff_status { + // use scywr::iteminsertqueue::DataValue; + // use scywr::iteminsertqueue::ScalarValue; + // match self.0.meta { + // proto::CaMetaValue::CaMetaTime(meta) => { + // let (ts_msp, ts_lsp, ts_msp_chg) = state.msp_split_status.split(ts, 2); + // if ts_msp_chg { + // items.push(QueryItem::Msp(MspItem::new( + // state.series_status.clone(), + // ts_msp.to_ts_ms(), + // ts_net, + // ))); + // } + // let data_value = DataValue::Scalar(ScalarValue::I16(meta.status as i16)); + // let item = scywriiq::InsertItem { + // series: state.series_status.clone(), + // ts_msp: ts_msp.to_ts_ms(), + // ts_lsp, + // ts_net, + // val: data_value, + // }; + // items.push(QueryItem::Insert(item)); + // n_status += 1; + // // info!("diff_status emit {:?}", state.series_status); + // } + // _ => { + // // TODO must be able to return error here + // warn!("diff_status logic error"); + // } + // }; + // } let ret = serieswriter::writer::EmitRes { - items, - bytes: byte_size, - status: n_status, + data_item, + bytes: ByteSize(byte_size), }; ret } diff --git a/netfetch/src/metrics/ingest.rs b/netfetch/src/metrics/ingest.rs index b2c499b..8af66ac 100644 --- a/netfetch/src/metrics/ingest.rs +++ b/netfetch/src/metrics/ingest.rs @@ -13,12 +13,13 @@ use futures_util::TryStreamExt; use items_2::binning::container_events::ContainerEvents; use items_2::binning::container_events::EventValueType; use netpod::APP_CBOR_FRAMED; +use netpod::ByteSize; use netpod::EnumVariant; use netpod::ScalarType; use netpod::SeriesKind; use netpod::Shape; use netpod::TsNano; -use netpod::log::*; +use netpod::log; use netpod::ttl::RetentionTime; use scywr::insertqueues::InsertDeques; use scywr::iteminsertqueue::ArrayValue; @@ -40,34 +41,12 @@ use std::time::Instant; use std::time::SystemTime; use streams::framed_bytes::FramedBytesStream; use taskrun::tokio::time::timeout; -// use core::io::BorrowedBuf; -#[allow(unused)] -macro_rules! debug_setup { - ($($arg:tt)*) => { - if true { - debug!($($arg)*); - } - }; -} +macro_rules! debug_setup { ($($arg:tt)*) => { if true { log::debug!($($arg)*); } }; } -#[allow(unused)] -macro_rules! trace_input { - ($($arg:tt)*) => { - if true { - trace!($($arg)*); - } - }; -} +macro_rules! trace_input { ($($arg:tt)*) => { if true { log::trace!($($arg)*); } }; } -#[allow(unused)] -macro_rules! trace_queues { - ($($arg:tt)*) => { - if true { - trace!($($arg)*); - } - }; -} +macro_rules! trace_queues { ($($arg:tt)*) => { if true { log::trace!($($arg)*); } }; } type ValueSeriesWriter = SeriesWriter; @@ -110,28 +89,26 @@ impl EmittableType for WritableType { tsev: TsNano, state: &mut ::State, ) -> serieswriter::writer::EmitRes { - let (ts_msp, ts_lsp, ts_msp_chg) = state.msp_split_data.split(self.0.clone(), self.byte_size()); - let item = QueryItem::Insert(scywr::iteminsertqueue::InsertItem { - series: state.series.clone(), - ts_msp: ts_msp.to_ts_ms(), - ts_lsp, - val: self.1.clone(), - ts_net, - }); - let mut items = smallvec::SmallVec::new(); - items.push(item); - if ts_msp_chg { - items.push(QueryItem::Msp(scywr::iteminsertqueue::MspItem::new( - state.series.clone(), - ts_msp.to_ts_ms(), - ts_net, - ))); - } - serieswriter::writer::EmitRes { - items, - bytes: self.byte_size(), - status: 0, - } + let bytes = ByteSize(self.byte_size()); + let data_item = self.1; + // let (ts_msp, ts_lsp, ts_msp_chg) = state.msp_split_data.split(self.0.clone(), self.byte_size()); + // let item = QueryItem::Insert(scywr::iteminsertqueue::InsertItem { + // series: state.series.clone(), + // ts_msp: ts_msp.to_ts_ms(), + // ts_lsp, + // val: self.1.clone(), + // ts_net, + // }); + // let mut items = smallvec::SmallVec::new(); + // items.push(item); + // if ts_msp_chg { + // items.push(QueryItem::Msp(scywr::iteminsertqueue::MspItem::new( + // state.series.clone(), + // ts_msp.to_ts_ms(), + // ts_net, + // ))); + // } + serieswriter::writer::EmitRes { data_item, bytes } } } @@ -234,7 +211,7 @@ async fn post_v01_try( let frame = match x? { Some(x) => x, None => { - trace!("input stream done"); + log::trace!("input stream done"); break; } }; @@ -358,7 +335,7 @@ where { let evs: ContainerEvents = ciborium::de::from_reader(Cursor::new(frame)) .map_err(|e| { - error!("cbor decode error {e}"); + log::error!("cbor decode error {e}"); }) .map_err(|_| Error::Decode)?; // trace_input!("see events {:?}", evs); @@ -383,7 +360,7 @@ fn evpush_dim0_enum( ) -> Result<(), Error> { let evs: ContainerEvents = ciborium::de::from_reader(Cursor::new(frame)) .map_err(|e| { - error!("cbor decode error {e}"); + log::error!("cbor decode error {e}"); }) .map_err(|_| Error::Decode)?; // trace_input!("see events {:?}", evs); @@ -412,11 +389,11 @@ where { let evs: ContainerEvents> = ciborium::de::from_reader(Cursor::new(frame)) .map_err(|e| { - error!("cbor decode error {e}"); + log::error!("cbor decode error {e}"); }) .map_err(|_| Error::Decode)?; trace_input!("see events {:?}", evs); - warn!("TODO require timestamp in input format"); + log::warn!("TODO require timestamp in input format"); let stnow = SystemTime::now(); let tsev = TsNano::from_system_time(stnow); let tsnow = Instant::now(); diff --git a/netfetch/src/metrics/ingest/write_v02.rs b/netfetch/src/metrics/ingest/write_v02.rs index 68fa210..2829734 100644 --- a/netfetch/src/metrics/ingest/write_v02.rs +++ b/netfetch/src/metrics/ingest/write_v02.rs @@ -11,6 +11,7 @@ use futures_util::TryStreamExt; use items_2::binning::container_events::ContainerEvents; use items_2::binning::container_events::EventValueType; use netpod::APP_CBOR_FRAMED; +use netpod::ByteSize; use netpod::DaqbufChannelConfig; use netpod::EnumVariant; use netpod::ScalarType; @@ -121,28 +122,24 @@ impl EmittableType for WritableType { tsev: TsNano, state: &mut ::State, ) -> serieswriter::writer::EmitRes { - let (ts_msp, ts_lsp, ts_msp_chg) = state.msp_split_data.split(self.0.clone(), self.byte_size()); - let item = QueryItem::Insert(scywr::iteminsertqueue::InsertItem { - series: state.series.clone(), - ts_msp: ts_msp.to_ts_ms(), - ts_lsp, - val: self.1.clone(), - ts_net, - }); - let mut items = smallvec::SmallVec::new(); - items.push(item); - if ts_msp_chg { - items.push(QueryItem::Msp(scywr::iteminsertqueue::MspItem::new( - state.series.clone(), - ts_msp.to_ts_ms(), - ts_net, - ))); - } - serieswriter::writer::EmitRes { - items, - bytes: self.byte_size(), - status: 0, - } + let bytes = ByteSize(self.byte_size()); + let data_item = self.1; + // let (ts_msp, ts_lsp, ts_msp_chg) = state.msp_split_data.split(self.0.clone(), self.byte_size()); + // let item = QueryItem::Insert(scywr::iteminsertqueue::InsertItem { + // series: state.series.clone(), + // ts_msp: ts_msp.to_ts_ms(), + // ts_lsp, + // val: self.1.clone(), + // ts_net, + // }); + // if ts_msp_chg { + // items.push(QueryItem::Msp(scywr::iteminsertqueue::MspItem::new( + // state.series.clone(), + // ts_msp.to_ts_ms(), + // ts_net, + // ))); + // } + serieswriter::writer::EmitRes { data_item, bytes } } } diff --git a/serieswriter/src/fixgridwriter.rs b/serieswriter/src/fixgridwriter.rs index def3510..7fa6caf 100644 --- a/serieswriter/src/fixgridwriter.rs +++ b/serieswriter/src/fixgridwriter.rs @@ -2,6 +2,7 @@ use crate as serieswriter; use crate::msptool::fixgrid::MspSplitFixGrid; use crate::writer::EmittableType; use crate::writer::SeriesWriter; +use netpod::ByteSize; use netpod::DtMs; use netpod::TsNano; use scywr::iteminsertqueue::DataValue; @@ -47,33 +48,32 @@ impl EmittableType for ChannelStatusWriteValue { tsev: TsNano, state: &mut ::State, ) -> serieswriter::writer::EmitRes { - let mut items = serieswriter::writer::SmallVec::new(); - let ts = tsev; - state.last_accepted_ts = ts; - state.last_accepted_val = Some(self.1); let byte_size = self.byte_size(); + let data_item = DataValue::Scalar(ScalarValue::U64(self.1)); + // let ts = tsev; + // state.last_accepted_ts = ts; + // state.last_accepted_val = Some(self.1); { - let (ts_msp, ts_lsp, ts_msp_chg) = state.msp_split.split(ts, self.byte_size()); - if ts_msp_chg { - items.push(QueryItem::Msp(MspItem::new( - state.series.clone(), - ts_msp.to_ts_ms(), - ts_net, - ))); - } - let item = scywr::iteminsertqueue::InsertItem { - series: state.series.clone(), - ts_msp: ts_msp.to_ts_ms(), - ts_lsp, - ts_net, - val: DataValue::Scalar(ScalarValue::U64(self.1)), - }; - items.push(QueryItem::Insert(item)); + // let (ts_msp, ts_lsp, ts_msp_chg) = state.msp_split.split(ts, self.byte_size()); + // if ts_msp_chg { + // items.push(QueryItem::Msp(MspItem::new( + // state.series.clone(), + // ts_msp.to_ts_ms(), + // ts_net, + // ))); + // } + // let item = scywr::iteminsertqueue::InsertItem { + // series: state.series.clone(), + // ts_msp: ts_msp.to_ts_ms(), + // ts_lsp, + // ts_net, + // val: DataValue::Scalar(ScalarValue::U64(self.1)), + // }; + // items.push(QueryItem::Insert(item)); } let ret = serieswriter::writer::EmitRes { - items, - bytes: byte_size, - status: 0, + data_item, + bytes: ByteSize(byte_size), }; ret } diff --git a/serieswriter/src/ratelimitwriter.rs b/serieswriter/src/ratelimitwriter.rs index 04351e4..fdc26e0 100644 --- a/serieswriter/src/ratelimitwriter.rs +++ b/serieswriter/src/ratelimitwriter.rs @@ -12,9 +12,9 @@ use std::marker::PhantomData; use std::time::Duration; use std::time::Instant; -macro_rules! debug { ($($arg:expr),*) => ( if true { log::debug!($($arg),*); } ); } -macro_rules! trace { ($($arg:expr),*) => ( if true { log::trace!($($arg),*); } ); } -macro_rules! trace_rt_decision { ($dtd:expr, $($arg:expr),*) => ( if $dtd { log::trace!($($arg),*); } ); } +macro_rules! debug { ($($arg:tt)*) => ( if true { log::debug!($($arg)*); } ); } +macro_rules! trace { ($($arg:tt)*) => ( if true { log::trace!($($arg)*); } ); } +macro_rules! trace_rt_decision { ($dtd:expr, $($arg:tt)*) => ( if $dtd { log::trace!($($arg)*); } ); } autoerr::create_error_v1!( name(Error, "RateLimitWriter"), @@ -27,7 +27,6 @@ autoerr::create_error_v1!( pub struct WriteRes { pub accept: bool, pub bytes: u32, - pub status: u8, } #[derive(Serialize)] @@ -145,14 +144,12 @@ where let ret = WriteRes { accept: true, bytes: res.bytes, - status: res.status, }; Ok(ret) } else { let ret = WriteRes { accept: false, bytes: 0, - status: 0, }; Ok(ret) } @@ -162,6 +159,12 @@ where let ret = self.writer.tick(iqdqs)?; Ok(ret) } + + pub fn on_close(&mut self, iqdqs: &mut VecDeque) -> Result<(), Error> { + self.tick(iqdqs)?; + self.writer.on_close(iqdqs)?; + Ok(()) + } } impl fmt::Debug for RateLimitWriter diff --git a/serieswriter/src/rtwriter.rs b/serieswriter/src/rtwriter.rs index e5cacad..58e0d16 100644 --- a/serieswriter/src/rtwriter.rs +++ b/serieswriter/src/rtwriter.rs @@ -74,10 +74,6 @@ pub struct WriteRes { } impl WriteRes { - pub fn nstatus(&self) -> u8 { - self.st.status + self.mt.status + self.lt.status - } - pub fn accept_any(&self) -> bool { self.lt.accept || self.mt.accept || self.st.accept } @@ -87,7 +83,6 @@ impl WriteRes { pub struct WriteRtRes { pub accept: bool, pub bytes: u32, - pub status: u8, } impl Default for WriteRtRes { @@ -95,7 +90,6 @@ impl Default for WriteRtRes { Self { accept: false, bytes: 0, - status: 0, } } } @@ -252,7 +246,6 @@ where let ret = WriteRtRes { accept: x.accept, bytes: x.bytes, - status: x.status, }; Ok(ret) } @@ -267,4 +260,16 @@ where self.state_lt.writer.tick(&mut iqdqs.lt_rf3_qu)?; Ok(()) } + + pub fn on_close(&mut self, iqdqs: &mut InsertDeques) -> Result<(), Error> { + self.tick(iqdqs)?; + if self.do_st_rf1 { + self.state_st.writer.on_close(&mut iqdqs.st_rf1_qu)?; + } else { + self.state_st.writer.on_close(&mut iqdqs.st_rf3_qu)?; + } + self.state_mt.writer.on_close(&mut iqdqs.mt_rf3_qu)?; + self.state_lt.writer.on_close(&mut iqdqs.lt_rf3_qu)?; + Ok(()) + } } diff --git a/serieswriter/src/writer.rs b/serieswriter/src/writer.rs index d13c1a9..70aa499 100644 --- a/serieswriter/src/writer.rs +++ b/serieswriter/src/writer.rs @@ -1,4 +1,4 @@ -use log::*; +use log; use netpod::TsNano; use scywr::iteminsertqueue::QueryItem; use serde::Serialize; @@ -8,9 +8,11 @@ use std::fmt; use std::marker::PhantomData; use std::time::Instant; +use netpod::ByteSize; +use scywr::iteminsertqueue::MspItem; pub use smallvec::SmallVec; -macro_rules! trace_emit { ($det:expr, $($arg:tt)*) => ( if $det { trace!($($arg)*); } ) } +macro_rules! trace_emit { ($det:expr, $($arg:tt)*) => ( if $det { log::trace!($($arg)*); } ) } autoerr::create_error_v1!( name(Error, "SerieswriterWriter"), @@ -28,9 +30,8 @@ autoerr::create_error_v1!( #[derive(Debug)] pub struct EmitRes { - pub items: SmallVec<[QueryItem; 4]>, - pub bytes: u32, - pub status: u8, + pub data_item: scywr::iteminsertqueue::DataValue, + pub bytes: ByteSize, } pub trait EmittableType: fmt::Debug + Clone { @@ -56,12 +57,12 @@ impl From for Error { #[derive(Debug)] pub struct WriteRes { pub bytes: u32, - pub status: u8, } #[derive(Debug, Serialize)] pub struct SeriesWriter { series: SeriesId, + msp_split: crate::msptool::MspSplit, do_trace_detail: bool, _t1: PhantomData, } @@ -73,6 +74,7 @@ where pub fn new(series: SeriesId) -> Result { let res = Self { series, + msp_split: crate::msptool::MspSplit::new(1024 * 64, 1024 * 1024 * 10), do_trace_detail: series::dbg::dbg_series(series), _t1: PhantomData, }; @@ -92,15 +94,28 @@ where deque: &mut VecDeque, ) -> Result { let det = self.do_trace_detail; - let ts_main = item.ts(); + // let ts_main = item.ts(); let res = item.into_query_item(ts_net, tsev, state); - trace_emit!(det, "emit value for ts {} items len {}", ts_main, res.items.len()); - for item in res.items { - deque.push_back(item); + trace_emit!(det, "emit value for ts {tsev}"); + // TODO adapt, taken from trait impl + let (ts_msp, ts_lsp, ts_msp_chg) = self.msp_split.split(tsev, res.bytes.bytes()); + if ts_msp_chg { + deque.push_back(QueryItem::Msp(MspItem::new( + self.series.clone(), + ts_msp.to_ts_ms(), + ts_net, + ))); } + let item = scywr::iteminsertqueue::InsertItem { + series: self.series.clone(), + ts_msp: ts_msp.to_ts_ms(), + ts_lsp, + ts_net, + val: res.data_item, + }; + deque.push_back(QueryItem::Insert(item)); let res = WriteRes { - bytes: res.bytes, - status: res.status, + bytes: res.bytes.bytes(), }; Ok(res) } @@ -108,4 +123,8 @@ where pub fn tick(&mut self, _deque: &mut VecDeque) -> Result<(), Error> { Ok(()) } + + pub fn on_close(&mut self, _deque: &mut VecDeque) -> Result<(), Error> { + Ok(()) + } }