Add remaining types, add docs

Dominik Werder
2024-06-20 15:46:12 +02:00
parent 995defaff3
commit 1e0505a3f9
9 changed files with 579 additions and 319 deletions

View File

@@ -2595,8 +2595,8 @@ impl CaConn {
}
fn log_queues_summary(&self) {
self.iqdqs.log_summary();
self.iqsp.log_summary();
trace!("{}", self.iqdqs.summary());
trace!("{}", self.iqsp.summary());
}
}

View File

@@ -16,6 +16,7 @@ use async_channel::Sender;
use async_channel::WeakSender;
use axum::extract::Query;
use axum::http;
use axum::http::HeaderMap;
use axum::response::IntoResponse;
use axum::response::Response;
use bytes::Bytes;
@@ -388,9 +389,11 @@ fn make_routes(
"/v1",
post({
let rres = rres.clone();
move |(params, body): (Query<HashMap<String, String>>, axum::body::Body)| {
ingest::post_v01((params, body), rres)
}
move |(headers, params, body): (
HeaderMap,
Query<HashMap<String, String>>,
axum::body::Body,
)| { ingest::post_v01((headers, params, body), rres) }
}),
),
),

View File

@@ -1,105 +1,317 @@
use super::RoutesResources;
use axum::extract::FromRequest;
use axum::extract::Query;
use axum::http::HeaderMap;
use axum::Json;
use bytes::Bytes;
use core::fmt;
use err::thiserror;
use err::ThisError;
use futures_util::StreamExt;
use futures_util::TryStreamExt;
use items_2::eventsdim0::EventsDim0;
use items_2::eventsdim0::EventsDim0NoPulse;
use items_2::eventsdim1::EventsDim1;
use items_2::eventsdim1::EventsDim1NoPulse;
use netpod::log::*;
use netpod::ScalarType;
use netpod::Shape;
use netpod::TsNano;
use netpod::APP_CBOR_FRAMED;
use scywr::insertqueues::InsertDeques;
use scywr::iteminsertqueue::ArrayValue;
use scywr::iteminsertqueue::DataValue;
use scywr::iteminsertqueue::QueryItem;
use scywr::iteminsertqueue::ScalarValue;
use serde::Deserialize;
use serieswriter::writer::SeriesWriter;
use std::collections::HashMap;
use std::collections::VecDeque;
use std::io::Cursor;
use std::sync::Arc;
use std::time::Duration;
use std::time::SystemTime;
use streams::framed_bytes::FramedBytesStream;
use taskrun::tokio::time::timeout;
// use core::io::BorrowedBuf;
#[allow(unused)]
macro_rules! debug_setup {
($($arg:tt)*) => {
if true {
info!($($arg)*);
}
};
}
#[allow(unused)]
macro_rules! trace_input {
($($arg:tt)*) => {
if false {
trace!($($arg)*);
}
};
}
#[allow(unused)]
macro_rules! trace_queues {
($($arg:tt)*) => {
if false {
trace!($($arg)*);
}
};
}
#[derive(Debug, ThisError)]
pub enum Error {
UnsupportedContentType,
Logic,
SeriesWriter(#[from] serieswriter::writer::Error),
MissingChannelName,
MissingScalarType,
MissingShape,
SendError,
Decode,
FramedBytes(#[from] streams::framed_bytes::Error),
InsertQueues(#[from] scywr::insertqueues::Error),
Serde(#[from] serde_json::Error),
#[error("Parse({0})")]
Parse(String),
NotSupported,
}
struct BodyRead {}
pub async fn post_v01(
(Query(params), body): (Query<HashMap<String, String>>, axum::body::Body),
(headers, Query(params), body): (HeaderMap, Query<HashMap<String, String>>, axum::body::Body),
rres: Arc<RoutesResources>,
) -> Json<serde_json::Value> {
match post_v01_try(params, body, rres).await {
match post_v01_try(headers, params, body, rres).await {
Ok(k) => k,
Err(e) => Json(serde_json::Value::String(e.to_string())),
Err(e) => Json(serde_json::json!({
"error": e.to_string(),
})),
}
}
async fn post_v01_try(
headers: HeaderMap,
params: HashMap<String, String>,
body: axum::body::Body,
rres: Arc<RoutesResources>,
) -> Result<Json<serde_json::Value>, Error> {
info!("params {:?}", params);
if let Some(ct) = headers.get("content-type") {
if let Ok(s) = ct.to_str() {
if s == APP_CBOR_FRAMED {
} else {
return Err(Error::UnsupportedContentType);
}
} else {
return Err(Error::UnsupportedContentType);
}
} else {
return Err(Error::UnsupportedContentType);
};
debug_setup!("params {:?}", params);
let stnow = SystemTime::now();
let worker_tx = rres.worker_tx.clone();
let backend = rres.backend.clone();
let channel = params.get("channelName").ok_or(Error::MissingChannelName)?.into();
let scalar_type = ScalarType::I16;
let shape = Shape::Scalar;
info!("establishing...");
let mut writer = SeriesWriter::establish(worker_tx, backend, channel, scalar_type, shape, stnow).await?;
let s = params.get("scalarType").ok_or(Error::MissingScalarType)?;
let scalar_type = ScalarType::from_variant_str(&s).map_err(|e| Error::Parse(e.to_string()))?;
let shape: Shape = serde_json::from_str(params.get("shape").map_or("[]", |x| x.as_str()))?;
debug_setup!("parsed scalar_type {scalar_type:?}");
debug_setup!("parsed shape {shape:?}");
debug_setup!(
"establishing series writer for {:?} {:?} {:?}",
channel,
scalar_type,
shape
);
let mut writer =
SeriesWriter::establish(worker_tx, backend, channel, scalar_type.clone(), shape.clone(), stnow).await?;
debug_setup!("series writer established");
let mut iqdqs = InsertDeques::new();
let mut iqtx = rres.iqtx.clone();
// iqtx.send_all(&mut iqdqs).await.map_err(|_| Error::SendError)?;
// let deque = &mut iqdqs.st_rf3_rx;
let mut frames = FramedBytesStream::new(body.into_data_stream().map_err(|_| streams::framed_bytes::Error::Logic));
while let Some(frame) = frames.try_next().await? {
info!("got frame len {}", frame.len());
let evs: EventsDim0<i16> = ciborium::de::from_reader(Cursor::new(frame)).map_err(|_| Error::Decode)?;
info!("see events {:?}", evs);
loop {
let x = timeout(Duration::from_millis(2000), frames.try_next()).await;
let x = match x {
Ok(x) => x,
Err(_) => {
tick_writers(&mut writer, &mut iqdqs)?;
continue;
}
};
let frame = match x? {
Some(x) => x,
None => {
trace!("input stream done");
break;
}
};
trace_input!("got frame len {}", frame.len());
let deque = &mut iqdqs.st_rf3_rx;
for (i, (&ts, &val)) in evs.tss.iter().zip(evs.values.iter()).enumerate() {
info!("ev {:6} {:20} {:20}", i, ts, val);
let val = DataValue::Scalar(ScalarValue::I16(val));
writer.write(TsNano::from_ns(ts), TsNano::from_ns(ts), val, deque)?;
match &shape {
Shape::Scalar => match &scalar_type {
ScalarType::U8 => {
evpush_dim0::<u8, _>(&frame, deque, &mut writer, |x| {
DataValue::Scalar(ScalarValue::U8(x as _))
})?;
}
ScalarType::U16 => {
evpush_dim0::<u16, _>(&frame, deque, &mut writer, |x| {
DataValue::Scalar(ScalarValue::U16(x as _))
})?;
}
ScalarType::U32 => {
evpush_dim0::<u32, _>(&frame, deque, &mut writer, |x| {
DataValue::Scalar(ScalarValue::U32(x as _))
})?;
}
ScalarType::U64 => {
evpush_dim0::<u64, _>(&frame, deque, &mut writer, |x| {
DataValue::Scalar(ScalarValue::U64(x as _))
})?;
}
ScalarType::I8 => {
evpush_dim0::<i8, _>(&frame, deque, &mut writer, |x| DataValue::Scalar(ScalarValue::I8(x)))?;
}
ScalarType::I16 => {
evpush_dim0::<i16, _>(&frame, deque, &mut writer, |x| DataValue::Scalar(ScalarValue::I16(x)))?;
}
ScalarType::I32 => {
evpush_dim0::<i32, _>(&frame, deque, &mut writer, |x| DataValue::Scalar(ScalarValue::I32(x)))?;
}
ScalarType::I64 => {
evpush_dim0::<i64, _>(&frame, deque, &mut writer, |x| DataValue::Scalar(ScalarValue::I64(x)))?;
}
ScalarType::F32 => {
evpush_dim0::<f32, _>(&frame, deque, &mut writer, |x| DataValue::Scalar(ScalarValue::F32(x)))?;
}
ScalarType::F64 => {
evpush_dim0::<f64, _>(&frame, deque, &mut writer, |x| DataValue::Scalar(ScalarValue::F64(x)))?;
}
ScalarType::BOOL => return Err(Error::NotSupported),
ScalarType::STRING => {
evpush_dim0::<String, _>(&frame, deque, &mut writer, |x| {
DataValue::Scalar(ScalarValue::String(x))
})?;
}
ScalarType::Enum => return Err(Error::NotSupported),
ScalarType::ChannelStatus => return Err(Error::NotSupported),
},
Shape::Wave(_) => match &scalar_type {
ScalarType::U8 => {
evpush_dim1::<u8, _>(&frame, deque, &mut writer, |x| DataValue::Array(ArrayValue::U8(x)))?;
}
ScalarType::U16 => {
evpush_dim1::<u16, _>(&frame, deque, &mut writer, |x| DataValue::Array(ArrayValue::U16(x)))?;
}
ScalarType::U32 => {
evpush_dim1::<u32, _>(&frame, deque, &mut writer, |x| DataValue::Array(ArrayValue::U32(x)))?;
}
ScalarType::U64 => {
evpush_dim1::<u64, _>(&frame, deque, &mut writer, |x| DataValue::Array(ArrayValue::U64(x)))?;
}
ScalarType::I8 => {
evpush_dim1::<i8, _>(&frame, deque, &mut writer, |x| DataValue::Array(ArrayValue::I8(x)))?;
}
ScalarType::I16 => {
evpush_dim1::<i16, _>(&frame, deque, &mut writer, |x| DataValue::Array(ArrayValue::I16(x)))?;
}
ScalarType::I32 => {
evpush_dim1::<i32, _>(&frame, deque, &mut writer, |x| DataValue::Array(ArrayValue::I32(x)))?;
}
ScalarType::I64 => {
evpush_dim1::<i64, _>(&frame, deque, &mut writer, |x| DataValue::Array(ArrayValue::I64(x)))?;
}
ScalarType::F32 => {
evpush_dim1::<f32, _>(&frame, deque, &mut writer, |x| DataValue::Array(ArrayValue::F32(x)))?;
}
ScalarType::F64 => {
evpush_dim1::<f64, _>(&frame, deque, &mut writer, |x| DataValue::Array(ArrayValue::F64(x)))?;
}
ScalarType::BOOL => return Err(Error::NotSupported),
ScalarType::STRING => return Err(Error::NotSupported),
ScalarType::Enum => return Err(Error::NotSupported),
ScalarType::ChannelStatus => return Err(Error::NotSupported),
},
Shape::Image(_, _) => return Err(Error::NotSupported),
}
iqtx.send_all(&mut iqdqs).await.map_err(|_| Error::SendError)?;
trace_queues!("frame send_all begin {} {}", iqdqs.summary(), iqtx.summary());
iqtx.send_all(&mut iqdqs).await?;
trace_queues!("frame send_all done {} {}", iqdqs.summary(), iqtx.summary());
tick_writers(&mut writer, &mut iqdqs)?;
trace_queues!("frame tick_writers done {} {}", iqdqs.summary(), iqtx.summary());
}
let deque = &mut iqdqs.st_rf3_rx;
finish_writers(vec![&mut writer], deque)?;
iqtx.send_all(&mut iqdqs).await.map_err(|_| Error::SendError)?;
trace_queues!("after send_all begin {} {}", iqdqs.summary(), iqtx.summary());
iqtx.send_all(&mut iqdqs).await?;
trace_queues!("after send_all done {} {}", iqdqs.summary(), iqtx.summary());
finish_writers(&mut writer, &mut iqdqs)?;
trace_queues!("after finish_writers done {} {}", iqdqs.summary(), iqtx.summary());
let ret = Json(serde_json::json!({
"result": true,
}));
let ret = Json(serde_json::json!({}));
Ok(ret)
}
fn tick_writers(sws: Vec<&mut SeriesWriter>, deque: &mut VecDeque<QueryItem>) -> Result<(), Error> {
for sw in sws {
sw.tick(deque)?;
fn evpush_dim0<T, F1>(
frame: &Bytes,
deque: &mut VecDeque<QueryItem>,
writer: &mut SeriesWriter,
f1: F1,
) -> Result<(), Error>
where
T: for<'a> Deserialize<'a> + fmt::Debug + Clone,
F1: Fn(T) -> DataValue,
{
let evs: EventsDim0NoPulse<T> = ciborium::de::from_reader(Cursor::new(frame))
.map_err(|e| {
error!("cbor decode error {e}");
})
.map_err(|_| Error::Decode)?;
let evs: EventsDim0<T> = evs.into();
trace_input!("see events {:?}", evs);
for (i, (&ts, val)) in evs.tss.iter().zip(evs.values.iter()).enumerate() {
let val = val.clone();
trace_input!("ev {:6} {:20} {:20?}", i, ts, val);
let val = f1(val);
writer.write(TsNano::from_ns(ts), TsNano::from_ns(ts), val, deque)?;
}
Ok(())
}
fn finish_writers(sws: Vec<&mut SeriesWriter>, deque: &mut VecDeque<QueryItem>) -> Result<(), Error> {
for sw in sws {
sw.tick(deque)?;
fn evpush_dim1<T, F1>(
frame: &Bytes,
deque: &mut VecDeque<QueryItem>,
writer: &mut SeriesWriter,
f1: F1,
) -> Result<(), Error>
where
T: for<'a> Deserialize<'a> + fmt::Debug + Clone,
F1: Fn(Vec<T>) -> DataValue,
{
let evs: EventsDim1NoPulse<T> = ciborium::de::from_reader(Cursor::new(frame))
.map_err(|e| {
error!("cbor decode error {e}");
})
.map_err(|_| Error::Decode)?;
let evs: EventsDim1<T> = evs.into();
trace_input!("see events {:?}", evs);
for (i, (&ts, val)) in evs.tss.iter().zip(evs.values.iter()).enumerate() {
let val = val.clone();
trace_input!("ev {:6} {:20} {:20?}", i, ts, val);
let val = f1(val);
writer.write(TsNano::from_ns(ts), TsNano::from_ns(ts), val, deque)?;
}
Ok(())
}
fn tick_writers(writer: &mut SeriesWriter, deque: &mut InsertDeques) -> Result<(), Error> {
writer.tick(&mut deque.st_rf3_rx)?;
Ok(())
}
fn finish_writers(writer: &mut SeriesWriter, deque: &mut InsertDeques) -> Result<(), Error> {
writer.tick(&mut deque.st_rf3_rx)?;
Ok(())
}

postingest.md (new file, 46 lines)
View File

@@ -0,0 +1,46 @@
# HTTP POST Ingest
Example:
```txt
Method: POST
Url: http://sf-ingest-mg-01.psi.ch:9009/daqingest/ingest/v1?channelName=MY:DEVICE:POS&shape=[]&scalarType=f32
Headers: Content-Type: application/cbor-framed
```
The body must be a stream of length-delimited frames, where the payload of each frame is
a CBOR object.
The HTTP request body therefore looks like this:
```txt
[CBOR-frame]
[CBOR-frame]
[CBOR-frame]
... etc
```
where each `[CBOR-frame]` looks like:
```txt
[length N of the following CBOR object: uint32 little-endian]
[reserved: 12 bytes of zero-padding]
[CBOR object: N bytes]
[padding: P zero-bytes, 0 <= P <= 7, such that (N + P) mod 8 = 0]
```
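As an illustration, here is a minimal Rust sketch of this framing, assuming the `bytes` crate (`frame_cbor` is a hypothetical helper, not part of daqingest):
```rust
use bytes::{BufMut, BytesMut};

// Wrap one already-serialized CBOR object into a frame with the layout above.
fn frame_cbor(cbor: &[u8]) -> BytesMut {
    let n = cbor.len() as u32;
    let pad = (8 - cbor.len() % 8) % 8; // P such that (N + P) mod 8 == 0
    let mut buf = BytesMut::with_capacity(16 + cbor.len() + pad);
    buf.put_u32_le(n); // length N of the CBOR object, uint32 little-endian
    buf.put_bytes(0, 12); // reserved: 12 bytes of zero-padding
    buf.put_slice(cbor); // the CBOR object, N bytes
    buf.put_bytes(0, pad); // trailing zero-padding to an 8-byte multiple
    buf
}
```
Multiple frames are concatenated back-to-back to form the request body.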
Each CBOR object must contain the timestamps (integer nanoseconds) and the values (type depending on `scalarType` and `shape`), e.g.:
```json
{
"tss": [1712100002000000000, 1712100003000000000, 1712100004000000000],
"values": [5.6, 7.8, 8.1]
}
```
## Shape of data
The `shape` URL parameter indicates whether the data is scalar or 1-dimensional:
for example, `shape=[]` indicates a scalar and `shape=[4096]` an array
with 4096 elements.
Currently the shape only distinguishes between scalar and 1-dimensional; the actual length of
the array dimension may vary from event to event, so the declared length carries no meaning.
Still, it does not hurt to pass the "typical" size of the array data as a parameter.
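Putting it together, a hypothetical client sketch (assumptions: the `reqwest`, `ciborium`, `serde` and `tokio` crates, the `frame_cbor` helper from above, and a daqingest instance listening on localhost):
```rust
use serde::Serialize;

// One batch of events, matching the CBOR object layout shown above.
#[derive(Serialize)]
struct Batch {
    tss: Vec<u64>,
    values: Vec<f32>,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let batch = Batch {
        tss: vec![1712100002000000000, 1712100003000000000],
        values: vec![5.6, 7.8],
    };
    // Serialize the batch to CBOR, then wrap it in one frame.
    let mut cbor = Vec::new();
    ciborium::ser::into_writer(&batch, &mut cbor)?;
    let body = frame_cbor(&cbor).to_vec();
    // Channel name, shape and scalar type are passed as URL parameters.
    let url = "http://localhost:9009/daqingest/ingest/v1?channelName=MY:DEVICE:POS&shape=[]&scalarType=f32";
    let resp = reqwest::Client::new()
        .post(url)
        .header("Content-Type", "application/cbor-framed")
        .body(body)
        .send()
        .await?;
    println!("{}", resp.text().await?);
    Ok(())
}
```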

View File

@@ -21,11 +21,10 @@ to the most basic linux system libraries.
```yml
# Address to bind the HTTP API to, for runtime control and Prometheus metrics scrape:
api_bind: "0.0.0.0:3011"
# The hostname to send to channel access peers as our own hostname:
local_epics_hostname: sf-daqsync-02.psi.ch
api_bind: 0.0.0.0:3011
# The backend name to use for the channels handled by this daqingest instance:
backend: scylla
channels: directory-name-with-channel-config-files
# Addresses to use for channel access search:
search:
- "172.26.0.255"
@@ -35,19 +34,30 @@ search:
postgresql:
host: postgresql-host
port: 5432
user: database-username
user: the-username
pass: the-password
name: the-database-name
scylla:
name: the-database
scylla_st:
keyspace: backend_st
hosts:
- "sf-nube-11:19042"
- "sf-nube-12:19042"
- "sf-nube-13:19042"
- "sf-nube-14:19042"
keyspace: ks1
channels:
- "SOME-CHANNEL:1"
- "OTHER-CHANNEL:2"
- sf-nube-11:19042
- sf-nube-12:19042
- sf-nube-13:19042
- sf-nube-14:19042
scylla_mt:
keyspace: backend_mt
hosts:
- sf-nube-11:19042
- sf-nube-12:19042
- sf-nube-13:19042
- sf-nube-14:19042
scylla_lt:
keyspace: backend_lt
hosts:
- sf-nube-11:19042
- sf-nube-12:19042
- sf-nube-13:19042
- sf-nube-14:19042
```
@@ -61,3 +71,8 @@ as configured by the `api_bind` parameter.
```txt
http://<api_bind>/daqingest/channel/state?name=[...]
```
# HTTP POST ingest
It is possible to [ingest](postingest.md) data via the `api_bind` socket address.

View File

@@ -2,9 +2,11 @@ use crate::iteminsertqueue::QueryItem;
use crate::senderpolling::SenderPolling;
use async_channel::Receiver;
use async_channel::Sender;
use core::fmt;
use err::thiserror;
use err::ThisError;
use netpod::log::*;
use netpod::ttl::RetentionTime;
use pin_project::pin_project;
use std::collections::VecDeque;
use std::pin::Pin;
@@ -12,6 +14,8 @@ use std::pin::Pin;
#[derive(Debug, ThisError)]
pub enum Error {
QueuePush,
#[error("ChannelSend({0}, {1})")]
ChannelSend(RetentionTime, u8),
}
#[derive(Clone)]
@@ -24,22 +28,72 @@ pub struct InsertQueuesTx {
impl InsertQueuesTx {
/// Send all accumulated batches
pub async fn send_all(&mut self, iqdqs: &mut InsertDeques) -> Result<(), ()> {
pub async fn send_all(&mut self, iqdqs: &mut InsertDeques) -> Result<(), Error> {
// Send each buffer down the corresponding channel
let item = core::mem::replace(&mut iqdqs.st_rf1_rx, VecDeque::new());
self.st_rf1_tx.send(item).await.map_err(|_| ())?;
let item = core::mem::replace(&mut iqdqs.st_rf3_rx, VecDeque::new());
self.st_rf3_tx.send(item).await.map_err(|_| ())?;
let item = core::mem::replace(&mut iqdqs.mt_rf3_rx, VecDeque::new());
self.mt_rf3_tx.send(item).await.map_err(|_| ())?;
let item = core::mem::replace(&mut iqdqs.lt_rf3_rx, VecDeque::new());
self.lt_rf3_tx.send(item).await.map_err(|_| ())?;
if false {
let item = core::mem::replace(&mut iqdqs.st_rf1_rx, VecDeque::new());
self.st_rf1_tx
.send(item)
.await
.map_err(|_| Error::ChannelSend(RetentionTime::Short, 1))?;
}
{
let item = core::mem::replace(&mut iqdqs.st_rf3_rx, VecDeque::new());
self.st_rf3_tx
.send(item)
.await
.map_err(|_| Error::ChannelSend(RetentionTime::Short, 3))?;
}
{
let item = core::mem::replace(&mut iqdqs.mt_rf3_rx, VecDeque::new());
self.mt_rf3_tx
.send(item)
.await
.map_err(|_| Error::ChannelSend(RetentionTime::Medium, 3))?;
}
{
let item = core::mem::replace(&mut iqdqs.lt_rf3_rx, VecDeque::new());
self.lt_rf3_tx
.send(item)
.await
.map_err(|_| Error::ChannelSend(RetentionTime::Long, 3))?;
}
Ok(())
}
pub fn clone2(&self) -> Self {
self.clone()
}
pub fn summary(&self) -> InsertQueuesTxSummary {
InsertQueuesTxSummary { obj: self }
}
}
pub struct InsertQueuesTxSummary<'a> {
obj: &'a InsertQueuesTx,
}
impl<'a> fmt::Display for InsertQueuesTxSummary<'a> {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
let obj = self.obj;
write!(
fmt,
"InsertQueuesTx {{ st_rf1_tx: {} {} {}, st_rf3_tx: {} {} {}, mt_rf3_tx: {} {} {}, lt_rf3_tx: {} {} {} }}",
obj.st_rf1_tx.is_closed(),
obj.st_rf1_tx.is_full(),
obj.st_rf1_tx.len(),
obj.st_rf3_tx.is_closed(),
obj.st_rf3_tx.is_full(),
obj.st_rf3_tx.len(),
obj.mt_rf3_tx.is_closed(),
obj.mt_rf3_tx.is_full(),
obj.mt_rf3_tx.len(),
obj.lt_rf3_tx.is_closed(),
obj.lt_rf3_tx.is_full(),
obj.lt_rf3_tx.len(),
)
}
}
#[derive(Clone)]
@@ -72,7 +126,6 @@ impl InsertDeques {
self.st_rf1_rx.len() + self.st_rf3_rx.len() + self.mt_rf3_rx.len() + self.lt_rf3_rx.len()
}
///
pub fn clear(&mut self) {
self.st_rf1_rx.clear();
self.st_rf3_rx.clear();
@@ -80,14 +133,8 @@ impl InsertDeques {
self.lt_rf3_rx.clear();
}
pub fn log_summary(&self) {
let summ = InsertDequesSummary {
st_rf1_len: self.st_rf1_rx.len(),
st_rf3_len: self.st_rf3_rx.len(),
mt_rf3_len: self.mt_rf3_rx.len(),
lt_rf3_len: self.lt_rf3_rx.len(),
};
info!("{summ:?}");
pub fn summary(&self) -> InsertDequesSummary {
InsertDequesSummary { obj: self }
}
// Should be used only for connection and channel status items.
@@ -98,13 +145,22 @@ impl InsertDeques {
}
}
#[derive(Debug)]
#[allow(unused)]
struct InsertDequesSummary {
st_rf1_len: usize,
st_rf3_len: usize,
mt_rf3_len: usize,
lt_rf3_len: usize,
pub struct InsertDequesSummary<'a> {
obj: &'a InsertDeques,
}
impl<'a> fmt::Display for InsertDequesSummary<'a> {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
let obj = self.obj;
write!(
fmt,
"InsertDeques {{ st_rf1_len: {}, st_rf3_len: {}, mt_rf3_len: {}, lt_rf3_len: {} }}",
obj.st_rf1_rx.len(),
obj.st_rf3_rx.len(),
obj.mt_rf3_rx.len(),
obj.lt_rf3_rx.len()
)
}
}
#[pin_project]
@@ -156,22 +212,29 @@ impl InsertSenderPolling {
unsafe { self.map_unchecked_mut(|x| &mut x.st_rf1_sp) }
}
pub fn log_summary(&self) {
let summ = InsertSenderPollingSummary {
st_rf1_idle: self.st_rf1_sp.is_idle(),
st_rf3_idle: self.st_rf3_sp.is_idle(),
mt_rf3_idle: self.mt_rf3_sp.is_idle(),
lt_rf3_idle: self.lt_rf3_sp.is_idle(),
};
info!("{summ:?}");
pub fn summary(&self) -> InsertSenderPollingSummary {
InsertSenderPollingSummary { obj: self }
}
}
#[derive(Debug)]
#[allow(unused)]
struct InsertSenderPollingSummary {
st_rf1_idle: bool,
st_rf3_idle: bool,
mt_rf3_idle: bool,
lt_rf3_idle: bool,
pub struct InsertSenderPollingSummary<'a> {
obj: &'a InsertSenderPolling,
}
impl<'a> fmt::Display for InsertSenderPollingSummary<'a> {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
let obj = self.obj;
write!(
fmt,
"InsertSenderPolling {{ st_rf1_idle_len: {:?} {:?}, st_rf3_idle_len: {:?} {:?}, mt_rf3_idle_len: {:?} {:?}, lt_rf3_idle_len: {:?} {:?} }}",
obj.st_rf1_sp.is_idle(),
obj.st_rf1_sp.len(),
obj.st_rf3_sp.is_idle(),
obj.st_rf3_sp.len(),
obj.mt_rf3_sp.is_idle(),
obj.mt_rf3_sp.len(),
obj.lt_rf3_sp.is_idle(),
obj.lt_rf3_sp.len(),
)
}
}

View File

@@ -3,7 +3,6 @@ use crate::iteminsertqueue::insert_channel_status;
use crate::iteminsertqueue::insert_channel_status_fut;
use crate::iteminsertqueue::insert_connection_status;
use crate::iteminsertqueue::insert_connection_status_fut;
use crate::iteminsertqueue::insert_item;
use crate::iteminsertqueue::insert_item_fut;
use crate::iteminsertqueue::insert_msp_fut;
use crate::iteminsertqueue::Accounting;
@@ -35,7 +34,7 @@ use tokio::task::JoinHandle;
#[allow(unused)]
macro_rules! trace2 {
($($arg:tt)*) => {
if true {
if false {
trace!($($arg)*);
}
};
@@ -44,7 +43,7 @@ macro_rules! trace2 {
#[allow(unused)]
macro_rules! trace3 {
($($arg:tt)*) => {
if true {
if false {
trace!($($arg)*);
}
};
@@ -53,7 +52,16 @@ macro_rules! trace3 {
#[allow(unused)]
macro_rules! trace_item_execute {
($($arg:tt)*) => {
if true {
if false {
trace!($($arg)*);
}
};
}
#[allow(unused)]
macro_rules! debug_setup {
($($arg:tt)*) => {
if false {
debug!($($arg)*);
}
};
@@ -181,86 +189,6 @@ pub async fn spawn_scylla_insert_workers_dummy(
Ok(jhs)
}
#[allow(unused)]
async fn worker_unused(
worker_ix: usize,
item_inp: Receiver<QueryItem>,
insert_worker_opts: Arc<InsertWorkerOpts>,
data_store: Arc<DataStore>,
stats: Arc<InsertWorkerStats>,
) -> Result<(), Error> {
stats.worker_start().inc();
insert_worker_opts
.insert_workers_running
.fetch_add(1, atomic::Ordering::AcqRel);
let backoff_0 = Duration::from_millis(10);
let mut backoff = backoff_0.clone();
let mut i1 = 0;
loop {
let item = if let Ok(item) = item_inp.recv().await {
stats.item_recv.inc();
item
} else {
break;
};
match item {
QueryItem::ConnectionStatus(item) => match insert_connection_status(item, &data_store).await {
Ok(_) => {
stats.inserted_connection_status().inc();
backoff = backoff_0;
}
Err(e) => {
stats_inc_for_err(&stats, &e);
back_off_sleep(&mut backoff).await;
}
},
QueryItem::ChannelStatus(item) => match insert_channel_status(item, &data_store).await {
Ok(_) => {
stats.inserted_channel_status().inc();
backoff = backoff_0;
}
Err(e) => {
stats_inc_for_err(&stats, &e);
back_off_sleep(&mut backoff).await;
}
},
QueryItem::Insert(item) => {
let tsnow = TsMs::from_system_time(SystemTime::now());
let item_ts_net = item.ts_net.clone();
let dt = tsnow.to_u64().saturating_sub(item_ts_net.to_u64()) as u32;
stats.item_lat_net_worker().ingest(dt);
let insert_frac = insert_worker_opts.insert_frac.load(Ordering::Acquire);
let do_insert = i1 % 1000 < insert_frac;
match insert_item(item, &data_store, do_insert, &stats).await {
Ok(_) => {
stats.inserted_values().inc();
let tsnow = TsMs::from_system_time(SystemTime::now());
let dt = tsnow.to_u64().saturating_sub(item_ts_net.to_u64()) as u32;
stats.item_lat_net_store().ingest(dt);
backoff = backoff_0;
}
Err(e) => {
stats_inc_for_err(&stats, &e);
back_off_sleep(&mut backoff).await;
}
}
i1 += 1;
}
QueryItem::TimeBinSimpleF32(item) => {
info!("have time bin patch to insert: {item:?}");
return Err(Error::with_msg_no_trace("TODO insert item old path"));
}
QueryItem::Accounting(..) => {}
}
}
stats.worker_finish().inc();
insert_worker_opts
.insert_workers_running
.fetch_sub(1, atomic::Ordering::AcqRel);
trace2!("insert worker {worker_ix} done");
Ok(())
}
async fn worker_streamed(
worker_ix: usize,
concurrency: usize,
@@ -269,7 +197,7 @@ async fn worker_streamed(
data_store: Option<Arc<DataStore>>,
stats: Arc<InsertWorkerStats>,
) -> Result<(), Error> {
trace!("worker_streamed begin");
debug_setup!("worker_streamed begin");
stats.worker_start().inc();
insert_worker_opts
.insert_workers_running
@@ -290,7 +218,9 @@ async fn worker_streamed(
// })
.buffer_unordered(concurrency);
let mut stream = Box::pin(stream);
debug_setup!("waiting for item");
while let Some(item) = stream.next().await {
trace_item_execute!("see item");
match item {
Ok(_) => {
stats.inserted_values().inc();
@@ -321,7 +251,7 @@ async fn worker_streamed(
insert_worker_opts
.insert_workers_running
.fetch_sub(1, atomic::Ordering::AcqRel);
trace2!("insert worker {worker_ix} done");
debug_setup!("insert worker {worker_ix} done");
Ok(())
}
@@ -386,7 +316,7 @@ fn inspect_items(
trace_item_execute!("execute {worker_name} TimeBinSimpleF32");
}
QueryItem::Accounting(x) => {
if x.series.id() & 0x7f == 77 {
if x.series.id() & 0x7f == 200 {
debug!("execute {worker_name} Accounting {item:?}");
} else {
trace_item_execute!("execute {worker_name} Accounting {item:?}");

View File

@@ -48,6 +48,10 @@ pub enum Error {
#[derive(Clone, Debug, PartialEq)]
pub enum ScalarValue {
U8(u8),
U16(u16),
U32(u32),
U64(u64),
I8(i8),
I16(i16),
I32(i32),
@@ -62,6 +66,10 @@ pub enum ScalarValue {
impl ScalarValue {
pub fn byte_size(&self) -> u32 {
match self {
ScalarValue::U8(_) => 1,
ScalarValue::U16(_) => 2,
ScalarValue::U32(_) => 4,
ScalarValue::U64(_) => 8,
ScalarValue::I8(_) => 1,
ScalarValue::I16(_) => 2,
ScalarValue::I32(_) => 4,
@@ -76,6 +84,10 @@ impl ScalarValue {
pub fn string_short(&self) -> String {
match self {
ScalarValue::U8(x) => x.to_string(),
ScalarValue::U16(x) => x.to_string(),
ScalarValue::U32(x) => x.to_string(),
ScalarValue::U64(x) => x.to_string(),
ScalarValue::I8(x) => x.to_string(),
ScalarValue::I16(x) => x.to_string(),
ScalarValue::I32(x) => x.to_string(),
@@ -91,9 +103,14 @@ impl ScalarValue {
#[derive(Clone, Debug, PartialEq)]
pub enum ArrayValue {
U8(Vec<u8>),
U16(Vec<u16>),
U32(Vec<u32>),
U64(Vec<u64>),
I8(Vec<i8>),
I16(Vec<i16>),
I32(Vec<i32>),
I64(Vec<i64>),
F32(Vec<f32>),
F64(Vec<f64>),
Bool(Vec<bool>),
@@ -103,9 +120,14 @@ impl ArrayValue {
pub fn len(&self) -> usize {
use ArrayValue::*;
match self {
U8(a) => a.len(),
U16(a) => a.len(),
U32(a) => a.len(),
U64(a) => a.len(),
I8(a) => a.len(),
I16(a) => a.len(),
I32(a) => a.len(),
I64(a) => a.len(),
F32(a) => a.len(),
F64(a) => a.len(),
Bool(a) => a.len(),
@@ -115,9 +137,14 @@ impl ArrayValue {
pub fn byte_size(&self) -> u32 {
use ArrayValue::*;
match self {
U8(a) => 1 * a.len() as u32,
U16(a) => 2 * a.len() as u32,
U32(a) => 4 * a.len() as u32,
U64(a) => 8 * a.len() as u32,
I8(a) => 1 * a.len() as u32,
I16(a) => 2 * a.len() as u32,
I32(a) => 4 * a.len() as u32,
I64(a) => 8 * a.len() as u32,
F32(a) => 4 * a.len() as u32,
F64(a) => 8 * a.len() as u32,
Bool(a) => 1 * a.len() as u32,
@@ -127,6 +154,50 @@ impl ArrayValue {
pub fn to_binary_blob(&self) -> Vec<u8> {
use ArrayValue::*;
match self {
U8(a) => {
let n = self.byte_size();
let mut blob = Vec::with_capacity(32 + n as usize);
for _ in 0..4 {
blob.put_u64_le(0);
}
for &x in a {
blob.put_u8(x);
}
blob
}
U16(a) => {
let n = self.byte_size();
let mut blob = Vec::with_capacity(32 + n as usize);
for _ in 0..4 {
blob.put_u64_le(0);
}
for &x in a {
blob.put_u16_le(x);
}
blob
}
U32(a) => {
let n = self.byte_size();
let mut blob = Vec::with_capacity(32 + n as usize);
for _ in 0..4 {
blob.put_u64_le(0);
}
for &x in a {
blob.put_u32_le(x);
}
blob
}
U64(a) => {
let n = self.byte_size();
let mut blob = Vec::with_capacity(32 + n as usize);
for _ in 0..4 {
blob.put_u64_le(0);
}
for &x in a {
blob.put_u64_le(x);
}
blob
}
I8(a) => {
let n = self.byte_size();
let mut blob = Vec::with_capacity(32 + n as usize);
@@ -160,6 +231,17 @@ impl ArrayValue {
}
blob
}
I64(a) => {
let n = self.byte_size();
let mut blob = Vec::with_capacity(32 + n as usize);
for _ in 0..4 {
blob.put_u64_le(0);
}
for &x in a {
blob.put_i64_le(x);
}
blob
}
F32(a) => {
let n = self.byte_size();
let mut blob = Vec::with_capacity(32 + n as usize);
@@ -200,9 +282,14 @@ impl ArrayValue {
pub fn string_short(&self) -> String {
use ArrayValue::*;
match self {
U8(x) => format!("{}", x.get(0).map_or(0, |x| *x)),
U16(x) => format!("{}", x.get(0).map_or(0, |x| *x)),
U32(x) => format!("{}", x.get(0).map_or(0, |x| *x)),
U64(x) => format!("{}", x.get(0).map_or(0, |x| *x)),
I8(x) => format!("{}", x.get(0).map_or(0, |x| *x)),
I16(x) => format!("{}", x.get(0).map_or(0, |x| *x)),
I32(x) => format!("{}", x.get(0).map_or(0, |x| *x)),
I64(x) => format!("{}", x.get(0).map_or(0, |x| *x)),
F32(x) => format!("{}", x.get(0).map_or(0., |x| *x)),
F64(x) => format!("{}", x.get(0).map_or(0., |x| *x)),
Bool(x) => format!("{}", x.get(0).map_or(false, |x| *x)),
@@ -227,6 +314,10 @@ impl DataValue {
pub fn scalar_type(&self) -> ScalarType {
match self {
DataValue::Scalar(x) => match x {
ScalarValue::U8(_) => ScalarType::U8,
ScalarValue::U16(_) => ScalarType::U16,
ScalarValue::U32(_) => ScalarType::U32,
ScalarValue::U64(_) => ScalarType::U64,
ScalarValue::I8(_) => ScalarType::I8,
ScalarValue::I16(_) => ScalarType::I16,
ScalarValue::I32(_) => ScalarType::I32,
@@ -238,9 +329,14 @@ impl DataValue {
ScalarValue::Bool(_) => ScalarType::BOOL,
},
DataValue::Array(x) => match x {
ArrayValue::U8(_) => ScalarType::U8,
ArrayValue::U16(_) => ScalarType::U16,
ArrayValue::U32(_) => ScalarType::U32,
ArrayValue::U64(_) => ScalarType::U64,
ArrayValue::I8(_) => ScalarType::I8,
ArrayValue::I16(_) => ScalarType::I16,
ArrayValue::I32(_) => ScalarType::I32,
ArrayValue::I64(_) => ScalarType::I64,
ArrayValue::F32(_) => ScalarType::F32,
ArrayValue::F64(_) => ScalarType::F64,
ArrayValue::Bool(_) => ScalarType::BOOL,
@@ -647,143 +743,6 @@ impl Future for InsertFut {
}
}
async fn insert_scalar_gen<ST>(
par: InsParCom,
val: ST,
qu: &PreparedStatement,
data_store: &DataStore,
) -> Result<(), Error>
where
ST: Value + SerializeCql,
{
let params = (
par.series.to_i64(),
par.ts_msp.to_i64(),
par.ts_lsp.to_i64(),
par.ts_alt_1.ns() as i64,
par.pulse as i64,
val,
);
if par.do_insert {
let y = data_store.scy.execute(qu, params).await;
match y {
Ok(_) => Ok(()),
Err(e) => match e {
QueryError::TimeoutError => Err(Error::DbTimeout),
// TODO use `msg`
QueryError::DbError(e, _msg) => match e {
DbError::Overloaded => Err(Error::DbOverload),
_ => Err(e.into()),
},
_ => Err(e.into()),
},
}
} else {
Ok(())
}
}
async fn insert_array_gen<ST>(
par: InsParCom,
val: Vec<ST>,
qu: &PreparedStatement,
data_store: &DataStore,
) -> Result<(), Error>
where
ST: Value + SerializeCql,
{
if par.do_insert {
let params = (
par.series.to_i64(),
par.ts_msp.to_i64(),
par.ts_lsp.to_i64(),
par.ts_alt_1.ns() as i64,
par.pulse as i64,
val,
);
let y = data_store.scy.execute(qu, params).await;
match y {
Ok(_) => Ok(()),
Err(e) => match e {
QueryError::TimeoutError => Err(Error::DbTimeout),
// TODO use `msg`
QueryError::DbError(e, _msg) => match e {
DbError::Overloaded => Err(Error::DbOverload),
_ => Err(e.into()),
},
_ => Err(e.into()),
},
}
} else {
Ok(())
}
}
// TODO currently not in use, anything to merge?
pub async fn insert_item(
item: InsertItem,
data_store: &DataStore,
do_insert: bool,
stats: &Arc<InsertWorkerStats>,
) -> Result<(), Error> {
if item.msp_bump {
let params = (item.series.id() as i64, item.ts_msp.to_i64());
data_store.scy.execute(&data_store.qu_insert_ts_msp, params).await?;
stats.inserts_msp().inc();
}
use DataValue::*;
match item.val {
Scalar(val) => {
let par = InsParCom {
series: item.series,
ts_msp: item.ts_msp,
ts_lsp: item.ts_lsp,
ts_net: item.ts_net,
ts_alt_1: item.ts_alt_1,
pulse: item.pulse,
do_insert,
stats: stats.clone(),
};
use ScalarValue::*;
match val {
I8(val) => insert_scalar_gen(par, val, &data_store.qu_insert_scalar_i8, &data_store).await?,
I16(val) => insert_scalar_gen(par, val, &data_store.qu_insert_scalar_i16, &data_store).await?,
Enum(a, b) => insert_scalar_gen(par, a, &data_store.qu_insert_scalar_i16, &data_store).await?,
I32(val) => insert_scalar_gen(par, val, &data_store.qu_insert_scalar_i32, &data_store).await?,
I64(val) => insert_scalar_gen(par, val, &data_store.qu_insert_scalar_i64, &data_store).await?,
F32(val) => insert_scalar_gen(par, val, &data_store.qu_insert_scalar_f32, &data_store).await?,
F64(val) => insert_scalar_gen(par, val, &data_store.qu_insert_scalar_f64, &data_store).await?,
String(val) => insert_scalar_gen(par, val, &data_store.qu_insert_scalar_string, &data_store).await?,
Bool(val) => insert_scalar_gen(par, val, &data_store.qu_insert_scalar_bool, &data_store).await?,
}
}
Array(val) => {
let par = InsParCom {
series: item.series,
ts_msp: item.ts_msp,
ts_lsp: item.ts_lsp,
ts_net: item.ts_net,
ts_alt_1: item.ts_alt_1,
pulse: item.pulse,
do_insert,
stats: stats.clone(),
};
err::todo();
use ArrayValue::*;
match val {
I8(val) => insert_array_gen(par, val, &data_store.qu_insert_array_i8, &data_store).await?,
I16(val) => insert_array_gen(par, val, &data_store.qu_insert_array_i16, &data_store).await?,
I32(val) => insert_array_gen(par, val, &data_store.qu_insert_array_i32, &data_store).await?,
F32(val) => insert_array_gen(par, val, &data_store.qu_insert_array_f32, &data_store).await?,
F64(val) => insert_array_gen(par, val, &data_store.qu_insert_array_f64, &data_store).await?,
Bool(val) => insert_array_gen(par, val, &data_store.qu_insert_array_bool, &data_store).await?,
}
}
}
stats.inserts_value().inc();
Ok(())
}
pub fn insert_msp_fut(
series: SeriesId,
ts_msp: TsMs,
@@ -819,6 +778,10 @@ pub fn insert_item_fut(
};
use ScalarValue::*;
match val {
U8(val) => insert_scalar_gen_fut(par, val as i8, data_store.qu_insert_scalar_u8.clone(), scy),
U16(val) => insert_scalar_gen_fut(par, val as i16, data_store.qu_insert_scalar_u16.clone(), scy),
U32(val) => insert_scalar_gen_fut(par, val as i32, data_store.qu_insert_scalar_u32.clone(), scy),
U64(val) => insert_scalar_gen_fut(par, val as i64, data_store.qu_insert_scalar_u64.clone(), scy),
I8(val) => insert_scalar_gen_fut(par, val, data_store.qu_insert_scalar_i8.clone(), scy),
I16(val) => insert_scalar_gen_fut(par, val, data_store.qu_insert_scalar_i16.clone(), scy),
I32(val) => insert_scalar_gen_fut(par, val, data_store.qu_insert_scalar_i32.clone(), scy),
@@ -845,9 +808,14 @@ pub fn insert_item_fut(
let blob = val.to_binary_blob();
#[allow(unused)]
match val {
U8(val) => insert_array_gen_fut(par, blob, data_store.qu_insert_array_u8.clone(), scy),
U16(val) => insert_array_gen_fut(par, blob, data_store.qu_insert_array_u16.clone(), scy),
U32(val) => insert_array_gen_fut(par, blob, data_store.qu_insert_array_u32.clone(), scy),
U64(val) => insert_array_gen_fut(par, blob, data_store.qu_insert_array_u64.clone(), scy),
I8(val) => insert_array_gen_fut(par, blob, data_store.qu_insert_array_i8.clone(), scy),
I16(val) => insert_array_gen_fut(par, blob, data_store.qu_insert_array_i16.clone(), scy),
I32(val) => insert_array_gen_fut(par, blob, data_store.qu_insert_array_i32.clone(), scy),
I64(val) => insert_array_gen_fut(par, blob, data_store.qu_insert_array_i64.clone(), scy),
F32(val) => insert_array_gen_fut(par, blob, data_store.qu_insert_array_f32.clone(), scy),
F64(val) => insert_array_gen_fut(par, blob, data_store.qu_insert_array_f64.clone(), scy),
Bool(val) => insert_array_gen_fut(par, blob, data_store.qu_insert_array_bool.clone(), scy),

View File

@@ -20,6 +20,10 @@ pub struct DataStore {
pub rett: RetentionTime,
pub scy: Arc<ScySession>,
pub qu_insert_ts_msp: Arc<PreparedStatement>,
pub qu_insert_scalar_u8: Arc<PreparedStatement>,
pub qu_insert_scalar_u16: Arc<PreparedStatement>,
pub qu_insert_scalar_u32: Arc<PreparedStatement>,
pub qu_insert_scalar_u64: Arc<PreparedStatement>,
pub qu_insert_scalar_i8: Arc<PreparedStatement>,
pub qu_insert_scalar_i16: Arc<PreparedStatement>,
pub qu_insert_scalar_i32: Arc<PreparedStatement>,
@@ -28,6 +32,10 @@ pub struct DataStore {
pub qu_insert_scalar_f64: Arc<PreparedStatement>,
pub qu_insert_scalar_bool: Arc<PreparedStatement>,
pub qu_insert_scalar_string: Arc<PreparedStatement>,
pub qu_insert_array_u8: Arc<PreparedStatement>,
pub qu_insert_array_u16: Arc<PreparedStatement>,
pub qu_insert_array_u32: Arc<PreparedStatement>,
pub qu_insert_array_u64: Arc<PreparedStatement>,
pub qu_insert_array_i8: Arc<PreparedStatement>,
pub qu_insert_array_i16: Arc<PreparedStatement>,
pub qu_insert_array_i32: Arc<PreparedStatement>,
@@ -100,6 +108,10 @@ impl DataStore {
.await?;
let qu_insert_ts_msp = Arc::new(q);
let qu_insert_scalar_u8 = prep_qu_ins_a!("events_scalar_u8", rett, scy);
let qu_insert_scalar_u16 = prep_qu_ins_a!("events_scalar_u16", rett, scy);
let qu_insert_scalar_u32 = prep_qu_ins_a!("events_scalar_u32", rett, scy);
let qu_insert_scalar_u64 = prep_qu_ins_a!("events_scalar_u64", rett, scy);
let qu_insert_scalar_i8 = prep_qu_ins_a!("events_scalar_i8", rett, scy);
let qu_insert_scalar_i16 = prep_qu_ins_a!("events_scalar_i16", rett, scy);
let qu_insert_scalar_i32 = prep_qu_ins_a!("events_scalar_i32", rett, scy);
@@ -109,7 +121,10 @@ impl DataStore {
let qu_insert_scalar_bool = prep_qu_ins_a!("events_scalar_bool", rett, scy);
let qu_insert_scalar_string = prep_qu_ins_a!("events_scalar_string", rett, scy);
// array
let qu_insert_array_u8 = prep_qu_ins_b!("events_array_u8", rett, scy);
let qu_insert_array_u16 = prep_qu_ins_b!("events_array_u16", rett, scy);
let qu_insert_array_u32 = prep_qu_ins_b!("events_array_u32", rett, scy);
let qu_insert_array_u64 = prep_qu_ins_b!("events_array_u64", rett, scy);
let qu_insert_array_i8 = prep_qu_ins_b!("events_array_i8", rett, scy);
let qu_insert_array_i16 = prep_qu_ins_b!("events_array_i16", rett, scy);
let qu_insert_array_i32 = prep_qu_ins_b!("events_array_i32", rett, scy);
@@ -172,6 +187,10 @@ impl DataStore {
rett,
scy,
qu_insert_ts_msp,
qu_insert_scalar_u8,
qu_insert_scalar_u16,
qu_insert_scalar_u32,
qu_insert_scalar_u64,
qu_insert_scalar_i8,
qu_insert_scalar_i16,
qu_insert_scalar_i32,
@@ -180,6 +199,10 @@ impl DataStore {
qu_insert_scalar_f64,
qu_insert_scalar_bool,
qu_insert_scalar_string,
qu_insert_array_u8,
qu_insert_array_u16,
qu_insert_array_u32,
qu_insert_array_u64,
qu_insert_array_i8,
qu_insert_array_i16,
qu_insert_array_i32,