From 1e0505a3f95c1aafe693e2340dec6cef5b2b30d1 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Thu, 20 Jun 2024 15:46:12 +0200 Subject: [PATCH] Add remaining types, add docs --- netfetch/src/ca/conn.rs | 4 +- netfetch/src/metrics.rs | 9 +- netfetch/src/metrics/ingest.rs | 282 +++++++++++++++++++++++++++++---- postingest.md | 46 ++++++ readme.md | 43 +++-- scywr/src/insertqueues.rs | 143 ++++++++++++----- scywr/src/insertworker.rs | 104 ++---------- scywr/src/iteminsertqueue.rs | 242 ++++++++++++---------------- scywr/src/store.rs | 25 ++- 9 files changed, 579 insertions(+), 319 deletions(-) create mode 100644 postingest.md diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index 62479e4..72608f2 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -2595,8 +2595,8 @@ impl CaConn { } fn log_queues_summary(&self) { - self.iqdqs.log_summary(); - self.iqsp.log_summary(); + trace!("{}", self.iqdqs.summary()); + trace!("{}", self.iqsp.summary()); } } diff --git a/netfetch/src/metrics.rs b/netfetch/src/metrics.rs index 0b04506..283df48 100644 --- a/netfetch/src/metrics.rs +++ b/netfetch/src/metrics.rs @@ -16,6 +16,7 @@ use async_channel::Sender; use async_channel::WeakSender; use axum::extract::Query; use axum::http; +use axum::http::HeaderMap; use axum::response::IntoResponse; use axum::response::Response; use bytes::Bytes; @@ -388,9 +389,11 @@ fn make_routes( "/v1", post({ let rres = rres.clone(); - move |(params, body): (Query>, axum::body::Body)| { - ingest::post_v01((params, body), rres) - } + move |(headers, params, body): ( + HeaderMap, + Query>, + axum::body::Body, + )| { ingest::post_v01((headers, params, body), rres) } }), ), ), diff --git a/netfetch/src/metrics/ingest.rs b/netfetch/src/metrics/ingest.rs index b0dad3f..32aa906 100644 --- a/netfetch/src/metrics/ingest.rs +++ b/netfetch/src/metrics/ingest.rs @@ -1,105 +1,317 @@ use super::RoutesResources; use axum::extract::FromRequest; use axum::extract::Query; +use axum::http::HeaderMap; use axum::Json; +use bytes::Bytes; +use core::fmt; use err::thiserror; use err::ThisError; use futures_util::StreamExt; use futures_util::TryStreamExt; use items_2::eventsdim0::EventsDim0; +use items_2::eventsdim0::EventsDim0NoPulse; +use items_2::eventsdim1::EventsDim1; +use items_2::eventsdim1::EventsDim1NoPulse; use netpod::log::*; use netpod::ScalarType; use netpod::Shape; use netpod::TsNano; +use netpod::APP_CBOR_FRAMED; use scywr::insertqueues::InsertDeques; +use scywr::iteminsertqueue::ArrayValue; use scywr::iteminsertqueue::DataValue; use scywr::iteminsertqueue::QueryItem; use scywr::iteminsertqueue::ScalarValue; +use serde::Deserialize; use serieswriter::writer::SeriesWriter; use std::collections::HashMap; use std::collections::VecDeque; use std::io::Cursor; use std::sync::Arc; +use std::time::Duration; use std::time::SystemTime; use streams::framed_bytes::FramedBytesStream; +use taskrun::tokio::time::timeout; // use core::io::BorrowedBuf; +#[allow(unused)] +macro_rules! debug_setup { + ($($arg:tt)*) => { + if true { + info!($($arg)*); + } + }; +} + +#[allow(unused)] +macro_rules! trace_input { + ($($arg:tt)*) => { + if false { + trace!($($arg)*); + } + }; +} + +#[allow(unused)] +macro_rules! trace_queues { + ($($arg:tt)*) => { + if false { + trace!($($arg)*); + } + }; +} + #[derive(Debug, ThisError)] pub enum Error { + UnsupportedContentType, Logic, SeriesWriter(#[from] serieswriter::writer::Error), MissingChannelName, + MissingScalarType, + MissingShape, SendError, Decode, FramedBytes(#[from] streams::framed_bytes::Error), + InsertQueues(#[from] scywr::insertqueues::Error), + Serde(#[from] serde_json::Error), + #[error("Parse({0})")] + Parse(String), + NotSupported, } -struct BodyRead {} - pub async fn post_v01( - (Query(params), body): (Query>, axum::body::Body), + (headers, Query(params), body): (HeaderMap, Query>, axum::body::Body), rres: Arc, ) -> Json { - match post_v01_try(params, body, rres).await { + match post_v01_try(headers, params, body, rres).await { Ok(k) => k, - Err(e) => Json(serde_json::Value::String(e.to_string())), + Err(e) => Json(serde_json::json!({ + "error": e.to_string(), + })), } } async fn post_v01_try( + headers: HeaderMap, params: HashMap, body: axum::body::Body, rres: Arc, ) -> Result, Error> { - info!("params {:?}", params); + if let Some(ct) = headers.get("content-type") { + if let Ok(s) = ct.to_str() { + if s == APP_CBOR_FRAMED { + } else { + return Err(Error::UnsupportedContentType); + } + } else { + return Err(Error::UnsupportedContentType); + } + } else { + return Err(Error::UnsupportedContentType); + }; + debug_setup!("params {:?}", params); let stnow = SystemTime::now(); let worker_tx = rres.worker_tx.clone(); let backend = rres.backend.clone(); let channel = params.get("channelName").ok_or(Error::MissingChannelName)?.into(); - let scalar_type = ScalarType::I16; - let shape = Shape::Scalar; - info!("establishing..."); - let mut writer = SeriesWriter::establish(worker_tx, backend, channel, scalar_type, shape, stnow).await?; - + let s = params.get("scalarType").ok_or(Error::MissingScalarType)?; + let scalar_type = ScalarType::from_variant_str(&s).map_err(|e| Error::Parse(e.to_string()))?; + let shape: Shape = serde_json::from_str(params.get("shape").map_or("[]", |x| x.as_str()))?; + debug_setup!("parsed scalar_type {scalar_type:?}"); + debug_setup!("parsed shape {shape:?}"); + debug_setup!( + "establishing series writer for {:?} {:?} {:?}", + channel, + scalar_type, + shape + ); + let mut writer = + SeriesWriter::establish(worker_tx, backend, channel, scalar_type.clone(), shape.clone(), stnow).await?; + debug_setup!("series writer established"); let mut iqdqs = InsertDeques::new(); let mut iqtx = rres.iqtx.clone(); - // iqtx.send_all(&mut iqdqs).await.map_err(|_| Error::SendError)?; - // let deque = &mut iqdqs.st_rf3_rx; - let mut frames = FramedBytesStream::new(body.into_data_stream().map_err(|_| streams::framed_bytes::Error::Logic)); - while let Some(frame) = frames.try_next().await? { - info!("got frame len {}", frame.len()); - let evs: EventsDim0 = ciborium::de::from_reader(Cursor::new(frame)).map_err(|_| Error::Decode)?; - info!("see events {:?}", evs); + loop { + let x = timeout(Duration::from_millis(2000), frames.try_next()).await; + let x = match x { + Ok(x) => x, + Err(_) => { + tick_writers(&mut writer, &mut iqdqs)?; + continue; + } + }; + let frame = match x? { + Some(x) => x, + None => { + trace!("input stream done"); + break; + } + }; + trace_input!("got frame len {}", frame.len()); let deque = &mut iqdqs.st_rf3_rx; - for (i, (&ts, &val)) in evs.tss.iter().zip(evs.values.iter()).enumerate() { - info!("ev {:6} {:20} {:20}", i, ts, val); - let val = DataValue::Scalar(ScalarValue::I16(val)); - writer.write(TsNano::from_ns(ts), TsNano::from_ns(ts), val, deque)?; + match &shape { + Shape::Scalar => match &scalar_type { + ScalarType::U8 => { + evpush_dim0::(&frame, deque, &mut writer, |x| { + DataValue::Scalar(ScalarValue::U8(x as _)) + })?; + } + ScalarType::U16 => { + evpush_dim0::(&frame, deque, &mut writer, |x| { + DataValue::Scalar(ScalarValue::U16(x as _)) + })?; + } + ScalarType::U32 => { + evpush_dim0::(&frame, deque, &mut writer, |x| { + DataValue::Scalar(ScalarValue::U32(x as _)) + })?; + } + ScalarType::U64 => { + evpush_dim0::(&frame, deque, &mut writer, |x| { + DataValue::Scalar(ScalarValue::U64(x as _)) + })?; + } + ScalarType::I8 => { + evpush_dim0::(&frame, deque, &mut writer, |x| DataValue::Scalar(ScalarValue::I8(x)))?; + } + ScalarType::I16 => { + evpush_dim0::(&frame, deque, &mut writer, |x| DataValue::Scalar(ScalarValue::I16(x)))?; + } + ScalarType::I32 => { + evpush_dim0::(&frame, deque, &mut writer, |x| DataValue::Scalar(ScalarValue::I32(x)))?; + } + ScalarType::I64 => { + evpush_dim0::(&frame, deque, &mut writer, |x| DataValue::Scalar(ScalarValue::I64(x)))?; + } + ScalarType::F32 => { + evpush_dim0::(&frame, deque, &mut writer, |x| DataValue::Scalar(ScalarValue::F32(x)))?; + } + ScalarType::F64 => { + evpush_dim0::(&frame, deque, &mut writer, |x| DataValue::Scalar(ScalarValue::F64(x)))?; + } + ScalarType::BOOL => return Err(Error::NotSupported), + ScalarType::STRING => { + evpush_dim0::(&frame, deque, &mut writer, |x| { + DataValue::Scalar(ScalarValue::String(x)) + })?; + } + ScalarType::Enum => return Err(Error::NotSupported), + ScalarType::ChannelStatus => return Err(Error::NotSupported), + }, + Shape::Wave(_) => match &scalar_type { + ScalarType::U8 => { + evpush_dim1::(&frame, deque, &mut writer, |x| DataValue::Array(ArrayValue::U8(x)))?; + } + ScalarType::U16 => { + evpush_dim1::(&frame, deque, &mut writer, |x| DataValue::Array(ArrayValue::U16(x)))?; + } + ScalarType::U32 => { + evpush_dim1::(&frame, deque, &mut writer, |x| DataValue::Array(ArrayValue::U32(x)))?; + } + ScalarType::U64 => { + evpush_dim1::(&frame, deque, &mut writer, |x| DataValue::Array(ArrayValue::U64(x)))?; + } + ScalarType::I8 => { + evpush_dim1::(&frame, deque, &mut writer, |x| DataValue::Array(ArrayValue::I8(x)))?; + } + ScalarType::I16 => { + evpush_dim1::(&frame, deque, &mut writer, |x| DataValue::Array(ArrayValue::I16(x)))?; + } + ScalarType::I32 => { + evpush_dim1::(&frame, deque, &mut writer, |x| DataValue::Array(ArrayValue::I32(x)))?; + } + ScalarType::I64 => { + evpush_dim1::(&frame, deque, &mut writer, |x| DataValue::Array(ArrayValue::I64(x)))?; + } + ScalarType::F32 => { + evpush_dim1::(&frame, deque, &mut writer, |x| DataValue::Array(ArrayValue::F32(x)))?; + } + ScalarType::F64 => { + evpush_dim1::(&frame, deque, &mut writer, |x| DataValue::Array(ArrayValue::F64(x)))?; + } + ScalarType::BOOL => return Err(Error::NotSupported), + ScalarType::STRING => return Err(Error::NotSupported), + ScalarType::Enum => return Err(Error::NotSupported), + ScalarType::ChannelStatus => return Err(Error::NotSupported), + }, + Shape::Image(_, _) => return Err(Error::NotSupported), } - iqtx.send_all(&mut iqdqs).await.map_err(|_| Error::SendError)?; + trace_queues!("frame send_all begin {} {}", iqdqs.summary(), iqtx.summary()); + iqtx.send_all(&mut iqdqs).await?; + trace_queues!("frame send_all done {} {}", iqdqs.summary(), iqtx.summary()); + tick_writers(&mut writer, &mut iqdqs)?; + trace_queues!("frame tick_writers done {} {}", iqdqs.summary(), iqtx.summary()); } - let deque = &mut iqdqs.st_rf3_rx; - finish_writers(vec![&mut writer], deque)?; - iqtx.send_all(&mut iqdqs).await.map_err(|_| Error::SendError)?; + trace_queues!("after send_all begin {} {}", iqdqs.summary(), iqtx.summary()); + iqtx.send_all(&mut iqdqs).await?; + trace_queues!("after send_all done {} {}", iqdqs.summary(), iqtx.summary()); + finish_writers(&mut writer, &mut iqdqs)?; + trace_queues!("after finish_writers done {} {}", iqdqs.summary(), iqtx.summary()); - let ret = Json(serde_json::json!({ - "result": true, - })); + let ret = Json(serde_json::json!({})); Ok(ret) } -fn tick_writers(sws: Vec<&mut SeriesWriter>, deque: &mut VecDeque) -> Result<(), Error> { - for sw in sws { - sw.tick(deque)?; +fn evpush_dim0( + frame: &Bytes, + deque: &mut VecDeque, + writer: &mut SeriesWriter, + f1: F1, +) -> Result<(), Error> +where + T: for<'a> Deserialize<'a> + fmt::Debug + Clone, + F1: Fn(T) -> DataValue, +{ + let evs: EventsDim0NoPulse = ciborium::de::from_reader(Cursor::new(frame)) + .map_err(|e| { + error!("cbor decode error {e}"); + }) + .map_err(|_| Error::Decode)?; + let evs: EventsDim0 = evs.into(); + trace_input!("see events {:?}", evs); + for (i, (&ts, val)) in evs.tss.iter().zip(evs.values.iter()).enumerate() { + let val = val.clone(); + trace_input!("ev {:6} {:20} {:20?}", i, ts, val); + let val = f1(val); + writer.write(TsNano::from_ns(ts), TsNano::from_ns(ts), val, deque)?; } Ok(()) } -fn finish_writers(sws: Vec<&mut SeriesWriter>, deque: &mut VecDeque) -> Result<(), Error> { - for sw in sws { - sw.tick(deque)?; +fn evpush_dim1( + frame: &Bytes, + deque: &mut VecDeque, + writer: &mut SeriesWriter, + f1: F1, +) -> Result<(), Error> +where + T: for<'a> Deserialize<'a> + fmt::Debug + Clone, + F1: Fn(Vec) -> DataValue, +{ + let evs: EventsDim1NoPulse = ciborium::de::from_reader(Cursor::new(frame)) + .map_err(|e| { + error!("cbor decode error {e}"); + }) + .map_err(|_| Error::Decode)?; + let evs: EventsDim1 = evs.into(); + trace_input!("see events {:?}", evs); + for (i, (&ts, val)) in evs.tss.iter().zip(evs.values.iter()).enumerate() { + let val = val.clone(); + trace_input!("ev {:6} {:20} {:20?}", i, ts, val); + let val = f1(val); + writer.write(TsNano::from_ns(ts), TsNano::from_ns(ts), val, deque)?; } Ok(()) } + +fn tick_writers(writer: &mut SeriesWriter, deque: &mut InsertDeques) -> Result<(), Error> { + writer.tick(&mut deque.st_rf3_rx)?; + Ok(()) +} + +fn finish_writers(writer: &mut SeriesWriter, deque: &mut InsertDeques) -> Result<(), Error> { + writer.tick(&mut deque.st_rf3_rx)?; + Ok(()) +} diff --git a/postingest.md b/postingest.md new file mode 100644 index 0000000..47de745 --- /dev/null +++ b/postingest.md @@ -0,0 +1,46 @@ +# HTTP POST Ingest + +Example: + +``` +Method: POST +Url: http://sf-ingest-mg-01.psi.ch:9009/daqingest/ingest/v1?channelName=MY:DEVICE:POS&shape=[]&scalarType=f32 +Headers: Content-Type: application/cbor-framed +``` + +The body must be a stream of length delimited frames, where the payload of each frame is +a CBOR object. + +The http body of the response then looks like this: +```txt +[CBOR-frame] +[CBOR-frame] +[CBOR-frame] +... etc +``` + +where each `[CBOR-frame]` looks like: +```txt +[length N of the following CBOR object: uint32 little-endian] +[reserved: 12 bytes of zero-padding] +[CBOR object: N bytes] +[padding: P zero-bytes, 0 <= P <= 7, such that (N + P) mod 8 = 0] +``` + +Each CBOR object must contain the timestamps (integer nanoseconds) and the values (depends on type), e.g: +```json +{ + "tss": [1712100002000000000, 1712100003000000000, 1712100004000000000], + "values": [5.6, 7.8, 8.1] +} +``` + +## Shape of data + +The `shape` URL parameter indicates whether the data is scalar or 1-dimensional, +for example `shape=[]` indicates a scalar and `shape=[4096]` indicates an array +with 4096 elements. + +The shape nowadays only distinguishes between scalar and 1-dimensional, but the actual length of +the array dimension may vary from event to event and is therefore not meaningful. +Still, it doesn't hurt to pass the "typical" size of array data as parameter. diff --git a/readme.md b/readme.md index 6dd4732..d64a9c9 100644 --- a/readme.md +++ b/readme.md @@ -21,11 +21,10 @@ to the most basic linux system libraries. ```yml # Address to bind the HTTP API to, for runtime control and Prometheus metrics scrape: -api_bind: "0.0.0.0:3011" -# The hostname to send to channel access peers as our own hostname: -local_epics_hostname: sf-daqsync-02.psi.ch +api_bind: 0.0.0.0:3011 # The backend name to use for the channels handled by this daqingest instance: backend: scylla +channels: directory-name-with-channel-config-files # Addresses to use for channel access search: search: - "172.26.0.255" @@ -35,19 +34,30 @@ search: postgresql: host: postgresql-host port: 5432 - user: database-username + user: the-username pass: the-password - name: the-database-name -scylla: + name: the-database +scylla_st: + keyspace: backend_st hosts: - - "sf-nube-11:19042" - - "sf-nube-12:19042" - - "sf-nube-13:19042" - - "sf-nube-14:19042" - keyspace: ks1 -channels: - - "SOME-CHANNEL:1" - - "OTHER-CHANNEL:2" + - sf-nube-11:19042 + - sf-nube-12:19042 + - sf-nube-13:19042 + - sf-nube-14:19042 +scylla_mt: + keyspace: backend_mt + hosts: + - sf-nube-11:19042 + - sf-nube-12:19042 + - sf-nube-13:19042 + - sf-nube-14:19042 +scylla_lt: + keyspace: backend_lt + hosts: + - sf-nube-11:19042 + - sf-nube-12:19042 + - sf-nube-13:19042 + - sf-nube-14:19042 ``` @@ -61,3 +71,8 @@ as configured by the `api_bind` parameter. ```txt http:///daqingest/channel/state?name=[...] ``` + + +# HTTP POST ingest + +It is possible to [ingest](postingest.md) data via the `api_bind` socket address. diff --git a/scywr/src/insertqueues.rs b/scywr/src/insertqueues.rs index e42803e..e6e3c9c 100644 --- a/scywr/src/insertqueues.rs +++ b/scywr/src/insertqueues.rs @@ -2,9 +2,11 @@ use crate::iteminsertqueue::QueryItem; use crate::senderpolling::SenderPolling; use async_channel::Receiver; use async_channel::Sender; +use core::fmt; use err::thiserror; use err::ThisError; use netpod::log::*; +use netpod::ttl::RetentionTime; use pin_project::pin_project; use std::collections::VecDeque; use std::pin::Pin; @@ -12,6 +14,8 @@ use std::pin::Pin; #[derive(Debug, ThisError)] pub enum Error { QueuePush, + #[error("ChannelSend({0}, {1})")] + ChannelSend(RetentionTime, u8), } #[derive(Clone)] @@ -24,22 +28,72 @@ pub struct InsertQueuesTx { impl InsertQueuesTx { /// Send all accumulated batches - pub async fn send_all(&mut self, iqdqs: &mut InsertDeques) -> Result<(), ()> { + pub async fn send_all(&mut self, iqdqs: &mut InsertDeques) -> Result<(), Error> { // Send each buffer down the corresponding channel - let item = core::mem::replace(&mut iqdqs.st_rf1_rx, VecDeque::new()); - self.st_rf1_tx.send(item).await.map_err(|_| ())?; - let item = core::mem::replace(&mut iqdqs.st_rf3_rx, VecDeque::new()); - self.st_rf3_tx.send(item).await.map_err(|_| ())?; - let item = core::mem::replace(&mut iqdqs.mt_rf3_rx, VecDeque::new()); - self.mt_rf3_tx.send(item).await.map_err(|_| ())?; - let item = core::mem::replace(&mut iqdqs.lt_rf3_rx, VecDeque::new()); - self.lt_rf3_tx.send(item).await.map_err(|_| ())?; + if false { + let item = core::mem::replace(&mut iqdqs.st_rf1_rx, VecDeque::new()); + self.st_rf1_tx + .send(item) + .await + .map_err(|_| Error::ChannelSend(RetentionTime::Short, 1))?; + } + { + let item = core::mem::replace(&mut iqdqs.st_rf3_rx, VecDeque::new()); + self.st_rf3_tx + .send(item) + .await + .map_err(|_| Error::ChannelSend(RetentionTime::Short, 3))?; + } + { + let item = core::mem::replace(&mut iqdqs.mt_rf3_rx, VecDeque::new()); + self.mt_rf3_tx + .send(item) + .await + .map_err(|_| Error::ChannelSend(RetentionTime::Medium, 3))?; + } + { + let item = core::mem::replace(&mut iqdqs.lt_rf3_rx, VecDeque::new()); + self.lt_rf3_tx + .send(item) + .await + .map_err(|_| Error::ChannelSend(RetentionTime::Long, 3))?; + } Ok(()) } pub fn clone2(&self) -> Self { self.clone() } + + pub fn summary(&self) -> InsertQueuesTxSummary { + InsertQueuesTxSummary { obj: self } + } +} + +pub struct InsertQueuesTxSummary<'a> { + obj: &'a InsertQueuesTx, +} + +impl<'a> fmt::Display for InsertQueuesTxSummary<'a> { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + let obj = self.obj; + write!( + fmt, + "InsertQueuesTx {{ st_rf1_tx: {} {} {}, st_rf3_tx: {} {} {}, mt_rf3_tx: {} {} {}, lt_rf3_tx: {} {} {} }}", + obj.st_rf1_tx.is_closed(), + obj.st_rf1_tx.is_full(), + obj.st_rf1_tx.len(), + obj.st_rf3_tx.is_closed(), + obj.st_rf3_tx.is_full(), + obj.st_rf3_tx.len(), + obj.mt_rf3_tx.is_closed(), + obj.mt_rf3_tx.is_full(), + obj.mt_rf3_tx.len(), + obj.lt_rf3_tx.is_closed(), + obj.lt_rf3_tx.is_full(), + obj.lt_rf3_tx.len(), + ) + } } #[derive(Clone)] @@ -72,7 +126,6 @@ impl InsertDeques { self.st_rf1_rx.len() + self.st_rf3_rx.len() + self.mt_rf3_rx.len() + self.lt_rf3_rx.len() } - /// pub fn clear(&mut self) { self.st_rf1_rx.clear(); self.st_rf3_rx.clear(); @@ -80,14 +133,8 @@ impl InsertDeques { self.lt_rf3_rx.clear(); } - pub fn log_summary(&self) { - let summ = InsertDequesSummary { - st_rf1_len: self.st_rf1_rx.len(), - st_rf3_len: self.st_rf3_rx.len(), - mt_rf3_len: self.mt_rf3_rx.len(), - lt_rf3_len: self.lt_rf3_rx.len(), - }; - info!("{summ:?}"); + pub fn summary(&self) -> InsertDequesSummary { + InsertDequesSummary { obj: self } } // Should be used only for connection and channel status items. @@ -98,13 +145,22 @@ impl InsertDeques { } } -#[derive(Debug)] -#[allow(unused)] -struct InsertDequesSummary { - st_rf1_len: usize, - st_rf3_len: usize, - mt_rf3_len: usize, - lt_rf3_len: usize, +pub struct InsertDequesSummary<'a> { + obj: &'a InsertDeques, +} + +impl<'a> fmt::Display for InsertDequesSummary<'a> { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + let obj = self.obj; + write!( + fmt, + "InsertDeques {{ st_rf1_len: {}, st_rf3_len: {}, mt_rf3_len: {}, lt_rf3_len: {} }}", + obj.st_rf1_rx.len(), + obj.st_rf3_rx.len(), + obj.mt_rf3_rx.len(), + obj.lt_rf3_rx.len() + ) + } } #[pin_project] @@ -156,22 +212,29 @@ impl InsertSenderPolling { unsafe { self.map_unchecked_mut(|x| &mut x.st_rf1_sp) } } - pub fn log_summary(&self) { - let summ = InsertSenderPollingSummary { - st_rf1_idle: self.st_rf1_sp.is_idle(), - st_rf3_idle: self.st_rf3_sp.is_idle(), - mt_rf3_idle: self.mt_rf3_sp.is_idle(), - lt_rf3_idle: self.lt_rf3_sp.is_idle(), - }; - info!("{summ:?}"); + pub fn summary(&self) -> InsertSenderPollingSummary { + InsertSenderPollingSummary { obj: self } } } -#[derive(Debug)] -#[allow(unused)] -struct InsertSenderPollingSummary { - st_rf1_idle: bool, - st_rf3_idle: bool, - mt_rf3_idle: bool, - lt_rf3_idle: bool, +pub struct InsertSenderPollingSummary<'a> { + obj: &'a InsertSenderPolling, +} + +impl<'a> fmt::Display for InsertSenderPollingSummary<'a> { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + let obj = self.obj; + write!( + fmt, + "InsertSenderPolling {{ st_rf1_idle_len: {:?} {:?}, st_rf3_idle_len: {:?} {:?}, mt_rf3_idle_len: {:?} {:?}, lt_rf3_idle_len: {:?} {:?} }}", + obj.st_rf1_sp.is_idle(), + obj.st_rf1_sp.len(), + obj.st_rf3_sp.is_idle(), + obj.st_rf3_sp.len(), + obj.mt_rf3_sp.is_idle(), + obj.mt_rf3_sp.len(), + obj.lt_rf3_sp.is_idle(), + obj.lt_rf3_sp.len(), + ) + } } diff --git a/scywr/src/insertworker.rs b/scywr/src/insertworker.rs index 5620a1f..f895b5c 100644 --- a/scywr/src/insertworker.rs +++ b/scywr/src/insertworker.rs @@ -3,7 +3,6 @@ use crate::iteminsertqueue::insert_channel_status; use crate::iteminsertqueue::insert_channel_status_fut; use crate::iteminsertqueue::insert_connection_status; use crate::iteminsertqueue::insert_connection_status_fut; -use crate::iteminsertqueue::insert_item; use crate::iteminsertqueue::insert_item_fut; use crate::iteminsertqueue::insert_msp_fut; use crate::iteminsertqueue::Accounting; @@ -35,7 +34,7 @@ use tokio::task::JoinHandle; #[allow(unused)] macro_rules! trace2 { ($($arg:tt)*) => { - if true { + if false { trace!($($arg)*); } }; @@ -44,7 +43,7 @@ macro_rules! trace2 { #[allow(unused)] macro_rules! trace3 { ($($arg:tt)*) => { - if true { + if false { trace!($($arg)*); } }; @@ -53,7 +52,16 @@ macro_rules! trace3 { #[allow(unused)] macro_rules! trace_item_execute { ($($arg:tt)*) => { - if true { + if false { + trace!($($arg)*); + } + }; +} + +#[allow(unused)] +macro_rules! debug_setup { + ($($arg:tt)*) => { + if false { debug!($($arg)*); } }; @@ -181,86 +189,6 @@ pub async fn spawn_scylla_insert_workers_dummy( Ok(jhs) } -#[allow(unused)] -async fn worker_unused( - worker_ix: usize, - item_inp: Receiver, - insert_worker_opts: Arc, - data_store: Arc, - stats: Arc, -) -> Result<(), Error> { - stats.worker_start().inc(); - insert_worker_opts - .insert_workers_running - .fetch_add(1, atomic::Ordering::AcqRel); - let backoff_0 = Duration::from_millis(10); - let mut backoff = backoff_0.clone(); - let mut i1 = 0; - loop { - let item = if let Ok(item) = item_inp.recv().await { - stats.item_recv.inc(); - item - } else { - break; - }; - match item { - QueryItem::ConnectionStatus(item) => match insert_connection_status(item, &data_store).await { - Ok(_) => { - stats.inserted_connection_status().inc(); - backoff = backoff_0; - } - Err(e) => { - stats_inc_for_err(&stats, &e); - back_off_sleep(&mut backoff).await; - } - }, - QueryItem::ChannelStatus(item) => match insert_channel_status(item, &data_store).await { - Ok(_) => { - stats.inserted_channel_status().inc(); - backoff = backoff_0; - } - Err(e) => { - stats_inc_for_err(&stats, &e); - back_off_sleep(&mut backoff).await; - } - }, - QueryItem::Insert(item) => { - let tsnow = TsMs::from_system_time(SystemTime::now()); - let item_ts_net = item.ts_net.clone(); - let dt = tsnow.to_u64().saturating_sub(item_ts_net.to_u64()) as u32; - stats.item_lat_net_worker().ingest(dt); - let insert_frac = insert_worker_opts.insert_frac.load(Ordering::Acquire); - let do_insert = i1 % 1000 < insert_frac; - match insert_item(item, &data_store, do_insert, &stats).await { - Ok(_) => { - stats.inserted_values().inc(); - let tsnow = TsMs::from_system_time(SystemTime::now()); - let dt = tsnow.to_u64().saturating_sub(item_ts_net.to_u64()) as u32; - stats.item_lat_net_store().ingest(dt); - backoff = backoff_0; - } - Err(e) => { - stats_inc_for_err(&stats, &e); - back_off_sleep(&mut backoff).await; - } - } - i1 += 1; - } - QueryItem::TimeBinSimpleF32(item) => { - info!("have time bin patch to insert: {item:?}"); - return Err(Error::with_msg_no_trace("TODO insert item old path")); - } - QueryItem::Accounting(..) => {} - } - } - stats.worker_finish().inc(); - insert_worker_opts - .insert_workers_running - .fetch_sub(1, atomic::Ordering::AcqRel); - trace2!("insert worker {worker_ix} done"); - Ok(()) -} - async fn worker_streamed( worker_ix: usize, concurrency: usize, @@ -269,7 +197,7 @@ async fn worker_streamed( data_store: Option>, stats: Arc, ) -> Result<(), Error> { - trace!("worker_streamed begin"); + debug_setup!("worker_streamed begin"); stats.worker_start().inc(); insert_worker_opts .insert_workers_running @@ -290,7 +218,9 @@ async fn worker_streamed( // }) .buffer_unordered(concurrency); let mut stream = Box::pin(stream); + debug_setup!("waiting for item"); while let Some(item) = stream.next().await { + trace_item_execute!("see item"); match item { Ok(_) => { stats.inserted_values().inc(); @@ -321,7 +251,7 @@ async fn worker_streamed( insert_worker_opts .insert_workers_running .fetch_sub(1, atomic::Ordering::AcqRel); - trace2!("insert worker {worker_ix} done"); + debug_setup!("insert worker {worker_ix} done"); Ok(()) } @@ -386,7 +316,7 @@ fn inspect_items( trace_item_execute!("execute {worker_name} TimeBinSimpleF32"); } QueryItem::Accounting(x) => { - if x.series.id() & 0x7f == 77 { + if x.series.id() & 0x7f == 200 { debug!("execute {worker_name} Accounting {item:?}"); } else { trace_item_execute!("execute {worker_name} Accounting {item:?}"); diff --git a/scywr/src/iteminsertqueue.rs b/scywr/src/iteminsertqueue.rs index 7423b4f..c632b2c 100644 --- a/scywr/src/iteminsertqueue.rs +++ b/scywr/src/iteminsertqueue.rs @@ -48,6 +48,10 @@ pub enum Error { #[derive(Clone, Debug, PartialEq)] pub enum ScalarValue { + U8(u8), + U16(u16), + U32(u32), + U64(u64), I8(i8), I16(i16), I32(i32), @@ -62,6 +66,10 @@ pub enum ScalarValue { impl ScalarValue { pub fn byte_size(&self) -> u32 { match self { + ScalarValue::U8(_) => 1, + ScalarValue::U16(_) => 1, + ScalarValue::U32(_) => 1, + ScalarValue::U64(_) => 1, ScalarValue::I8(_) => 1, ScalarValue::I16(_) => 2, ScalarValue::I32(_) => 4, @@ -76,6 +84,10 @@ impl ScalarValue { pub fn string_short(&self) -> String { match self { + ScalarValue::U8(x) => x.to_string(), + ScalarValue::U16(x) => x.to_string(), + ScalarValue::U32(x) => x.to_string(), + ScalarValue::U64(x) => x.to_string(), ScalarValue::I8(x) => x.to_string(), ScalarValue::I16(x) => x.to_string(), ScalarValue::I32(x) => x.to_string(), @@ -91,9 +103,14 @@ impl ScalarValue { #[derive(Clone, Debug, PartialEq)] pub enum ArrayValue { + U8(Vec), + U16(Vec), + U32(Vec), + U64(Vec), I8(Vec), I16(Vec), I32(Vec), + I64(Vec), F32(Vec), F64(Vec), Bool(Vec), @@ -103,9 +120,14 @@ impl ArrayValue { pub fn len(&self) -> usize { use ArrayValue::*; match self { + U8(a) => a.len(), + U16(a) => a.len(), + U32(a) => a.len(), + U64(a) => a.len(), I8(a) => a.len(), I16(a) => a.len(), I32(a) => a.len(), + I64(a) => a.len(), F32(a) => a.len(), F64(a) => a.len(), Bool(a) => a.len(), @@ -115,9 +137,14 @@ impl ArrayValue { pub fn byte_size(&self) -> u32 { use ArrayValue::*; match self { + U8(a) => 1 * a.len() as u32, + U16(a) => 2 * a.len() as u32, + U32(a) => 4 * a.len() as u32, + U64(a) => 8 * a.len() as u32, I8(a) => 1 * a.len() as u32, I16(a) => 2 * a.len() as u32, I32(a) => 4 * a.len() as u32, + I64(a) => 8 * a.len() as u32, F32(a) => 4 * a.len() as u32, F64(a) => 8 * a.len() as u32, Bool(a) => 1 * a.len() as u32, @@ -127,6 +154,50 @@ impl ArrayValue { pub fn to_binary_blob(&self) -> Vec { use ArrayValue::*; match self { + U8(a) => { + let n = self.byte_size(); + let mut blob = Vec::with_capacity(32 + n as usize); + for _ in 0..4 { + blob.put_u64_le(0); + } + for &x in a { + blob.put_u8(x); + } + blob + } + U16(a) => { + let n = self.byte_size(); + let mut blob = Vec::with_capacity(32 + n as usize); + for _ in 0..4 { + blob.put_u64_le(0); + } + for &x in a { + blob.put_u16_le(x); + } + blob + } + U32(a) => { + let n = self.byte_size(); + let mut blob = Vec::with_capacity(32 + n as usize); + for _ in 0..4 { + blob.put_u64_le(0); + } + for &x in a { + blob.put_u32_le(x); + } + blob + } + U64(a) => { + let n = self.byte_size(); + let mut blob = Vec::with_capacity(32 + n as usize); + for _ in 0..4 { + blob.put_u64_le(0); + } + for &x in a { + blob.put_u64_le(x); + } + blob + } I8(a) => { let n = self.byte_size(); let mut blob = Vec::with_capacity(32 + n as usize); @@ -160,6 +231,17 @@ impl ArrayValue { } blob } + I64(a) => { + let n = self.byte_size(); + let mut blob = Vec::with_capacity(32 + n as usize); + for _ in 0..4 { + blob.put_u64_le(0); + } + for &x in a { + blob.put_i64_le(x); + } + blob + } F32(a) => { let n = self.byte_size(); let mut blob = Vec::with_capacity(32 + n as usize); @@ -200,9 +282,14 @@ impl ArrayValue { pub fn string_short(&self) -> String { use ArrayValue::*; match self { + U8(x) => format!("{}", x.get(0).map_or(0, |x| *x)), + U16(x) => format!("{}", x.get(0).map_or(0, |x| *x)), + U32(x) => format!("{}", x.get(0).map_or(0, |x| *x)), + U64(x) => format!("{}", x.get(0).map_or(0, |x| *x)), I8(x) => format!("{}", x.get(0).map_or(0, |x| *x)), I16(x) => format!("{}", x.get(0).map_or(0, |x| *x)), I32(x) => format!("{}", x.get(0).map_or(0, |x| *x)), + I64(x) => format!("{}", x.get(0).map_or(0, |x| *x)), F32(x) => format!("{}", x.get(0).map_or(0., |x| *x)), F64(x) => format!("{}", x.get(0).map_or(0., |x| *x)), Bool(x) => format!("{}", x.get(0).map_or(false, |x| *x)), @@ -227,6 +314,10 @@ impl DataValue { pub fn scalar_type(&self) -> ScalarType { match self { DataValue::Scalar(x) => match x { + ScalarValue::U8(_) => ScalarType::U8, + ScalarValue::U16(_) => ScalarType::U16, + ScalarValue::U32(_) => ScalarType::U32, + ScalarValue::U64(_) => ScalarType::U64, ScalarValue::I8(_) => ScalarType::I8, ScalarValue::I16(_) => ScalarType::I16, ScalarValue::I32(_) => ScalarType::I32, @@ -238,9 +329,14 @@ impl DataValue { ScalarValue::Bool(_) => ScalarType::BOOL, }, DataValue::Array(x) => match x { + ArrayValue::U8(_) => ScalarType::U8, + ArrayValue::U16(_) => ScalarType::U16, + ArrayValue::U32(_) => ScalarType::U32, + ArrayValue::U64(_) => ScalarType::U64, ArrayValue::I8(_) => ScalarType::I8, ArrayValue::I16(_) => ScalarType::I16, ArrayValue::I32(_) => ScalarType::I32, + ArrayValue::I64(_) => ScalarType::I64, ArrayValue::F32(_) => ScalarType::F32, ArrayValue::F64(_) => ScalarType::F64, ArrayValue::Bool(_) => ScalarType::BOOL, @@ -647,143 +743,6 @@ impl Future for InsertFut { } } -async fn insert_scalar_gen( - par: InsParCom, - val: ST, - qu: &PreparedStatement, - data_store: &DataStore, -) -> Result<(), Error> -where - ST: Value + SerializeCql, -{ - let params = ( - par.series.to_i64(), - par.ts_msp.to_i64(), - par.ts_lsp.to_i64(), - par.ts_alt_1.ns() as i64, - par.pulse as i64, - val, - ); - if par.do_insert { - let y = data_store.scy.execute(qu, params).await; - match y { - Ok(_) => Ok(()), - Err(e) => match e { - QueryError::TimeoutError => Err(Error::DbTimeout), - // TODO use `msg` - QueryError::DbError(e, _msg) => match e { - DbError::Overloaded => Err(Error::DbOverload), - _ => Err(e.into()), - }, - _ => Err(e.into()), - }, - } - } else { - Ok(()) - } -} - -async fn insert_array_gen( - par: InsParCom, - val: Vec, - qu: &PreparedStatement, - data_store: &DataStore, -) -> Result<(), Error> -where - ST: Value + SerializeCql, -{ - if par.do_insert { - let params = ( - par.series.to_i64(), - par.ts_msp.to_i64(), - par.ts_lsp.to_i64(), - par.ts_alt_1.ns() as i64, - par.pulse as i64, - val, - ); - let y = data_store.scy.execute(qu, params).await; - match y { - Ok(_) => Ok(()), - Err(e) => match e { - QueryError::TimeoutError => Err(Error::DbTimeout), - // TODO use `msg` - QueryError::DbError(e, _msg) => match e { - DbError::Overloaded => Err(Error::DbOverload), - _ => Err(e.into()), - }, - _ => Err(e.into()), - }, - } - } else { - Ok(()) - } -} - -// TODO currently not in use, anything to merge? -pub async fn insert_item( - item: InsertItem, - data_store: &DataStore, - do_insert: bool, - stats: &Arc, -) -> Result<(), Error> { - if item.msp_bump { - let params = (item.series.id() as i64, item.ts_msp.to_i64()); - data_store.scy.execute(&data_store.qu_insert_ts_msp, params).await?; - stats.inserts_msp().inc(); - } - use DataValue::*; - match item.val { - Scalar(val) => { - let par = InsParCom { - series: item.series, - ts_msp: item.ts_msp, - ts_lsp: item.ts_lsp, - ts_net: item.ts_net, - ts_alt_1: item.ts_alt_1, - pulse: item.pulse, - do_insert, - stats: stats.clone(), - }; - use ScalarValue::*; - match val { - I8(val) => insert_scalar_gen(par, val, &data_store.qu_insert_scalar_i8, &data_store).await?, - I16(val) => insert_scalar_gen(par, val, &data_store.qu_insert_scalar_i16, &data_store).await?, - Enum(a, b) => insert_scalar_gen(par, a, &data_store.qu_insert_scalar_i16, &data_store).await?, - I32(val) => insert_scalar_gen(par, val, &data_store.qu_insert_scalar_i32, &data_store).await?, - I64(val) => insert_scalar_gen(par, val, &data_store.qu_insert_scalar_i64, &data_store).await?, - F32(val) => insert_scalar_gen(par, val, &data_store.qu_insert_scalar_f32, &data_store).await?, - F64(val) => insert_scalar_gen(par, val, &data_store.qu_insert_scalar_f64, &data_store).await?, - String(val) => insert_scalar_gen(par, val, &data_store.qu_insert_scalar_string, &data_store).await?, - Bool(val) => insert_scalar_gen(par, val, &data_store.qu_insert_scalar_bool, &data_store).await?, - } - } - Array(val) => { - let par = InsParCom { - series: item.series, - ts_msp: item.ts_msp, - ts_lsp: item.ts_lsp, - ts_net: item.ts_net, - ts_alt_1: item.ts_alt_1, - pulse: item.pulse, - do_insert, - stats: stats.clone(), - }; - err::todo(); - use ArrayValue::*; - match val { - I8(val) => insert_array_gen(par, val, &data_store.qu_insert_array_i8, &data_store).await?, - I16(val) => insert_array_gen(par, val, &data_store.qu_insert_array_i16, &data_store).await?, - I32(val) => insert_array_gen(par, val, &data_store.qu_insert_array_i32, &data_store).await?, - F32(val) => insert_array_gen(par, val, &data_store.qu_insert_array_f32, &data_store).await?, - F64(val) => insert_array_gen(par, val, &data_store.qu_insert_array_f64, &data_store).await?, - Bool(val) => insert_array_gen(par, val, &data_store.qu_insert_array_bool, &data_store).await?, - } - } - } - stats.inserts_value().inc(); - Ok(()) -} - pub fn insert_msp_fut( series: SeriesId, ts_msp: TsMs, @@ -819,6 +778,10 @@ pub fn insert_item_fut( }; use ScalarValue::*; match val { + U8(val) => insert_scalar_gen_fut(par, val as i8, data_store.qu_insert_scalar_u8.clone(), scy), + U16(val) => insert_scalar_gen_fut(par, val as i16, data_store.qu_insert_scalar_u16.clone(), scy), + U32(val) => insert_scalar_gen_fut(par, val as i32, data_store.qu_insert_scalar_u32.clone(), scy), + U64(val) => insert_scalar_gen_fut(par, val as i64, data_store.qu_insert_scalar_u64.clone(), scy), I8(val) => insert_scalar_gen_fut(par, val, data_store.qu_insert_scalar_i8.clone(), scy), I16(val) => insert_scalar_gen_fut(par, val, data_store.qu_insert_scalar_i16.clone(), scy), I32(val) => insert_scalar_gen_fut(par, val, data_store.qu_insert_scalar_i32.clone(), scy), @@ -845,9 +808,14 @@ pub fn insert_item_fut( let blob = val.to_binary_blob(); #[allow(unused)] match val { + U8(val) => insert_array_gen_fut(par, blob, data_store.qu_insert_array_u8.clone(), scy), + U16(val) => insert_array_gen_fut(par, blob, data_store.qu_insert_array_u16.clone(), scy), + U32(val) => insert_array_gen_fut(par, blob, data_store.qu_insert_array_u32.clone(), scy), + U64(val) => insert_array_gen_fut(par, blob, data_store.qu_insert_array_u64.clone(), scy), I8(val) => insert_array_gen_fut(par, blob, data_store.qu_insert_array_i8.clone(), scy), I16(val) => insert_array_gen_fut(par, blob, data_store.qu_insert_array_i16.clone(), scy), I32(val) => insert_array_gen_fut(par, blob, data_store.qu_insert_array_i32.clone(), scy), + I64(val) => insert_array_gen_fut(par, blob, data_store.qu_insert_array_i64.clone(), scy), F32(val) => insert_array_gen_fut(par, blob, data_store.qu_insert_array_f32.clone(), scy), F64(val) => insert_array_gen_fut(par, blob, data_store.qu_insert_array_f64.clone(), scy), Bool(val) => insert_array_gen_fut(par, blob, data_store.qu_insert_array_bool.clone(), scy), diff --git a/scywr/src/store.rs b/scywr/src/store.rs index 42bd9a3..7646059 100644 --- a/scywr/src/store.rs +++ b/scywr/src/store.rs @@ -20,6 +20,10 @@ pub struct DataStore { pub rett: RetentionTime, pub scy: Arc, pub qu_insert_ts_msp: Arc, + pub qu_insert_scalar_u8: Arc, + pub qu_insert_scalar_u16: Arc, + pub qu_insert_scalar_u32: Arc, + pub qu_insert_scalar_u64: Arc, pub qu_insert_scalar_i8: Arc, pub qu_insert_scalar_i16: Arc, pub qu_insert_scalar_i32: Arc, @@ -28,6 +32,10 @@ pub struct DataStore { pub qu_insert_scalar_f64: Arc, pub qu_insert_scalar_bool: Arc, pub qu_insert_scalar_string: Arc, + pub qu_insert_array_u8: Arc, + pub qu_insert_array_u16: Arc, + pub qu_insert_array_u32: Arc, + pub qu_insert_array_u64: Arc, pub qu_insert_array_i8: Arc, pub qu_insert_array_i16: Arc, pub qu_insert_array_i32: Arc, @@ -100,6 +108,10 @@ impl DataStore { .await?; let qu_insert_ts_msp = Arc::new(q); + let qu_insert_scalar_u8 = prep_qu_ins_a!("events_scalar_u8", rett, scy); + let qu_insert_scalar_u16 = prep_qu_ins_a!("events_scalar_u16", rett, scy); + let qu_insert_scalar_u32 = prep_qu_ins_a!("events_scalar_u32", rett, scy); + let qu_insert_scalar_u64 = prep_qu_ins_a!("events_scalar_u64", rett, scy); let qu_insert_scalar_i8 = prep_qu_ins_a!("events_scalar_i8", rett, scy); let qu_insert_scalar_i16 = prep_qu_ins_a!("events_scalar_i16", rett, scy); let qu_insert_scalar_i32 = prep_qu_ins_a!("events_scalar_i32", rett, scy); @@ -109,7 +121,10 @@ impl DataStore { let qu_insert_scalar_bool = prep_qu_ins_a!("events_scalar_bool", rett, scy); let qu_insert_scalar_string = prep_qu_ins_a!("events_scalar_string", rett, scy); - // array + let qu_insert_array_u8 = prep_qu_ins_b!("events_array_u8", rett, scy); + let qu_insert_array_u16 = prep_qu_ins_b!("events_array_u16", rett, scy); + let qu_insert_array_u32 = prep_qu_ins_b!("events_array_u32", rett, scy); + let qu_insert_array_u64 = prep_qu_ins_b!("events_array_u64", rett, scy); let qu_insert_array_i8 = prep_qu_ins_b!("events_array_i8", rett, scy); let qu_insert_array_i16 = prep_qu_ins_b!("events_array_i16", rett, scy); let qu_insert_array_i32 = prep_qu_ins_b!("events_array_i32", rett, scy); @@ -172,6 +187,10 @@ impl DataStore { rett, scy, qu_insert_ts_msp, + qu_insert_scalar_u8, + qu_insert_scalar_u16, + qu_insert_scalar_u32, + qu_insert_scalar_u64, qu_insert_scalar_i8, qu_insert_scalar_i16, qu_insert_scalar_i32, @@ -180,6 +199,10 @@ impl DataStore { qu_insert_scalar_f64, qu_insert_scalar_bool, qu_insert_scalar_string, + qu_insert_array_u8, + qu_insert_array_u16, + qu_insert_array_u32, + qu_insert_array_u64, qu_insert_array_i8, qu_insert_array_i16, qu_insert_array_i32,