diff --git a/netfetch/src/metrics/ingest.rs b/netfetch/src/metrics/ingest.rs index a90cbe2..63435b7 100644 --- a/netfetch/src/metrics/ingest.rs +++ b/netfetch/src/metrics/ingest.rs @@ -11,11 +11,13 @@ use err::ThisError; use futures_util::StreamExt; use futures_util::TryStreamExt; use items_2::eventsdim0::EventsDim0; +use items_2::eventsdim0::EventsDim0Enum; use items_2::eventsdim0::EventsDim0NoPulse; use items_2::eventsdim1::EventsDim1; use items_2::eventsdim1::EventsDim1NoPulse; use netpod::log::*; use netpod::ttl::RetentionTime; +use netpod::EnumVariant; use netpod::ScalarType; use netpod::SeriesKind; use netpod::Shape; @@ -27,6 +29,8 @@ use scywr::iteminsertqueue::DataValue; use scywr::iteminsertqueue::QueryItem; use scywr::iteminsertqueue::ScalarValue; use serde::Deserialize; +use series::SeriesId; +use serieswriter::msptool::MspSplit; use serieswriter::writer::EmittableType; use serieswriter::writer::SeriesWriter; use std::collections::HashMap; @@ -44,7 +48,7 @@ use taskrun::tokio::time::timeout; macro_rules! debug_setup { ($($arg:tt)*) => { if true { - info!($($arg)*); + debug!($($arg)*); } }; } @@ -52,7 +56,7 @@ macro_rules! debug_setup { #[allow(unused)] macro_rules! trace_input { ($($arg:tt)*) => { - if false { + if true { trace!($($arg)*); } }; @@ -61,7 +65,7 @@ macro_rules! trace_input { #[allow(unused)] macro_rules! trace_queues { ($($arg:tt)*) => { - if false { + if true { trace!($($arg)*); } }; @@ -69,22 +73,36 @@ macro_rules! trace_queues { type ValueSeriesWriter = SeriesWriter; +struct WritableTypeState { + series: SeriesId, + msp_split_data: MspSplit, +} + +impl WritableTypeState { + fn new(series: SeriesId) -> Self { + Self { + series, + msp_split_data: MspSplit::new(10000, 1024 * 256), + } + } +} + #[derive(Debug, Clone)] -struct WritableType(DataValue); +struct WritableType(TsNano, DataValue); impl EmittableType for WritableType { - type State = (); + type State = WritableTypeState; fn ts(&self) -> TsNano { - todo!() + self.0.clone() } fn has_change(&self, k: &Self) -> bool { - todo!() + true } fn byte_size(&self) -> u32 { - todo!() + 8 + self.1.byte_size() } fn into_query_item( @@ -93,7 +111,28 @@ impl EmittableType for WritableType { tsev: TsNano, state: &mut ::State, ) -> serieswriter::writer::EmitRes { - todo!() + let (ts_msp, ts_lsp, ts_msp_chg) = state.msp_split_data.split(self.0.clone(), self.byte_size()); + let item = QueryItem::Insert(scywr::iteminsertqueue::InsertItem { + series: state.series.clone(), + ts_msp: ts_msp.to_ts_ms(), + ts_lsp, + val: self.1.clone(), + ts_net, + }); + let mut items = smallvec::SmallVec::new(); + items.push(item); + if ts_msp_chg { + items.push(QueryItem::Msp(scywr::iteminsertqueue::MspItem::new( + state.series.clone(), + ts_msp.to_ts_ms(), + ts_net, + ))); + } + serieswriter::writer::EmitRes { + items, + bytes: self.byte_size(), + status: 0, + } } } @@ -239,13 +278,17 @@ async fn post_v01_try( ScalarType::F64 => { evpush_dim0::(&frame, deque, &mut writer, |x| DataValue::Scalar(ScalarValue::F64(x)))?; } - ScalarType::BOOL => return Err(Error::NotSupported), + ScalarType::BOOL => { + evpush_dim0::(&frame, deque, &mut writer, |x| DataValue::Scalar(ScalarValue::Bool(x)))?; + } ScalarType::STRING => { evpush_dim0::(&frame, deque, &mut writer, |x| { DataValue::Scalar(ScalarValue::String(x)) })?; } - ScalarType::Enum => return Err(Error::NotSupported), + ScalarType::Enum => { + evpush_dim0_enum(&frame, deque, &mut writer)?; + } }, Shape::Wave(_) => match &scalar_type { ScalarType::U8 => { @@ -317,17 +360,49 @@ where }) .map_err(|_| Error::Decode)?; let evs: EventsDim0 = evs.into(); - trace_input!("see events {:?}", evs); - warn!("TODO require timestamp in input format"); + // trace_input!("see events {:?}", evs); let stnow = SystemTime::now(); let tsev = TsNano::from_system_time(stnow); let tsnow = Instant::now(); - let mut emit_state = (); + let mut emit_state = WritableTypeState::new(writer.sid()); for (i, (&ts, val)) in evs.tss.iter().zip(evs.values.iter()).enumerate() { + let ts = TsNano::from_ns(ts); let val = val.clone(); trace_input!("ev {:6} {:20} {:20?}", i, ts, val); let val = f1(val); - writer.write(WritableType(val), &mut emit_state, tsnow, tsev, deque)?; + writer.write(WritableType(ts, val), &mut emit_state, tsnow, tsev, deque)?; + } + Ok(()) +} + +// Special case for enum +fn evpush_dim0_enum( + frame: &Bytes, + deque: &mut VecDeque, + writer: &mut ValueSeriesWriter, +) -> Result<(), Error> { + let evs: EventsDim0Enum = ciborium::de::from_reader(Cursor::new(frame)) + .map_err(|e| { + error!("cbor decode error {e}"); + }) + .map_err(|_| Error::Decode)?; + // trace_input!("see events {:?}", evs); + let stnow = SystemTime::now(); + let tsev = TsNano::from_system_time(stnow); + let tsnow = Instant::now(); + let mut emit_state = WritableTypeState::new(writer.sid()); + for (i, ((&ts, val), vals)) in evs + .tss + .iter() + .zip(evs.values.iter()) + .zip(evs.valuestrs.iter()) + .enumerate() + { + let ts = TsNano::from_ns(ts); + let val = val.clone(); + trace_input!("ev {:6} {:20} {:20?}", i, ts, val); + let val = DataValue::Scalar(ScalarValue::Enum(val as i16, vals.clone())); + writer.write(WritableType(ts, val), &mut emit_state, tsnow, tsev, deque)?; } Ok(()) } @@ -353,12 +428,13 @@ where let stnow = SystemTime::now(); let tsev = TsNano::from_system_time(stnow); let tsnow = Instant::now(); - let mut emit_state = (); + let mut emit_state = WritableTypeState::new(writer.sid()); for (i, (&ts, val)) in evs.tss.iter().zip(evs.values.iter()).enumerate() { + let ts = TsNano::from_ns(ts); let val = val.clone(); trace_input!("ev {:6} {:20} {:20?}", i, ts, val); let val = f1(val); - writer.write(WritableType(val), &mut emit_state, tsnow, tsev, deque)?; + writer.write(WritableType(ts, val), &mut emit_state, tsnow, tsev, deque)?; } Ok(()) }