diff --git a/netfetch/src/conf.rs b/netfetch/src/conf.rs index 1e479fb..a05bc8e 100644 --- a/netfetch/src/conf.rs +++ b/netfetch/src/conf.rs @@ -45,7 +45,7 @@ pub struct CaIngestOpts { scylla_disable: bool, #[serde(default)] scylla_ignore_writes: bool, - #[serde(default)] + #[serde(default = "bool_true")] binwriter_enable: bool, } diff --git a/netfetch/src/metrics.rs b/netfetch/src/metrics.rs index 7d547aa..7047420 100644 --- a/netfetch/src/metrics.rs +++ b/netfetch/src/metrics.rs @@ -428,13 +428,13 @@ fn make_routes( "/channel", make_routes_channel(rres.clone(), dcom.clone(), connset_cmd_tx.clone(), stats_set.clone()), ) - .nest( - "/ingest", - make_routes_ingest(rres.clone(), dcom.clone(), connset_cmd_tx.clone(), stats_set.clone()), - ) .nest( "/private", Router::new() + .nest( + "/ingest", + make_routes_ingest(rres.clone(), dcom.clone(), connset_cmd_tx.clone(), stats_set.clone()), + ) .nest( "/channel", make_routes_private_channel( @@ -563,15 +563,14 @@ fn make_routes_ingest( connset_cmd_tx: Sender, stats_set: StatsSet, ) -> axum::Router { - use axum::Router; - use axum::extract; use axum::routing::{get, post, put}; + use axum::{Router, extract}; use http::StatusCode; Router::new() .nest( - "/private", + "/write", Router::new().route( - "/write", + "/v1", put({ let rres = rres.clone(); move |(headers, params, body): (HeaderMap, Query>, axum::body::Body)| { @@ -597,9 +596,8 @@ fn make_routes_private_channel( connset_cmd_tx: Sender, stats_set: StatsSet, ) -> axum::Router { - use axum::Router; - use axum::extract; use axum::routing::{get, post, put}; + use axum::{Router, extract}; use http::StatusCode; Router::new() .route( diff --git a/netfetch/src/metrics/ingest/write_v02.rs b/netfetch/src/metrics/ingest/write_v02.rs index 7f698f3..68fa210 100644 --- a/netfetch/src/metrics/ingest/write_v02.rs +++ b/netfetch/src/metrics/ingest/write_v02.rs @@ -26,8 +26,13 @@ use scywr::iteminsertqueue::QueryItem; use scywr::iteminsertqueue::ScalarValue; use serde::Deserialize; use serde::Serialize; +use series::ChannelStatusSeriesId; use series::SeriesId; +use serieswriter::binwriter::BinWriter; +use serieswriter::binwriter::DiscardFirstOutput; +use serieswriter::binwriter::WriteCntZero; use serieswriter::msptool::MspSplit; +use serieswriter::rtwriter::MinQuiets; use serieswriter::writer::EmittableType; use serieswriter::writer::SeriesWriter; use std::collections::HashMap; @@ -141,17 +146,23 @@ impl EmittableType for WritableType { } } -fn evpush_dim0( - frame: &Bytes, - deque: &mut VecDeque, - writer: &mut ValueSeriesWriter, - f1: F1, -) -> Result<(), Error> +struct EvPushParams<'a> { + frame: &'a Bytes, + writer: &'a mut ValueSeriesWriter, + binwriter: &'a mut Option, + iqdqs: &'a mut InsertDeques, + chname: &'a str, + rt: RetentionTime, + scalar_type: ScalarType, + shape: Shape, +} + +fn evpush_dim0(mut params: EvPushParams, f1: F1) -> Result<(), Error> where T: EventValueType, F1: Fn(::IterTy1<'_>) -> DataValue, { - let evs: ContainerEvents = ciborium::de::from_reader(Cursor::new(frame)) + let evs: ContainerEvents = ciborium::de::from_reader(Cursor::new(params.frame)) .map_err(|e| { error!("cbor decode error {e}"); }) @@ -160,22 +171,56 @@ where let stnow = SystemTime::now(); let tsev = TsNano::from_system_time(stnow); let tsnow = Instant::now(); - let mut emit_state = WritableTypeState::new(writer.sid()); - for (i, (ts, val)) in evs.iter_zip().enumerate() { - let val = val.clone(); - trace_input!("ev {:6} {:20} {:20?}", i, ts, val); - let val = f1(val); - writer.write(WritableType(ts, val), &mut emit_state, tsnow, tsev, deque)?; + let mut emit_state = WritableTypeState::new(params.writer.sid()); + if evs.len() != 0 { + if params.binwriter.is_none() { + for (i, (ts, val)) in evs.iter_zip().enumerate() { + let min_quiets = MinQuiets::http_ingest_default(); + let is_polled = false; + let emit_znt_zero_default = WriteCntZero::Disable; + let do_discard_front = DiscardFirstOutput::Enable; + let cssid = ChannelStatusSeriesId::new(0); + let sid = params.writer.sid(); + let wr2 = BinWriter::new( + ts, + min_quiets, + is_polled, + emit_znt_zero_default, + do_discard_front, + cssid, + sid, + params.scalar_type.clone(), + params.shape.clone(), + params.chname.into(), + ) + .unwrap(); + *params.binwriter = Some(wr2); + break; + } + } + let binwriter = params.binwriter.as_mut().unwrap(); + for (i, (ts, val)) in evs.iter_zip().enumerate() { + let val = val.clone(); + let val = f1(val); + let val_f32 = val.f32_for_binning(); + binwriter.ingest(ts, val_f32, params.iqdqs).unwrap(); + } + let deque = params.iqdqs.deque(params.rt.clone()); + for (i, (ts, val)) in evs.iter_zip().enumerate() { + let val = val.clone(); + trace_input!("ev {:6} {:20} {:20?}", i, ts, val); + let val = f1(val); + params + .writer + .write(WritableType(ts, val), &mut emit_state, tsnow, tsev, deque)?; + } + } else { } Ok(()) } -fn evpush_dim0_enum( - frame: &Bytes, - deque: &mut VecDeque, - writer: &mut ValueSeriesWriter, -) -> Result<(), Error> { - let evs: ContainerEvents = ciborium::de::from_reader(Cursor::new(frame)) +fn evpush_dim0_enum(mut params: EvPushParams) -> Result<(), Error> { + let evs: ContainerEvents = ciborium::de::from_reader(Cursor::new(params.frame)) .map_err(|e| { error!("cbor decode error {e}"); }) @@ -184,27 +229,25 @@ fn evpush_dim0_enum( let stnow = SystemTime::now(); let tsev = TsNano::from_system_time(stnow); let tsnow = Instant::now(); - let mut emit_state = WritableTypeState::new(writer.sid()); + let mut emit_state = WritableTypeState::new(params.writer.sid()); + let deque = params.iqdqs.deque(params.rt.clone()); for (i, (ts, val)) in evs.iter_zip().enumerate() { let val = val.clone(); trace_input!("ev {:6} {:20} {:20?}", i, ts, val); let val = DataValue::Scalar(ScalarValue::Enum(val.ix as i16, val.name.into())); - writer.write(WritableType(ts, val), &mut emit_state, tsnow, tsev, deque)?; + params + .writer + .write(WritableType(ts, val), &mut emit_state, tsnow, tsev, deque)?; } Ok(()) } -fn evpush_dim1( - frame: &Bytes, - deque: &mut VecDeque, - writer: &mut ValueSeriesWriter, - f1: F1, -) -> Result<(), Error> +fn evpush_dim1(mut params: EvPushParams, f1: F1) -> Result<(), Error> where Vec: EventValueType, F1: Fn( as EventValueType>::IterTy1<'_>) -> DataValue, { - let evs: ContainerEvents> = ciborium::de::from_reader(Cursor::new(frame)) + let evs: ContainerEvents> = ciborium::de::from_reader(Cursor::new(params.frame)) .map_err(|e| { error!("cbor decode error {e}"); }) @@ -214,97 +257,111 @@ where let stnow = SystemTime::now(); let tsev = TsNano::from_system_time(stnow); let tsnow = Instant::now(); - let mut emit_state = WritableTypeState::new(writer.sid()); + let mut emit_state = WritableTypeState::new(params.writer.sid()); + let deque = params.iqdqs.deque(params.rt.clone()); for (i, (ts, val)) in evs.iter_zip().enumerate() { let val = val.clone(); trace_input!("ev {:6} {:20} {:20?}", i, ts, val); let val = f1(val); - writer.write(WritableType(ts, val), &mut emit_state, tsnow, tsev, deque)?; + params + .writer + .write(WritableType(ts, val), &mut emit_state, tsnow, tsev, deque)?; } Ok(()) } fn frame_write( frame: &Bytes, + chname: &str, + rt: RetentionTime, scalar_type: ScalarType, shape: Shape, writer: &mut SeriesWriter, - deque: &mut VecDeque, + binwriter: &mut Option, + iqdqs: &mut InsertDeques, ) -> Result<(), Error> { + let params = EvPushParams { + frame, + writer, + binwriter, + iqdqs, + chname, + rt, + scalar_type: scalar_type.clone(), + shape: shape.clone(), + }; match &shape { Shape::Scalar => match &scalar_type { ScalarType::U8 => { - evpush_dim0::(&frame, deque, writer, |x| DataValue::Scalar(ScalarValue::U8(x as _)))?; + evpush_dim0::(params, |x| DataValue::Scalar(ScalarValue::U8(x as _)))?; } ScalarType::U16 => { - evpush_dim0::(&frame, deque, writer, |x| DataValue::Scalar(ScalarValue::U16(x as _)))?; + evpush_dim0::(params, |x| DataValue::Scalar(ScalarValue::U16(x as _)))?; } ScalarType::U32 => { - evpush_dim0::(&frame, deque, writer, |x| DataValue::Scalar(ScalarValue::U32(x as _)))?; + evpush_dim0::(params, |x| DataValue::Scalar(ScalarValue::U32(x as _)))?; } ScalarType::U64 => { - evpush_dim0::(&frame, deque, writer, |x| DataValue::Scalar(ScalarValue::U64(x as _)))?; + evpush_dim0::(params, |x| DataValue::Scalar(ScalarValue::U64(x as _)))?; } ScalarType::I8 => { - evpush_dim0::(&frame, deque, writer, |x| DataValue::Scalar(ScalarValue::I8(x)))?; + evpush_dim0::(params, |x| DataValue::Scalar(ScalarValue::I8(x)))?; } ScalarType::I16 => { - evpush_dim0::(&frame, deque, writer, |x| DataValue::Scalar(ScalarValue::I16(x)))?; + evpush_dim0::(params, |x| DataValue::Scalar(ScalarValue::I16(x)))?; } ScalarType::I32 => { - evpush_dim0::(&frame, deque, writer, |x| DataValue::Scalar(ScalarValue::I32(x)))?; + evpush_dim0::(params, |x| DataValue::Scalar(ScalarValue::I32(x)))?; } ScalarType::I64 => { - evpush_dim0::(&frame, deque, writer, |x| DataValue::Scalar(ScalarValue::I64(x)))?; + evpush_dim0::(params, |x| DataValue::Scalar(ScalarValue::I64(x)))?; } ScalarType::F32 => { - evpush_dim0::(&frame, deque, writer, |x| DataValue::Scalar(ScalarValue::F32(x)))?; + evpush_dim0::(params, |x| DataValue::Scalar(ScalarValue::F32(x)))?; } ScalarType::F64 => { - evpush_dim0::(&frame, deque, writer, |x| DataValue::Scalar(ScalarValue::F64(x)))?; + evpush_dim0::(params, |x| DataValue::Scalar(ScalarValue::F64(x)))?; } ScalarType::BOOL => { - evpush_dim0::(&frame, deque, writer, |x| DataValue::Scalar(ScalarValue::Bool(x)))?; + evpush_dim0::(params, |x| DataValue::Scalar(ScalarValue::Bool(x)))?; } ScalarType::STRING => { - evpush_dim0::(&frame, deque, writer, |x| { - DataValue::Scalar(ScalarValue::String(x.into())) - })?; + evpush_dim0::(params, |x| DataValue::Scalar(ScalarValue::String(x.into())))?; } ScalarType::Enum => { - evpush_dim0_enum(&frame, deque, writer)?; + evpush_dim0_enum(params)?; } }, Shape::Wave(_) => match &scalar_type { ScalarType::U8 => { - evpush_dim1::(&frame, deque, writer, |x| DataValue::Array(ArrayValue::U8(x)))?; + evpush_dim1::(params, |x| DataValue::Array(ArrayValue::U8(x)))?; } ScalarType::U16 => { - evpush_dim1::(&frame, deque, writer, |x| DataValue::Array(ArrayValue::U16(x)))?; + evpush_dim1::(params, |x| DataValue::Array(ArrayValue::U16(x)))?; } ScalarType::U32 => { - evpush_dim1::(&frame, deque, writer, |x| DataValue::Array(ArrayValue::U32(x)))?; + evpush_dim1::(params, |x| DataValue::Array(ArrayValue::U32(x)))?; } ScalarType::U64 => { - evpush_dim1::(&frame, deque, writer, |x| DataValue::Array(ArrayValue::U64(x)))?; + evpush_dim1::(params, |x| DataValue::Array(ArrayValue::U64(x)))?; } ScalarType::I8 => { - evpush_dim1::(&frame, deque, writer, |x| DataValue::Array(ArrayValue::I8(x)))?; + evpush_dim1::(params, |x| DataValue::Array(ArrayValue::I8(x)))?; } ScalarType::I16 => { - evpush_dim1::(&frame, deque, writer, |x| DataValue::Array(ArrayValue::I16(x)))?; + evpush_dim1::(params, |x| DataValue::Array(ArrayValue::I16(x)))?; } ScalarType::I32 => { - evpush_dim1::(&frame, deque, writer, |x| DataValue::Array(ArrayValue::I32(x)))?; + evpush_dim1::(params, |x| DataValue::Array(ArrayValue::I32(x)))?; } ScalarType::I64 => { - evpush_dim1::(&frame, deque, writer, |x| DataValue::Array(ArrayValue::I64(x)))?; + evpush_dim1::(params, |x| DataValue::Array(ArrayValue::I64(x)))?; } ScalarType::F32 => { - evpush_dim1::(&frame, deque, writer, |x| DataValue::Array(ArrayValue::F32(x)))?; + evpush_dim1::(params, |x| DataValue::Array(ArrayValue::F32(x)))?; } ScalarType::F64 => { - evpush_dim1::(&frame, deque, writer, |x| DataValue::Array(ArrayValue::F64(x)))?; + evpush_dim1::(params, |x| DataValue::Array(ArrayValue::F64(x)))?; } ScalarType::BOOL => return Err(Error::NotSupported), ScalarType::STRING => return Err(Error::NotSupported), @@ -337,7 +394,7 @@ async fn write_with_fresh_msps_inner( let stnow = SystemTime::now(); let worker_tx = rres.worker_tx.clone(); let backend = rres.backend.clone(); - let channel = params.get("channelName").ok_or(Error::MissingChannelName)?.into(); + let channel = params.get("channelName").ok_or(Error::MissingChannelName)?.to_string(); let s = params.get("scalarType").ok_or(Error::MissingScalarType)?; let scalar_type = ScalarType::from_variant_str(&s).map_err(|e| Error::Parse(e.to_string()))?; let shape: Shape = serde_json::from_str(params.get("shape").map_or("[]", |x| x.as_str()))?; @@ -355,7 +412,7 @@ async fn write_with_fresh_msps_inner( let (tx, rx) = async_channel::bounded(8); let qu = ChannelInfoQuery { backend, - channel, + channel: channel.clone(), kind: SeriesKind::ChannelData, scalar_type: scalar_type.clone(), shape: shape.clone(), @@ -368,6 +425,7 @@ async fn write_with_fresh_msps_inner( .map_err(|_| Error::ConfigLookup)? .map_err(|_| Error::ConfigLookup)?; let mut writer = SeriesWriter::new(chinfo.series.to_series())?; + let mut binwriter = None; debug_setup!("series writer established"); let mut iqdqs = InsertDeques::new(); let mut iqtx = rres.iqtx.clone(); @@ -392,22 +450,38 @@ async fn write_with_fresh_msps_inner( } }; trace_input!("got frame len {}", frame.len()); - let deque = iqdqs.deque(rt.clone()); - frame_write(&frame, scalar_type.clone(), shape.clone(), &mut writer, deque)?; + frame_write( + &frame, + &channel, + rt.clone(), + scalar_type.clone(), + shape.clone(), + &mut writer, + &mut binwriter, + &mut iqdqs, + )?; trace_queues!("frame send_all begin {} {}", iqdqs.summary(), iqtx.summary()); iqtx.send_all(&mut iqdqs).await?; trace_queues!("frame send_all done {} {}", iqdqs.summary(), iqtx.summary()); tick_writers(&mut writer, &mut iqdqs, rt.clone())?; trace_queues!("frame tick_writers done {} {}", iqdqs.summary(), iqtx.summary()); + if let Some(binwriter) = binwriter.as_mut() { + binwriter.tick(&mut iqdqs).unwrap(); + } } - trace_queues!("after send_all begin {} {}", iqdqs.summary(), iqtx.summary()); iqtx.send_all(&mut iqdqs).await?; trace_queues!("after send_all done {} {}", iqdqs.summary(), iqtx.summary()); finish_writers(&mut writer, &mut iqdqs, rt.clone())?; trace_queues!("after finish_writers done {} {}", iqdqs.summary(), iqtx.summary()); - - let ret = Json(serde_json::json!({})); + if let Some(binwriter) = binwriter.as_mut() { + binwriter.tick(&mut iqdqs).unwrap(); + } + let ret = Json(serde_json::json!({ + "status": "ok", + "chinfo": chinfo, + "series_id": chinfo.series.to_series().id(), + })); Ok(ret) } @@ -546,6 +620,7 @@ async fn write_events_exact_2( debug_setup!("write_events_exact {:?} {:?}", conf, rt); let series = SeriesId::new(conf.series); let mut writer = SeriesWriter::new(series)?; + let mut binwriter = None; debug_setup!("series writer established"); let mut iqdqs = InsertDeques::new(); let mut iqtx = rres.iqtx.clone(); @@ -559,7 +634,16 @@ async fn write_events_exact_2( Some(frame) => { trace_input!("got frame len {}", frame.len()); let deque = iqdqs.deque(rt.clone()); - frame_write(&frame, conf.scalar_type.clone(), conf.shape.clone(), &mut writer, deque)?; + frame_write( + &frame, + &conf.name, + rt.clone(), + conf.scalar_type.clone(), + conf.shape.clone(), + &mut writer, + &mut binwriter, + &mut iqdqs, + )?; trace_queues!("frame send_all begin {} {}", iqdqs.summary(), iqtx.summary()); iqtx.send_all(&mut iqdqs).await?; trace_queues!("frame send_all done {} {}", iqdqs.summary(), iqtx.summary()); diff --git a/serieswriter/src/rtwriter.rs b/serieswriter/src/rtwriter.rs index b8ad59b..e5cacad 100644 --- a/serieswriter/src/rtwriter.rs +++ b/serieswriter/src/rtwriter.rs @@ -48,6 +48,14 @@ impl MinQuiets { lt: Duration::from_millis(1000 * 60), } } + + pub fn http_ingest_default() -> Self { + Self { + st: Duration::from_millis(0), + mt: Duration::from_millis(0), + lt: Duration::from_millis(1000 * 60 * 60), + } + } } #[derive(Debug, Serialize)]