Add binning to PUT ingest

This commit is contained in:
Dominik Werder
2025-06-27 17:35:09 +02:00
parent b8ce63e3f2
commit 7764a1904c
4 changed files with 165 additions and 75 deletions

View File

@@ -45,7 +45,7 @@ pub struct CaIngestOpts {
scylla_disable: bool,
#[serde(default)]
scylla_ignore_writes: bool,
#[serde(default)]
#[serde(default = "bool_true")]
binwriter_enable: bool,
}

View File

@@ -428,13 +428,13 @@ fn make_routes(
"/channel",
make_routes_channel(rres.clone(), dcom.clone(), connset_cmd_tx.clone(), stats_set.clone()),
)
.nest(
"/ingest",
make_routes_ingest(rres.clone(), dcom.clone(), connset_cmd_tx.clone(), stats_set.clone()),
)
.nest(
"/private",
Router::new()
.nest(
"/ingest",
make_routes_ingest(rres.clone(), dcom.clone(), connset_cmd_tx.clone(), stats_set.clone()),
)
.nest(
"/channel",
make_routes_private_channel(
@@ -563,15 +563,14 @@ fn make_routes_ingest(
connset_cmd_tx: Sender<CaConnSetEvent>,
stats_set: StatsSet,
) -> axum::Router {
use axum::Router;
use axum::extract;
use axum::routing::{get, post, put};
use axum::{Router, extract};
use http::StatusCode;
Router::new()
.nest(
"/private",
"/write",
Router::new().route(
"/write",
"/v1",
put({
let rres = rres.clone();
move |(headers, params, body): (HeaderMap, Query<HashMap<String, String>>, axum::body::Body)| {
@@ -597,9 +596,8 @@ fn make_routes_private_channel(
connset_cmd_tx: Sender<CaConnSetEvent>,
stats_set: StatsSet,
) -> axum::Router {
use axum::Router;
use axum::extract;
use axum::routing::{get, post, put};
use axum::{Router, extract};
use http::StatusCode;
Router::new()
.route(

View File

@@ -26,8 +26,13 @@ use scywr::iteminsertqueue::QueryItem;
use scywr::iteminsertqueue::ScalarValue;
use serde::Deserialize;
use serde::Serialize;
use series::ChannelStatusSeriesId;
use series::SeriesId;
use serieswriter::binwriter::BinWriter;
use serieswriter::binwriter::DiscardFirstOutput;
use serieswriter::binwriter::WriteCntZero;
use serieswriter::msptool::MspSplit;
use serieswriter::rtwriter::MinQuiets;
use serieswriter::writer::EmittableType;
use serieswriter::writer::SeriesWriter;
use std::collections::HashMap;
@@ -141,17 +146,23 @@ impl EmittableType for WritableType {
}
}
fn evpush_dim0<T, F1>(
frame: &Bytes,
deque: &mut VecDeque<QueryItem>,
writer: &mut ValueSeriesWriter,
f1: F1,
) -> Result<(), Error>
/// Arguments handed to the `evpush_*` helpers for a single incoming frame.
///
/// `frame_write` builds one of these per frame and passes it by value to the
/// helper matching the channel's scalar type and shape.
struct EvPushParams<'a> {
/// Raw frame bytes; the helpers decode them with `ciborium::de::from_reader`.
frame: &'a Bytes,
/// Series writer that receives every decoded event.
writer: &'a mut ValueSeriesWriter,
/// Bin writer slot; `evpush_dim0` fills it while handling a non-empty frame
/// if it is still `None`.
binwriter: &'a mut Option<BinWriter>,
/// Insert deques; the deque selected by `rt` takes the written events, and
/// the whole set is handed to the bin writer on ingest.
iqdqs: &'a mut InsertDeques,
/// Channel name, forwarded to `BinWriter::new`.
chname: &'a str,
/// Retention time used to pick the target deque via `iqdqs.deque`.
rt: RetentionTime,
/// Scalar type of the channel, forwarded to `BinWriter::new`.
scalar_type: ScalarType,
/// Shape of the channel, forwarded to `BinWriter::new`.
shape: Shape,
}
fn evpush_dim0<T, F1>(mut params: EvPushParams, f1: F1) -> Result<(), Error>
where
T: EventValueType,
F1: Fn(<T as EventValueType>::IterTy1<'_>) -> DataValue,
{
let evs: ContainerEvents<T> = ciborium::de::from_reader(Cursor::new(frame))
let evs: ContainerEvents<T> = ciborium::de::from_reader(Cursor::new(params.frame))
.map_err(|e| {
error!("cbor decode error {e}");
})
@@ -160,22 +171,56 @@ where
let stnow = SystemTime::now();
let tsev = TsNano::from_system_time(stnow);
let tsnow = Instant::now();
let mut emit_state = WritableTypeState::new(writer.sid());
for (i, (ts, val)) in evs.iter_zip().enumerate() {
let val = val.clone();
trace_input!("ev {:6} {:20} {:20?}", i, ts, val);
let val = f1(val);
writer.write(WritableType(ts, val), &mut emit_state, tsnow, tsev, deque)?;
let mut emit_state = WritableTypeState::new(params.writer.sid());
if evs.len() != 0 {
if params.binwriter.is_none() {
for (i, (ts, val)) in evs.iter_zip().enumerate() {
let min_quiets = MinQuiets::http_ingest_default();
let is_polled = false;
let emit_znt_zero_default = WriteCntZero::Disable;
let do_discard_front = DiscardFirstOutput::Enable;
let cssid = ChannelStatusSeriesId::new(0);
let sid = params.writer.sid();
let wr2 = BinWriter::new(
ts,
min_quiets,
is_polled,
emit_znt_zero_default,
do_discard_front,
cssid,
sid,
params.scalar_type.clone(),
params.shape.clone(),
params.chname.into(),
)
.unwrap();
*params.binwriter = Some(wr2);
break;
}
}
let binwriter = params.binwriter.as_mut().unwrap();
for (i, (ts, val)) in evs.iter_zip().enumerate() {
let val = val.clone();
let val = f1(val);
let val_f32 = val.f32_for_binning();
binwriter.ingest(ts, val_f32, params.iqdqs).unwrap();
}
let deque = params.iqdqs.deque(params.rt.clone());
for (i, (ts, val)) in evs.iter_zip().enumerate() {
let val = val.clone();
trace_input!("ev {:6} {:20} {:20?}", i, ts, val);
let val = f1(val);
params
.writer
.write(WritableType(ts, val), &mut emit_state, tsnow, tsev, deque)?;
}
} else {
}
Ok(())
}
fn evpush_dim0_enum(
frame: &Bytes,
deque: &mut VecDeque<QueryItem>,
writer: &mut ValueSeriesWriter,
) -> Result<(), Error> {
let evs: ContainerEvents<EnumVariant> = ciborium::de::from_reader(Cursor::new(frame))
fn evpush_dim0_enum(mut params: EvPushParams) -> Result<(), Error> {
let evs: ContainerEvents<EnumVariant> = ciborium::de::from_reader(Cursor::new(params.frame))
.map_err(|e| {
error!("cbor decode error {e}");
})
@@ -184,27 +229,25 @@ fn evpush_dim0_enum(
let stnow = SystemTime::now();
let tsev = TsNano::from_system_time(stnow);
let tsnow = Instant::now();
let mut emit_state = WritableTypeState::new(writer.sid());
let mut emit_state = WritableTypeState::new(params.writer.sid());
let deque = params.iqdqs.deque(params.rt.clone());
for (i, (ts, val)) in evs.iter_zip().enumerate() {
let val = val.clone();
trace_input!("ev {:6} {:20} {:20?}", i, ts, val);
let val = DataValue::Scalar(ScalarValue::Enum(val.ix as i16, val.name.into()));
writer.write(WritableType(ts, val), &mut emit_state, tsnow, tsev, deque)?;
params
.writer
.write(WritableType(ts, val), &mut emit_state, tsnow, tsev, deque)?;
}
Ok(())
}
fn evpush_dim1<T, F1>(
frame: &Bytes,
deque: &mut VecDeque<QueryItem>,
writer: &mut ValueSeriesWriter,
f1: F1,
) -> Result<(), Error>
fn evpush_dim1<T, F1>(mut params: EvPushParams, f1: F1) -> Result<(), Error>
where
Vec<T>: EventValueType,
F1: Fn(<Vec<T> as EventValueType>::IterTy1<'_>) -> DataValue,
{
let evs: ContainerEvents<Vec<T>> = ciborium::de::from_reader(Cursor::new(frame))
let evs: ContainerEvents<Vec<T>> = ciborium::de::from_reader(Cursor::new(params.frame))
.map_err(|e| {
error!("cbor decode error {e}");
})
@@ -214,97 +257,111 @@ where
let stnow = SystemTime::now();
let tsev = TsNano::from_system_time(stnow);
let tsnow = Instant::now();
let mut emit_state = WritableTypeState::new(writer.sid());
let mut emit_state = WritableTypeState::new(params.writer.sid());
let deque = params.iqdqs.deque(params.rt.clone());
for (i, (ts, val)) in evs.iter_zip().enumerate() {
let val = val.clone();
trace_input!("ev {:6} {:20} {:20?}", i, ts, val);
let val = f1(val);
writer.write(WritableType(ts, val), &mut emit_state, tsnow, tsev, deque)?;
params
.writer
.write(WritableType(ts, val), &mut emit_state, tsnow, tsev, deque)?;
}
Ok(())
}
fn frame_write(
frame: &Bytes,
chname: &str,
rt: RetentionTime,
scalar_type: ScalarType,
shape: Shape,
writer: &mut SeriesWriter<WritableType>,
deque: &mut VecDeque<QueryItem>,
binwriter: &mut Option<BinWriter>,
iqdqs: &mut InsertDeques,
) -> Result<(), Error> {
let params = EvPushParams {
frame,
writer,
binwriter,
iqdqs,
chname,
rt,
scalar_type: scalar_type.clone(),
shape: shape.clone(),
};
match &shape {
Shape::Scalar => match &scalar_type {
ScalarType::U8 => {
evpush_dim0::<u8, _>(&frame, deque, writer, |x| DataValue::Scalar(ScalarValue::U8(x as _)))?;
evpush_dim0::<u8, _>(params, |x| DataValue::Scalar(ScalarValue::U8(x as _)))?;
}
ScalarType::U16 => {
evpush_dim0::<u16, _>(&frame, deque, writer, |x| DataValue::Scalar(ScalarValue::U16(x as _)))?;
evpush_dim0::<u16, _>(params, |x| DataValue::Scalar(ScalarValue::U16(x as _)))?;
}
ScalarType::U32 => {
evpush_dim0::<u32, _>(&frame, deque, writer, |x| DataValue::Scalar(ScalarValue::U32(x as _)))?;
evpush_dim0::<u32, _>(params, |x| DataValue::Scalar(ScalarValue::U32(x as _)))?;
}
ScalarType::U64 => {
evpush_dim0::<u64, _>(&frame, deque, writer, |x| DataValue::Scalar(ScalarValue::U64(x as _)))?;
evpush_dim0::<u64, _>(params, |x| DataValue::Scalar(ScalarValue::U64(x as _)))?;
}
ScalarType::I8 => {
evpush_dim0::<i8, _>(&frame, deque, writer, |x| DataValue::Scalar(ScalarValue::I8(x)))?;
evpush_dim0::<i8, _>(params, |x| DataValue::Scalar(ScalarValue::I8(x)))?;
}
ScalarType::I16 => {
evpush_dim0::<i16, _>(&frame, deque, writer, |x| DataValue::Scalar(ScalarValue::I16(x)))?;
evpush_dim0::<i16, _>(params, |x| DataValue::Scalar(ScalarValue::I16(x)))?;
}
ScalarType::I32 => {
evpush_dim0::<i32, _>(&frame, deque, writer, |x| DataValue::Scalar(ScalarValue::I32(x)))?;
evpush_dim0::<i32, _>(params, |x| DataValue::Scalar(ScalarValue::I32(x)))?;
}
ScalarType::I64 => {
evpush_dim0::<i64, _>(&frame, deque, writer, |x| DataValue::Scalar(ScalarValue::I64(x)))?;
evpush_dim0::<i64, _>(params, |x| DataValue::Scalar(ScalarValue::I64(x)))?;
}
ScalarType::F32 => {
evpush_dim0::<f32, _>(&frame, deque, writer, |x| DataValue::Scalar(ScalarValue::F32(x)))?;
evpush_dim0::<f32, _>(params, |x| DataValue::Scalar(ScalarValue::F32(x)))?;
}
ScalarType::F64 => {
evpush_dim0::<f64, _>(&frame, deque, writer, |x| DataValue::Scalar(ScalarValue::F64(x)))?;
evpush_dim0::<f64, _>(params, |x| DataValue::Scalar(ScalarValue::F64(x)))?;
}
ScalarType::BOOL => {
evpush_dim0::<bool, _>(&frame, deque, writer, |x| DataValue::Scalar(ScalarValue::Bool(x)))?;
evpush_dim0::<bool, _>(params, |x| DataValue::Scalar(ScalarValue::Bool(x)))?;
}
ScalarType::STRING => {
evpush_dim0::<String, _>(&frame, deque, writer, |x| {
DataValue::Scalar(ScalarValue::String(x.into()))
})?;
evpush_dim0::<String, _>(params, |x| DataValue::Scalar(ScalarValue::String(x.into())))?;
}
ScalarType::Enum => {
evpush_dim0_enum(&frame, deque, writer)?;
evpush_dim0_enum(params)?;
}
},
Shape::Wave(_) => match &scalar_type {
ScalarType::U8 => {
evpush_dim1::<u8, _>(&frame, deque, writer, |x| DataValue::Array(ArrayValue::U8(x)))?;
evpush_dim1::<u8, _>(params, |x| DataValue::Array(ArrayValue::U8(x)))?;
}
ScalarType::U16 => {
evpush_dim1::<u16, _>(&frame, deque, writer, |x| DataValue::Array(ArrayValue::U16(x)))?;
evpush_dim1::<u16, _>(params, |x| DataValue::Array(ArrayValue::U16(x)))?;
}
ScalarType::U32 => {
evpush_dim1::<u32, _>(&frame, deque, writer, |x| DataValue::Array(ArrayValue::U32(x)))?;
evpush_dim1::<u32, _>(params, |x| DataValue::Array(ArrayValue::U32(x)))?;
}
ScalarType::U64 => {
evpush_dim1::<u64, _>(&frame, deque, writer, |x| DataValue::Array(ArrayValue::U64(x)))?;
evpush_dim1::<u64, _>(params, |x| DataValue::Array(ArrayValue::U64(x)))?;
}
ScalarType::I8 => {
evpush_dim1::<i8, _>(&frame, deque, writer, |x| DataValue::Array(ArrayValue::I8(x)))?;
evpush_dim1::<i8, _>(params, |x| DataValue::Array(ArrayValue::I8(x)))?;
}
ScalarType::I16 => {
evpush_dim1::<i16, _>(&frame, deque, writer, |x| DataValue::Array(ArrayValue::I16(x)))?;
evpush_dim1::<i16, _>(params, |x| DataValue::Array(ArrayValue::I16(x)))?;
}
ScalarType::I32 => {
evpush_dim1::<i32, _>(&frame, deque, writer, |x| DataValue::Array(ArrayValue::I32(x)))?;
evpush_dim1::<i32, _>(params, |x| DataValue::Array(ArrayValue::I32(x)))?;
}
ScalarType::I64 => {
evpush_dim1::<i64, _>(&frame, deque, writer, |x| DataValue::Array(ArrayValue::I64(x)))?;
evpush_dim1::<i64, _>(params, |x| DataValue::Array(ArrayValue::I64(x)))?;
}
ScalarType::F32 => {
evpush_dim1::<f32, _>(&frame, deque, writer, |x| DataValue::Array(ArrayValue::F32(x)))?;
evpush_dim1::<f32, _>(params, |x| DataValue::Array(ArrayValue::F32(x)))?;
}
ScalarType::F64 => {
evpush_dim1::<f64, _>(&frame, deque, writer, |x| DataValue::Array(ArrayValue::F64(x)))?;
evpush_dim1::<f64, _>(params, |x| DataValue::Array(ArrayValue::F64(x)))?;
}
ScalarType::BOOL => return Err(Error::NotSupported),
ScalarType::STRING => return Err(Error::NotSupported),
@@ -337,7 +394,7 @@ async fn write_with_fresh_msps_inner(
let stnow = SystemTime::now();
let worker_tx = rres.worker_tx.clone();
let backend = rres.backend.clone();
let channel = params.get("channelName").ok_or(Error::MissingChannelName)?.into();
let channel = params.get("channelName").ok_or(Error::MissingChannelName)?.to_string();
let s = params.get("scalarType").ok_or(Error::MissingScalarType)?;
let scalar_type = ScalarType::from_variant_str(&s).map_err(|e| Error::Parse(e.to_string()))?;
let shape: Shape = serde_json::from_str(params.get("shape").map_or("[]", |x| x.as_str()))?;
@@ -355,7 +412,7 @@ async fn write_with_fresh_msps_inner(
let (tx, rx) = async_channel::bounded(8);
let qu = ChannelInfoQuery {
backend,
channel,
channel: channel.clone(),
kind: SeriesKind::ChannelData,
scalar_type: scalar_type.clone(),
shape: shape.clone(),
@@ -368,6 +425,7 @@ async fn write_with_fresh_msps_inner(
.map_err(|_| Error::ConfigLookup)?
.map_err(|_| Error::ConfigLookup)?;
let mut writer = SeriesWriter::new(chinfo.series.to_series())?;
let mut binwriter = None;
debug_setup!("series writer established");
let mut iqdqs = InsertDeques::new();
let mut iqtx = rres.iqtx.clone();
@@ -392,22 +450,38 @@ async fn write_with_fresh_msps_inner(
}
};
trace_input!("got frame len {}", frame.len());
let deque = iqdqs.deque(rt.clone());
frame_write(&frame, scalar_type.clone(), shape.clone(), &mut writer, deque)?;
frame_write(
&frame,
&channel,
rt.clone(),
scalar_type.clone(),
shape.clone(),
&mut writer,
&mut binwriter,
&mut iqdqs,
)?;
trace_queues!("frame send_all begin {} {}", iqdqs.summary(), iqtx.summary());
iqtx.send_all(&mut iqdqs).await?;
trace_queues!("frame send_all done {} {}", iqdqs.summary(), iqtx.summary());
tick_writers(&mut writer, &mut iqdqs, rt.clone())?;
trace_queues!("frame tick_writers done {} {}", iqdqs.summary(), iqtx.summary());
if let Some(binwriter) = binwriter.as_mut() {
binwriter.tick(&mut iqdqs).unwrap();
}
}
trace_queues!("after send_all begin {} {}", iqdqs.summary(), iqtx.summary());
iqtx.send_all(&mut iqdqs).await?;
trace_queues!("after send_all done {} {}", iqdqs.summary(), iqtx.summary());
finish_writers(&mut writer, &mut iqdqs, rt.clone())?;
trace_queues!("after finish_writers done {} {}", iqdqs.summary(), iqtx.summary());
let ret = Json(serde_json::json!({}));
if let Some(binwriter) = binwriter.as_mut() {
binwriter.tick(&mut iqdqs).unwrap();
}
let ret = Json(serde_json::json!({
"status": "ok",
"chinfo": chinfo,
"series_id": chinfo.series.to_series().id(),
}));
Ok(ret)
}
@@ -546,6 +620,7 @@ async fn write_events_exact_2(
debug_setup!("write_events_exact {:?} {:?}", conf, rt);
let series = SeriesId::new(conf.series);
let mut writer = SeriesWriter::new(series)?;
let mut binwriter = None;
debug_setup!("series writer established");
let mut iqdqs = InsertDeques::new();
let mut iqtx = rres.iqtx.clone();
@@ -559,7 +634,16 @@ async fn write_events_exact_2(
Some(frame) => {
trace_input!("got frame len {}", frame.len());
let deque = iqdqs.deque(rt.clone());
frame_write(&frame, conf.scalar_type.clone(), conf.shape.clone(), &mut writer, deque)?;
frame_write(
&frame,
&conf.name,
rt.clone(),
conf.scalar_type.clone(),
conf.shape.clone(),
&mut writer,
&mut binwriter,
&mut iqdqs,
)?;
trace_queues!("frame send_all begin {} {}", iqdqs.summary(), iqtx.summary());
iqtx.send_all(&mut iqdqs).await?;
trace_queues!("frame send_all done {} {}", iqdqs.summary(), iqtx.summary());

View File

@@ -48,6 +48,14 @@ impl MinQuiets {
lt: Duration::from_millis(1000 * 60),
}
}
/// Minimum-quiet durations for the HTTP ingest path.
///
/// `st` and `mt` are zero and `lt` is one hour.
// NOTE(review): st/mt/lt presumably stand for the short/medium/long-term
// retention tiers — confirm against the `MinQuiets` definition.
pub fn http_ingest_default() -> Self {
    let st = Duration::from_millis(0);
    let mt = Duration::from_millis(0);
    // One hour, expressed in milliseconds.
    let lt = Duration::from_millis(60 * 60 * 1000);
    Self { st, mt, lt }
}
}
#[derive(Debug, Serialize)]