post ingest

This commit is contained in:
Dominik Werder
2024-09-24 15:33:24 +02:00
parent 79aa8d0466
commit 026fec48ff

View File

@@ -11,11 +11,13 @@ use err::ThisError;
use futures_util::StreamExt;
use futures_util::TryStreamExt;
use items_2::eventsdim0::EventsDim0;
use items_2::eventsdim0::EventsDim0Enum;
use items_2::eventsdim0::EventsDim0NoPulse;
use items_2::eventsdim1::EventsDim1;
use items_2::eventsdim1::EventsDim1NoPulse;
use netpod::log::*;
use netpod::ttl::RetentionTime;
use netpod::EnumVariant;
use netpod::ScalarType;
use netpod::SeriesKind;
use netpod::Shape;
@@ -27,6 +29,8 @@ use scywr::iteminsertqueue::DataValue;
use scywr::iteminsertqueue::QueryItem;
use scywr::iteminsertqueue::ScalarValue;
use serde::Deserialize;
use series::SeriesId;
use serieswriter::msptool::MspSplit;
use serieswriter::writer::EmittableType;
use serieswriter::writer::SeriesWriter;
use std::collections::HashMap;
@@ -44,7 +48,7 @@ use taskrun::tokio::time::timeout;
macro_rules! debug_setup {
($($arg:tt)*) => {
if true {
info!($($arg)*);
debug!($($arg)*);
}
};
}
@@ -52,7 +56,7 @@ macro_rules! debug_setup {
#[allow(unused)]
macro_rules! trace_input {
($($arg:tt)*) => {
if false {
if true {
trace!($($arg)*);
}
};
@@ -61,7 +65,7 @@ macro_rules! trace_input {
#[allow(unused)]
macro_rules! trace_queues {
($($arg:tt)*) => {
if false {
if true {
trace!($($arg)*);
}
};
@@ -69,22 +73,36 @@ macro_rules! trace_queues {
type ValueSeriesWriter = SeriesWriter<WritableType>;
struct WritableTypeState {
series: SeriesId,
msp_split_data: MspSplit,
}
impl WritableTypeState {
fn new(series: SeriesId) -> Self {
Self {
series,
msp_split_data: MspSplit::new(10000, 1024 * 256),
}
}
}
#[derive(Debug, Clone)]
struct WritableType(DataValue);
struct WritableType(TsNano, DataValue);
impl EmittableType for WritableType {
type State = ();
type State = WritableTypeState;
fn ts(&self) -> TsNano {
todo!()
self.0.clone()
}
fn has_change(&self, k: &Self) -> bool {
todo!()
true
}
fn byte_size(&self) -> u32 {
todo!()
8 + self.1.byte_size()
}
fn into_query_item(
@@ -93,7 +111,28 @@ impl EmittableType for WritableType {
tsev: TsNano,
state: &mut <Self as EmittableType>::State,
) -> serieswriter::writer::EmitRes {
todo!()
let (ts_msp, ts_lsp, ts_msp_chg) = state.msp_split_data.split(self.0.clone(), self.byte_size());
let item = QueryItem::Insert(scywr::iteminsertqueue::InsertItem {
series: state.series.clone(),
ts_msp: ts_msp.to_ts_ms(),
ts_lsp,
val: self.1.clone(),
ts_net,
});
let mut items = smallvec::SmallVec::new();
items.push(item);
if ts_msp_chg {
items.push(QueryItem::Msp(scywr::iteminsertqueue::MspItem::new(
state.series.clone(),
ts_msp.to_ts_ms(),
ts_net,
)));
}
serieswriter::writer::EmitRes {
items,
bytes: self.byte_size(),
status: 0,
}
}
}
@@ -239,13 +278,17 @@ async fn post_v01_try(
ScalarType::F64 => {
evpush_dim0::<f64, _>(&frame, deque, &mut writer, |x| DataValue::Scalar(ScalarValue::F64(x)))?;
}
ScalarType::BOOL => return Err(Error::NotSupported),
ScalarType::BOOL => {
evpush_dim0::<bool, _>(&frame, deque, &mut writer, |x| DataValue::Scalar(ScalarValue::Bool(x)))?;
}
ScalarType::STRING => {
evpush_dim0::<String, _>(&frame, deque, &mut writer, |x| {
DataValue::Scalar(ScalarValue::String(x))
})?;
}
ScalarType::Enum => return Err(Error::NotSupported),
ScalarType::Enum => {
evpush_dim0_enum(&frame, deque, &mut writer)?;
}
},
Shape::Wave(_) => match &scalar_type {
ScalarType::U8 => {
@@ -317,17 +360,49 @@ where
})
.map_err(|_| Error::Decode)?;
let evs: EventsDim0<T> = evs.into();
trace_input!("see events {:?}", evs);
warn!("TODO require timestamp in input format");
// trace_input!("see events {:?}", evs);
let stnow = SystemTime::now();
let tsev = TsNano::from_system_time(stnow);
let tsnow = Instant::now();
let mut emit_state = ();
let mut emit_state = WritableTypeState::new(writer.sid());
for (i, (&ts, val)) in evs.tss.iter().zip(evs.values.iter()).enumerate() {
let ts = TsNano::from_ns(ts);
let val = val.clone();
trace_input!("ev {:6} {:20} {:20?}", i, ts, val);
let val = f1(val);
writer.write(WritableType(val), &mut emit_state, tsnow, tsev, deque)?;
writer.write(WritableType(ts, val), &mut emit_state, tsnow, tsev, deque)?;
}
Ok(())
}
// Special case for enum
fn evpush_dim0_enum(
frame: &Bytes,
deque: &mut VecDeque<QueryItem>,
writer: &mut ValueSeriesWriter,
) -> Result<(), Error> {
let evs: EventsDim0Enum = ciborium::de::from_reader(Cursor::new(frame))
.map_err(|e| {
error!("cbor decode error {e}");
})
.map_err(|_| Error::Decode)?;
// trace_input!("see events {:?}", evs);
let stnow = SystemTime::now();
let tsev = TsNano::from_system_time(stnow);
let tsnow = Instant::now();
let mut emit_state = WritableTypeState::new(writer.sid());
for (i, ((&ts, val), vals)) in evs
.tss
.iter()
.zip(evs.values.iter())
.zip(evs.valuestrs.iter())
.enumerate()
{
let ts = TsNano::from_ns(ts);
let val = val.clone();
trace_input!("ev {:6} {:20} {:20?}", i, ts, val);
let val = DataValue::Scalar(ScalarValue::Enum(val as i16, vals.clone()));
writer.write(WritableType(ts, val), &mut emit_state, tsnow, tsev, deque)?;
}
Ok(())
}
@@ -353,12 +428,13 @@ where
let stnow = SystemTime::now();
let tsev = TsNano::from_system_time(stnow);
let tsnow = Instant::now();
let mut emit_state = ();
let mut emit_state = WritableTypeState::new(writer.sid());
for (i, (&ts, val)) in evs.tss.iter().zip(evs.values.iter()).enumerate() {
let ts = TsNano::from_ns(ts);
let val = val.clone();
trace_input!("ev {:6} {:20} {:20?}", i, ts, val);
let val = f1(val);
writer.write(WritableType(val), &mut emit_state, tsnow, tsev, deque)?;
writer.write(WritableType(ts, val), &mut emit_state, tsnow, tsev, deque)?;
}
Ok(())
}