WIP begin status channel changes

Author: Dominik Werder
Date: 2025-07-01 17:00:35 +02:00
Parent: 7764a1904c
Commit: 2f20b49193
8 changed files with 258 additions and 255 deletions


@@ -1,6 +1,6 @@
[package]
name = "daqingest"
version = "0.3.0-aa.8"
version = "0.3.0-aa.9"
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
edition = "2024"


@@ -16,6 +16,7 @@ use futures_util::Stream;
use futures_util::StreamExt;
use hashbrown::HashMap;
use log::*;
use netpod::ByteSize;
use netpod::EMIT_ACCOUNTING_SNAP;
use netpod::ScalarType;
use netpod::SeriesKind;
@@ -478,7 +479,6 @@ struct CreatedState {
shape: Shape,
name: String,
enum_str_table: Option<Vec<String>>,
status_emit_count: u64,
#[serde(with = "serde_Instant_elapsed_ms")]
ts_recv_value_status_emit_next: Instant,
}
@@ -518,7 +518,6 @@ impl CreatedState {
shape: Shape::Scalar,
name: String::new(),
enum_str_table: None,
status_emit_count: 0,
ts_recv_value_status_emit_next: Instant::now(),
}
}
@@ -647,7 +646,8 @@ impl ChannelState {
};
let interest_score = 1. / item_recv_ivl_ema.unwrap_or(1e10).max(1e-6).min(1e10);
let status_emit_count = match self {
ChannelState::Writable(s) => s.channel.status_emit_count,
// TODO
// ChannelState::Writable(s) => s.channel.status_emit_count,
_ => 0,
};
let last_comparisons = match self {
@@ -1318,6 +1318,17 @@ impl CaConn {
if self.emit_connection_status_item(item).is_err() {
self.mett.logic_error().inc();
}
let cids: Vec<_> = self.channels.keys().map(Clone::clone).collect();
for cid in cids {
match self.channel_close_by_cid(cid) {
Err(e) => {
// TODO
self.mett.logic_error().inc();
}
Ok(()) => {}
}
}
// TODO should let protocol shut down properly
self.proto = None;
}
@@ -1328,7 +1339,7 @@ impl CaConn {
}
fn cmd_channel_close(&mut self, name: String) {
self.channel_close(name);
self.channel_close_by_name(name);
// TODO return the result
}
@@ -1629,18 +1640,32 @@ impl CaConn {
}
}
pub fn channel_close(&mut self, name: String) {
debug!("channel_close {}", name);
fn channel_close_by_name(&mut self, name: String) -> Result<(), Error> {
let selfname = "channel_close_by_name";
debug!("{selfname} {}", name);
if let Some(x) = self.cid_by_name.get(&name).map(Clone::clone) {
self.cid_by_name.remove(&name);
self.channel_close_by_cid(x.clone())
} else {
warn!("{selfname} {} can not find channel", name);
// TODO should return error?
Ok(())
}
}
fn channel_close_by_cid(&mut self, cid: Cid) -> Result<(), Error> {
let selfname = "channel_close_by_cid";
let tsnow = Instant::now();
let stnow = SystemTime::now();
let cid = if let Some(x) = self.cid_by_name.get(&name) {
x.clone()
} else {
debug!("channel_close {} can not find channel", name);
return;
};
self.cid_by_name.remove(&name);
if let Some(conf) = self.channels.get_mut(&cid) {
let name = conf.conf.name();
{
// TODO emit CaConn item to let CaConnSet know that we have closed the channel.
// TODO may be too full
let value = CaConnEventValue::ChannelRemoved(name.into());
let item = CaConnEvent::new_now(value);
self.ca_conn_event_out_queue.push_back(item);
}
let item = ChannelStatusItem {
ts: stnow,
cssid: conf.state.cssid(),
@@ -1651,6 +1676,14 @@ impl CaConn {
self.mett.logic_error().inc();
}
// TODO shutdown the internal writer structures.
match &mut conf.state {
ChannelState::Writable(st2) => {
if st2.writer.on_close(&mut self.iqdqs).is_err() {
self.mett.logic_error().inc();
}
}
_ => {}
}
if let Some(cst) = conf.state.created_state() {
if let Some(proto) = self.proto.as_mut() {
let ty = CaMsgTy::ChannelClose(ChannelClose {
@@ -1680,25 +1713,22 @@ impl CaConn {
};
}
} else {
debug!("channel_close {} no channel block", name);
};
debug!("{selfname} {} not found", cid);
}
{
let it = self.cid_by_sid.extract_if(|_, v| *v == cid);
it.count();
}
self.channels.remove(&cid);
// TODO emit CaConn item to let CaConnSet know that we have closed the channel.
// TODO may be too full
let value = CaConnEventValue::ChannelRemoved(name);
let item = CaConnEvent::new_now(value);
self.ca_conn_event_out_queue.push_back(item);
Ok(())
}
fn channel_remove_by_name(&mut self, name: String) {
let selfname = "channel_remove_by_name";
if let Some(cid) = self.cid_by_name(&name) {
self.channel_remove_by_cid(cid);
} else {
warn!("channel_remove does not exist {}", name);
warn!("{selfname} does not exist {}", name);
}
}
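
For orientation, a minimal sketch of the close flow this hunk introduces: the name-based close resolves to a cid, and the cid-based close announces the removal, cleans the lookup maps, and drops the channel state. All types below are simplified, hypothetical stand-ins for the diff's Cid, channel map, and CaConnEventValue, not the real definitions.

use std::collections::{HashMap, VecDeque};

// Simplified stand-ins for the types used in the diff (hypothetical shapes).
type Cid = u32;

#[derive(Debug)]
enum Event {
    ChannelRemoved(String),
}

struct Conn {
    cid_by_name: HashMap<String, Cid>,
    cid_by_sid: HashMap<u32, Cid>,
    channels: HashMap<Cid, String>, // cid -> channel name
    event_out: VecDeque<Event>,
}

impl Conn {
    fn channel_close_by_name(&mut self, name: &str) -> Result<(), String> {
        match self.cid_by_name.remove(name) {
            Some(cid) => self.channel_close_by_cid(cid),
            // The real code only warns here; whether this should be an error is still a TODO.
            None => Ok(()),
        }
    }

    fn channel_close_by_cid(&mut self, cid: Cid) -> Result<(), String> {
        if let Some(name) = self.channels.get(&cid).cloned() {
            // Tell the owning connection set early that this channel is gone.
            self.event_out.push_back(Event::ChannelRemoved(name));
            // ... the real code also emits a final ChannelStatusItem and calls
            // writer.on_close(..) for writable channels here ...
        }
        // Drop the reverse lookup entries and the channel state itself.
        self.cid_by_sid.retain(|_, v| *v != cid);
        self.channels.remove(&cid);
        Ok(())
    }
}

fn main() {
    let mut conn = Conn {
        cid_by_name: HashMap::from([("BEAM:CURRENT".to_string(), 7)]),
        cid_by_sid: HashMap::from([(42, 7)]),
        channels: HashMap::from([(7, "BEAM:CURRENT".to_string())]),
        event_out: VecDeque::new(),
    };
    conn.channel_close_by_name("BEAM:CURRENT").unwrap();
    assert!(conn.channels.is_empty());
    println!("{:?}", conn.event_out);
}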
@@ -2396,7 +2426,6 @@ impl CaConn {
crst.insert_item_ivl_ema.tick(tsnow);
let val_for_agg = value.f32_for_binning();
let wres = writer.write(CaWriterValue::new(value, crst), tscaproto, tsev, iqdqs)?;
crst.status_emit_count += wres.nstatus() as u64;
if wres.st.accept {
crst.dw_st_last = stnow;
crst.acc_st.push_written(payload_len);
@@ -3036,7 +3065,6 @@ impl CaConn {
shape: shape.clone(),
name: conf.conf.name().into(),
enum_str_table: None,
status_emit_count: 0,
ts_recv_value_status_emit_next: Instant::now(),
};
if series::dbg::dbg_chn(created_state.name()) {
@@ -3977,118 +4005,92 @@ impl EmittableType for CaWriterValue {
tsev: TsNano,
state: &mut <Self as EmittableType>::State,
) -> serieswriter::writer::EmitRes {
let mut items = serieswriter::writer::SmallVec::new();
let diff_data = match state.last_accepted_val.as_ref() {
Some(last) => self.0.data != last.0.data,
None => true,
};
let diff_status = match state.last_accepted_val.as_ref() {
Some(last) => match &last.0.meta {
proto::CaMetaValue::CaMetaTime(last_meta) => match &self.0.meta {
proto::CaMetaValue::CaMetaTime(meta) => meta.status != last_meta.status,
_ => false,
},
_ => false,
},
None => true,
};
let ts = tsev;
state.last_accepted_val = Some(self.clone());
let byte_size = self.byte_size();
if diff_data {
// debug!("diff_data emit {:?}", state.series_data);
let (ts_msp, ts_lsp, ts_msp_chg) = state.msp_split_data.split(ts, self.byte_size());
let data_value = {
use ca_proto::ca::proto::CaDataValue;
use scywr::iteminsertqueue::DataValue;
let ret = match self.0.data {
CaDataValue::Scalar(val) => DataValue::Scalar({
use ca_proto::ca::proto::CaDataScalarValue;
use scywr::iteminsertqueue::ScalarValue;
match val {
CaDataScalarValue::I8(x) => ScalarValue::I8(x),
CaDataScalarValue::I16(x) => ScalarValue::I16(x),
CaDataScalarValue::I32(x) => ScalarValue::I32(x),
CaDataScalarValue::F32(x) => ScalarValue::F32(x),
CaDataScalarValue::F64(x) => ScalarValue::F64(x),
CaDataScalarValue::Enum(x) => ScalarValue::Enum(
x,
self.1.take().unwrap_or_else(|| {
warn!("NoEnumStr");
String::from("NoEnumStr")
}),
),
CaDataScalarValue::String(x) => ScalarValue::String(x),
CaDataScalarValue::Bool(x) => ScalarValue::Bool(x),
}
}),
CaDataValue::Array(val) => DataValue::Array({
use ca_proto::ca::proto::CaDataArrayValue;
use scywr::iteminsertqueue::ArrayValue;
match val {
CaDataArrayValue::I8(x) => ArrayValue::I8(x),
CaDataArrayValue::I16(x) => ArrayValue::I16(x),
CaDataArrayValue::I32(x) => ArrayValue::I32(x),
CaDataArrayValue::F32(x) => ArrayValue::F32(x),
CaDataArrayValue::F64(x) => ArrayValue::F64(x),
CaDataArrayValue::Bool(x) => ArrayValue::Bool(x),
}
}),
};
ret
};
if ts_msp_chg {
items.push(QueryItem::Msp(MspItem::new(
state.series_data.clone(),
ts_msp.to_ts_ms(),
ts_net,
)));
}
let item = scywriiq::InsertItem {
series: state.series_data.clone(),
ts_msp: ts_msp.to_ts_ms(),
ts_lsp,
ts_net,
val: data_value,
};
items.push(QueryItem::Insert(item));
}
let mut n_status = 0;
if diff_status {
let data_item = {
use ca_proto::ca::proto::CaDataValue;
use scywr::iteminsertqueue::DataValue;
use scywr::iteminsertqueue::ScalarValue;
match self.0.meta {
proto::CaMetaValue::CaMetaTime(meta) => {
let (ts_msp, ts_lsp, ts_msp_chg) = state.msp_split_status.split(ts, 2);
if ts_msp_chg {
items.push(QueryItem::Msp(MspItem::new(
state.series_status.clone(),
ts_msp.to_ts_ms(),
ts_net,
)));
match self.0.data {
CaDataValue::Scalar(val) => DataValue::Scalar({
use ca_proto::ca::proto::CaDataScalarValue;
use scywr::iteminsertqueue::ScalarValue;
match val {
CaDataScalarValue::I8(x) => ScalarValue::I8(x),
CaDataScalarValue::I16(x) => ScalarValue::I16(x),
CaDataScalarValue::I32(x) => ScalarValue::I32(x),
CaDataScalarValue::F32(x) => ScalarValue::F32(x),
CaDataScalarValue::F64(x) => ScalarValue::F64(x),
CaDataScalarValue::Enum(x) => ScalarValue::Enum(
x,
self.1.take().unwrap_or_else(|| {
warn!("NoEnumStr");
String::from("NoEnumStr")
}),
),
CaDataScalarValue::String(x) => ScalarValue::String(x),
CaDataScalarValue::Bool(x) => ScalarValue::Bool(x),
}
let data_value = DataValue::Scalar(ScalarValue::I16(meta.status as i16));
let item = scywriiq::InsertItem {
series: state.series_status.clone(),
ts_msp: ts_msp.to_ts_ms(),
ts_lsp,
ts_net,
val: data_value,
};
items.push(QueryItem::Insert(item));
n_status += 1;
// info!("diff_status emit {:?}", state.series_status);
}
_ => {
// TODO must be able to return error here
warn!("diff_status logic error");
}
};
}
}),
CaDataValue::Array(val) => DataValue::Array({
use ca_proto::ca::proto::CaDataArrayValue;
use scywr::iteminsertqueue::ArrayValue;
match val {
CaDataArrayValue::I8(x) => ArrayValue::I8(x),
CaDataArrayValue::I16(x) => ArrayValue::I16(x),
CaDataArrayValue::I32(x) => ArrayValue::I32(x),
CaDataArrayValue::F32(x) => ArrayValue::F32(x),
CaDataArrayValue::F64(x) => ArrayValue::F64(x),
CaDataArrayValue::Bool(x) => ArrayValue::Bool(x),
}
}),
}
};
// TODO move to separate impl
// let diff_status = match state.last_accepted_val.as_ref() {
// Some(last) => match &last.0.meta {
// proto::CaMetaValue::CaMetaTime(last_meta) => match &self.0.meta {
// proto::CaMetaValue::CaMetaTime(meta) => meta.status != last_meta.status,
// _ => false,
// },
// _ => false,
// },
// None => true,
// };
// let mut n_status = 0;
// if diff_status {
// use scywr::iteminsertqueue::DataValue;
// use scywr::iteminsertqueue::ScalarValue;
// match self.0.meta {
// proto::CaMetaValue::CaMetaTime(meta) => {
// let (ts_msp, ts_lsp, ts_msp_chg) = state.msp_split_status.split(ts, 2);
// if ts_msp_chg {
// items.push(QueryItem::Msp(MspItem::new(
// state.series_status.clone(),
// ts_msp.to_ts_ms(),
// ts_net,
// )));
// }
// let data_value = DataValue::Scalar(ScalarValue::I16(meta.status as i16));
// let item = scywriiq::InsertItem {
// series: state.series_status.clone(),
// ts_msp: ts_msp.to_ts_ms(),
// ts_lsp,
// ts_net,
// val: data_value,
// };
// items.push(QueryItem::Insert(item));
// n_status += 1;
// // info!("diff_status emit {:?}", state.series_status);
// }
// _ => {
// // TODO must be able to return error here
// warn!("diff_status logic error");
// }
// };
// }
let ret = serieswriter::writer::EmitRes {
items,
bytes: byte_size,
status: n_status,
data_item,
bytes: ByteSize(byte_size),
};
ret
}
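
For reference, a compressed sketch of the contract this hunk converges on, using simplified stand-ins for CaDataValue, DataValue, ByteSize, and EmitRes (the real ones live in ca_proto, scywr, and netpod/serieswriter): into_query_item now only maps the protocol value into a storage DataValue and reports its byte size; MSP splitting and queue-item construction move into SeriesWriter::write, and the commented-out status branch remains a TODO here as well.

// Simplified stand-ins; the real types live in ca_proto, scywr and serieswriter.
#[derive(Debug)]
enum CaDataValue {
    ScalarF64(f64),
    ArrayI16(Vec<i16>),
}

#[derive(Debug)]
enum DataValue {
    ScalarF64(f64),
    ArrayI16(Vec<i16>),
}

#[derive(Debug)]
struct ByteSize(u32);

#[derive(Debug)]
struct EmitRes {
    data_item: DataValue,
    bytes: ByteSize,
}

fn into_query_item(value: CaDataValue) -> EmitRes {
    let bytes = ByteSize(match &value {
        CaDataValue::ScalarF64(_) => 8,
        CaDataValue::ArrayI16(a) => (a.len() * 2) as u32,
    });
    // Map the protocol value into the storage value; MSP splitting and
    // queue-item construction now happen centrally in SeriesWriter::write.
    let data_item = match value {
        CaDataValue::ScalarF64(x) => DataValue::ScalarF64(x),
        CaDataValue::ArrayI16(x) => DataValue::ArrayI16(x),
    };
    // TODO: status (alarm severity/status) emission moved out of this path.
    EmitRes { data_item, bytes }
}

fn main() {
    let res = into_query_item(CaDataValue::ScalarF64(1.25));
    println!("{res:?}");
}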


@@ -13,12 +13,13 @@ use futures_util::TryStreamExt;
use items_2::binning::container_events::ContainerEvents;
use items_2::binning::container_events::EventValueType;
use netpod::APP_CBOR_FRAMED;
use netpod::ByteSize;
use netpod::EnumVariant;
use netpod::ScalarType;
use netpod::SeriesKind;
use netpod::Shape;
use netpod::TsNano;
use netpod::log::*;
use netpod::log;
use netpod::ttl::RetentionTime;
use scywr::insertqueues::InsertDeques;
use scywr::iteminsertqueue::ArrayValue;
@@ -40,34 +41,12 @@ use std::time::Instant;
use std::time::SystemTime;
use streams::framed_bytes::FramedBytesStream;
use taskrun::tokio::time::timeout;
// use core::io::BorrowedBuf;
#[allow(unused)]
macro_rules! debug_setup {
($($arg:tt)*) => {
if true {
debug!($($arg)*);
}
};
}
macro_rules! debug_setup { ($($arg:tt)*) => { if true { log::debug!($($arg)*); } }; }
#[allow(unused)]
macro_rules! trace_input {
($($arg:tt)*) => {
if true {
trace!($($arg)*);
}
};
}
macro_rules! trace_input { ($($arg:tt)*) => { if true { log::trace!($($arg)*); } }; }
#[allow(unused)]
macro_rules! trace_queues {
($($arg:tt)*) => {
if true {
trace!($($arg)*);
}
};
}
macro_rules! trace_queues { ($($arg:tt)*) => { if true { log::trace!($($arg)*); } }; }
type ValueSeriesWriter = SeriesWriter<WritableType>;
@@ -110,28 +89,26 @@ impl EmittableType for WritableType {
tsev: TsNano,
state: &mut <Self as EmittableType>::State,
) -> serieswriter::writer::EmitRes {
let (ts_msp, ts_lsp, ts_msp_chg) = state.msp_split_data.split(self.0.clone(), self.byte_size());
let item = QueryItem::Insert(scywr::iteminsertqueue::InsertItem {
series: state.series.clone(),
ts_msp: ts_msp.to_ts_ms(),
ts_lsp,
val: self.1.clone(),
ts_net,
});
let mut items = smallvec::SmallVec::new();
items.push(item);
if ts_msp_chg {
items.push(QueryItem::Msp(scywr::iteminsertqueue::MspItem::new(
state.series.clone(),
ts_msp.to_ts_ms(),
ts_net,
)));
}
serieswriter::writer::EmitRes {
items,
bytes: self.byte_size(),
status: 0,
}
let bytes = ByteSize(self.byte_size());
let data_item = self.1;
// let (ts_msp, ts_lsp, ts_msp_chg) = state.msp_split_data.split(self.0.clone(), self.byte_size());
// let item = QueryItem::Insert(scywr::iteminsertqueue::InsertItem {
// series: state.series.clone(),
// ts_msp: ts_msp.to_ts_ms(),
// ts_lsp,
// val: self.1.clone(),
// ts_net,
// });
// let mut items = smallvec::SmallVec::new();
// items.push(item);
// if ts_msp_chg {
// items.push(QueryItem::Msp(scywr::iteminsertqueue::MspItem::new(
// state.series.clone(),
// ts_msp.to_ts_ms(),
// ts_net,
// )));
// }
serieswriter::writer::EmitRes { data_item, bytes }
}
}
@@ -234,7 +211,7 @@ async fn post_v01_try(
let frame = match x? {
Some(x) => x,
None => {
trace!("input stream done");
log::trace!("input stream done");
break;
}
};
@@ -358,7 +335,7 @@ where
{
let evs: ContainerEvents<T> = ciborium::de::from_reader(Cursor::new(frame))
.map_err(|e| {
error!("cbor decode error {e}");
log::error!("cbor decode error {e}");
})
.map_err(|_| Error::Decode)?;
// trace_input!("see events {:?}", evs);
@@ -383,7 +360,7 @@ fn evpush_dim0_enum(
) -> Result<(), Error> {
let evs: ContainerEvents<EnumVariant> = ciborium::de::from_reader(Cursor::new(frame))
.map_err(|e| {
error!("cbor decode error {e}");
log::error!("cbor decode error {e}");
})
.map_err(|_| Error::Decode)?;
// trace_input!("see events {:?}", evs);
@@ -412,11 +389,11 @@ where
{
let evs: ContainerEvents<Vec<T>> = ciborium::de::from_reader(Cursor::new(frame))
.map_err(|e| {
error!("cbor decode error {e}");
log::error!("cbor decode error {e}");
})
.map_err(|_| Error::Decode)?;
trace_input!("see events {:?}", evs);
warn!("TODO require timestamp in input format");
log::warn!("TODO require timestamp in input format");
let stnow = SystemTime::now();
let tsev = TsNano::from_system_time(stnow);
let tsnow = Instant::now();


@@ -11,6 +11,7 @@ use futures_util::TryStreamExt;
use items_2::binning::container_events::ContainerEvents;
use items_2::binning::container_events::EventValueType;
use netpod::APP_CBOR_FRAMED;
use netpod::ByteSize;
use netpod::DaqbufChannelConfig;
use netpod::EnumVariant;
use netpod::ScalarType;
@@ -121,28 +122,24 @@ impl EmittableType for WritableType {
tsev: TsNano,
state: &mut <Self as EmittableType>::State,
) -> serieswriter::writer::EmitRes {
let (ts_msp, ts_lsp, ts_msp_chg) = state.msp_split_data.split(self.0.clone(), self.byte_size());
let item = QueryItem::Insert(scywr::iteminsertqueue::InsertItem {
series: state.series.clone(),
ts_msp: ts_msp.to_ts_ms(),
ts_lsp,
val: self.1.clone(),
ts_net,
});
let mut items = smallvec::SmallVec::new();
items.push(item);
if ts_msp_chg {
items.push(QueryItem::Msp(scywr::iteminsertqueue::MspItem::new(
state.series.clone(),
ts_msp.to_ts_ms(),
ts_net,
)));
}
serieswriter::writer::EmitRes {
items,
bytes: self.byte_size(),
status: 0,
}
let bytes = ByteSize(self.byte_size());
let data_item = self.1;
// let (ts_msp, ts_lsp, ts_msp_chg) = state.msp_split_data.split(self.0.clone(), self.byte_size());
// let item = QueryItem::Insert(scywr::iteminsertqueue::InsertItem {
// series: state.series.clone(),
// ts_msp: ts_msp.to_ts_ms(),
// ts_lsp,
// val: self.1.clone(),
// ts_net,
// });
// if ts_msp_chg {
// items.push(QueryItem::Msp(scywr::iteminsertqueue::MspItem::new(
// state.series.clone(),
// ts_msp.to_ts_ms(),
// ts_net,
// )));
// }
serieswriter::writer::EmitRes { data_item, bytes }
}
}


@@ -2,6 +2,7 @@ use crate as serieswriter;
use crate::msptool::fixgrid::MspSplitFixGrid;
use crate::writer::EmittableType;
use crate::writer::SeriesWriter;
use netpod::ByteSize;
use netpod::DtMs;
use netpod::TsNano;
use scywr::iteminsertqueue::DataValue;
@@ -47,33 +48,32 @@ impl EmittableType for ChannelStatusWriteValue {
tsev: TsNano,
state: &mut <Self as EmittableType>::State,
) -> serieswriter::writer::EmitRes {
let mut items = serieswriter::writer::SmallVec::new();
let ts = tsev;
state.last_accepted_ts = ts;
state.last_accepted_val = Some(self.1);
let byte_size = self.byte_size();
let data_item = DataValue::Scalar(ScalarValue::U64(self.1));
// let ts = tsev;
// state.last_accepted_ts = ts;
// state.last_accepted_val = Some(self.1);
{
let (ts_msp, ts_lsp, ts_msp_chg) = state.msp_split.split(ts, self.byte_size());
if ts_msp_chg {
items.push(QueryItem::Msp(MspItem::new(
state.series.clone(),
ts_msp.to_ts_ms(),
ts_net,
)));
}
let item = scywr::iteminsertqueue::InsertItem {
series: state.series.clone(),
ts_msp: ts_msp.to_ts_ms(),
ts_lsp,
ts_net,
val: DataValue::Scalar(ScalarValue::U64(self.1)),
};
items.push(QueryItem::Insert(item));
// let (ts_msp, ts_lsp, ts_msp_chg) = state.msp_split.split(ts, self.byte_size());
// if ts_msp_chg {
// items.push(QueryItem::Msp(MspItem::new(
// state.series.clone(),
// ts_msp.to_ts_ms(),
// ts_net,
// )));
// }
// let item = scywr::iteminsertqueue::InsertItem {
// series: state.series.clone(),
// ts_msp: ts_msp.to_ts_ms(),
// ts_lsp,
// ts_net,
// val: DataValue::Scalar(ScalarValue::U64(self.1)),
// };
// items.push(QueryItem::Insert(item));
}
let ret = serieswriter::writer::EmitRes {
items,
bytes: byte_size,
status: 0,
data_item,
bytes: ByteSize(byte_size),
};
ret
}


@@ -12,9 +12,9 @@ use std::marker::PhantomData;
use std::time::Duration;
use std::time::Instant;
macro_rules! debug { ($($arg:expr),*) => ( if true { log::debug!($($arg),*); } ); }
macro_rules! trace { ($($arg:expr),*) => ( if true { log::trace!($($arg),*); } ); }
macro_rules! trace_rt_decision { ($dtd:expr, $($arg:expr),*) => ( if $dtd { log::trace!($($arg),*); } ); }
macro_rules! debug { ($($arg:tt)*) => ( if true { log::debug!($($arg)*); } ); }
macro_rules! trace { ($($arg:tt)*) => ( if true { log::trace!($($arg)*); } ); }
macro_rules! trace_rt_decision { ($dtd:expr, $($arg:tt)*) => ( if $dtd { log::trace!($($arg)*); } ); }
autoerr::create_error_v1!(
name(Error, "RateLimitWriter"),
@@ -27,7 +27,6 @@ autoerr::create_error_v1!(
pub struct WriteRes {
pub accept: bool,
pub bytes: u32,
pub status: u8,
}
#[derive(Serialize)]
@@ -145,14 +144,12 @@ where
let ret = WriteRes {
accept: true,
bytes: res.bytes,
status: res.status,
};
Ok(ret)
} else {
let ret = WriteRes {
accept: false,
bytes: 0,
status: 0,
};
Ok(ret)
}
@@ -162,6 +159,12 @@ where
let ret = self.writer.tick(iqdqs)?;
Ok(ret)
}
pub fn on_close(&mut self, iqdqs: &mut VecDeque<QueryItem>) -> Result<(), Error> {
self.tick(iqdqs)?;
self.writer.on_close(iqdqs)?;
Ok(())
}
}
impl<ET> fmt::Debug for RateLimitWriter<ET>


@@ -74,10 +74,6 @@ pub struct WriteRes {
}
impl WriteRes {
pub fn nstatus(&self) -> u8 {
self.st.status + self.mt.status + self.lt.status
}
pub fn accept_any(&self) -> bool {
self.lt.accept || self.mt.accept || self.st.accept
}
@@ -87,7 +83,6 @@ impl WriteRes {
pub struct WriteRtRes {
pub accept: bool,
pub bytes: u32,
pub status: u8,
}
impl Default for WriteRtRes {
@@ -95,7 +90,6 @@ impl Default for WriteRtRes {
Self {
accept: false,
bytes: 0,
status: 0,
}
}
}
@@ -252,7 +246,6 @@ where
let ret = WriteRtRes {
accept: x.accept,
bytes: x.bytes,
status: x.status,
};
Ok(ret)
}
@@ -267,4 +260,16 @@ where
self.state_lt.writer.tick(&mut iqdqs.lt_rf3_qu)?;
Ok(())
}
pub fn on_close(&mut self, iqdqs: &mut InsertDeques) -> Result<(), Error> {
self.tick(iqdqs)?;
if self.do_st_rf1 {
self.state_st.writer.on_close(&mut iqdqs.st_rf1_qu)?;
} else {
self.state_st.writer.on_close(&mut iqdqs.st_rf3_qu)?;
}
self.state_mt.writer.on_close(&mut iqdqs.mt_rf3_qu)?;
self.state_lt.writer.on_close(&mut iqdqs.lt_rf3_qu)?;
Ok(())
}
}
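
A minimal sketch of the new close path added here, with simplified stand-in types for InsertDeques and the per-retention writers (and the initial self.tick(iqdqs) call omitted): on_close fans out to each retention tier, picking the short-term queue by replication factor, so every tier gets a chance to flush pending items before the channel goes away.

use std::collections::VecDeque;

// Simplified stand-ins for the insert-queue and writer types in the diff.
#[derive(Debug)]
enum QueryItem {
    Flush(&'static str),
}

#[derive(Default)]
struct InsertDeques {
    st_rf1_qu: VecDeque<QueryItem>,
    st_rf3_qu: VecDeque<QueryItem>,
    mt_rf3_qu: VecDeque<QueryItem>,
    lt_rf3_qu: VecDeque<QueryItem>,
}

struct TierWriter {
    name: &'static str,
}

impl TierWriter {
    fn on_close(&mut self, q: &mut VecDeque<QueryItem>) -> Result<(), ()> {
        // The real writer would flush any pending per-series state here.
        q.push_back(QueryItem::Flush(self.name));
        Ok(())
    }
}

struct MultiTierWriter {
    do_st_rf1: bool,
    st: TierWriter,
    mt: TierWriter,
    lt: TierWriter,
}

impl MultiTierWriter {
    // Mirrors the diff's on_close: close each retention tier into its own
    // deque, choosing the short-term queue by replication factor.
    // (The real impl calls self.tick(iqdqs) first; omitted in this sketch.)
    fn on_close(&mut self, iqdqs: &mut InsertDeques) -> Result<(), ()> {
        if self.do_st_rf1 {
            self.st.on_close(&mut iqdqs.st_rf1_qu)?;
        } else {
            self.st.on_close(&mut iqdqs.st_rf3_qu)?;
        }
        self.mt.on_close(&mut iqdqs.mt_rf3_qu)?;
        self.lt.on_close(&mut iqdqs.lt_rf3_qu)?;
        Ok(())
    }
}

fn main() {
    let mut w = MultiTierWriter {
        do_st_rf1: true,
        st: TierWriter { name: "st" },
        mt: TierWriter { name: "mt" },
        lt: TierWriter { name: "lt" },
    };
    let mut iqdqs = InsertDeques::default();
    w.on_close(&mut iqdqs).unwrap();
    println!("{:?} {:?} {:?}", iqdqs.st_rf1_qu, iqdqs.mt_rf3_qu, iqdqs.lt_rf3_qu);
}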


@@ -1,4 +1,4 @@
use log::*;
use log;
use netpod::TsNano;
use scywr::iteminsertqueue::QueryItem;
use serde::Serialize;
@@ -8,9 +8,11 @@ use std::fmt;
use std::marker::PhantomData;
use std::time::Instant;
use netpod::ByteSize;
use scywr::iteminsertqueue::MspItem;
pub use smallvec::SmallVec;
macro_rules! trace_emit { ($det:expr, $($arg:tt)*) => ( if $det { trace!($($arg)*); } ) }
macro_rules! trace_emit { ($det:expr, $($arg:tt)*) => ( if $det { log::trace!($($arg)*); } ) }
autoerr::create_error_v1!(
name(Error, "SerieswriterWriter"),
@@ -28,9 +30,8 @@ autoerr::create_error_v1!(
#[derive(Debug)]
pub struct EmitRes {
pub items: SmallVec<[QueryItem; 4]>,
pub bytes: u32,
pub status: u8,
pub data_item: scywr::iteminsertqueue::DataValue,
pub bytes: ByteSize,
}
pub trait EmittableType: fmt::Debug + Clone {
@@ -56,12 +57,12 @@ impl From<async_channel::RecvError> for Error {
#[derive(Debug)]
pub struct WriteRes {
pub bytes: u32,
pub status: u8,
}
#[derive(Debug, Serialize)]
pub struct SeriesWriter<ET> {
series: SeriesId,
msp_split: crate::msptool::MspSplit,
do_trace_detail: bool,
_t1: PhantomData<ET>,
}
@@ -73,6 +74,7 @@ where
pub fn new(series: SeriesId) -> Result<Self, Error> {
let res = Self {
series,
msp_split: crate::msptool::MspSplit::new(1024 * 64, 1024 * 1024 * 10),
do_trace_detail: series::dbg::dbg_series(series),
_t1: PhantomData,
};
@@ -92,15 +94,28 @@ where
deque: &mut VecDeque<QueryItem>,
) -> Result<WriteRes, Error> {
let det = self.do_trace_detail;
let ts_main = item.ts();
// let ts_main = item.ts();
let res = item.into_query_item(ts_net, tsev, state);
trace_emit!(det, "emit value for ts {} items len {}", ts_main, res.items.len());
for item in res.items {
deque.push_back(item);
trace_emit!(det, "emit value for ts {tsev}");
// TODO adapt, taken from trait impl
let (ts_msp, ts_lsp, ts_msp_chg) = self.msp_split.split(tsev, res.bytes.bytes());
if ts_msp_chg {
deque.push_back(QueryItem::Msp(MspItem::new(
self.series.clone(),
ts_msp.to_ts_ms(),
ts_net,
)));
}
let item = scywr::iteminsertqueue::InsertItem {
series: self.series.clone(),
ts_msp: ts_msp.to_ts_ms(),
ts_lsp,
ts_net,
val: res.data_item,
};
deque.push_back(QueryItem::Insert(item));
let res = WriteRes {
bytes: res.bytes,
status: res.status,
bytes: res.bytes.bytes(),
};
Ok(res)
}
@@ -108,4 +123,8 @@ where
pub fn tick(&mut self, _deque: &mut VecDeque<QueryItem>) -> Result<(), Error> {
Ok(())
}
pub fn on_close(&mut self, _deque: &mut VecDeque<QueryItem>) -> Result<(), Error> {
Ok(())
}
}
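
Finally, a minimal end-to-end sketch of the reworked split of responsibilities (simplified stand-in types, and a toy fixed-width MSP rule instead of the real MspSplit): the emittable value only reports data_item and bytes, while SeriesWriter::write performs the MSP split and pushes the Msp/Insert queue items itself.

use std::collections::VecDeque;

// Simplified stand-ins for the diff's types; the MSP rule here is a toy
// fixed-width time bucket, not the real size-driven MspSplit.
#[derive(Debug)]
enum QueryItem {
    Msp { series: u64, ts_msp: u64 },
    Insert { series: u64, ts_msp: u64, ts_lsp: u64, val: f64 },
}

struct EmitRes {
    data_item: f64, // stands in for scywr::iteminsertqueue::DataValue
    bytes: u32,     // stands in for netpod::ByteSize
}

struct SeriesWriter {
    series: u64,
    msp_width: u64,
    last_msp: Option<u64>,
}

impl SeriesWriter {
    fn write(&mut self, res: EmitRes, tsev: u64, deque: &mut VecDeque<QueryItem>) -> u32 {
        // Centralized MSP split: the per-type into_query_item no longer does this.
        let ts_msp = tsev / self.msp_width * self.msp_width;
        let ts_lsp = tsev - ts_msp;
        if self.last_msp != Some(ts_msp) {
            self.last_msp = Some(ts_msp);
            deque.push_back(QueryItem::Msp { series: self.series, ts_msp });
        }
        deque.push_back(QueryItem::Insert {
            series: self.series,
            ts_msp,
            ts_lsp,
            val: res.data_item,
        });
        res.bytes
    }
}

fn main() {
    let mut w = SeriesWriter { series: 1, msp_width: 1_000_000, last_msp: None };
    let mut dq = VecDeque::new();
    // Two events in the same MSP bucket: one Msp item, two Insert items.
    w.write(EmitRes { data_item: 0.5, bytes: 8 }, 1_000_010, &mut dq);
    w.write(EmitRes { data_item: 0.6, bytes: 8 }, 1_000_020, &mut dq);
    assert_eq!(dq.len(), 3);
    println!("{dq:?}");
}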