Support more types

Dominik Werder
2022-04-11 17:27:05 +02:00
parent a14c05e925
commit de26b5b7c4
5 changed files with 551 additions and 205 deletions

@@ -25,6 +25,7 @@ md-5 = "0.9"
hex = "0.4"
libc = "0.2"
log = { path = "../log" }
stats = { path = "../stats" }
err = { path = "../../daqbuffer/err" }
netpod = { path = "../../daqbuffer/netpod" }
taskrun = { path = "../../daqbuffer/taskrun" }

@@ -4,15 +4,18 @@ use err::Error;
use futures_core::Future;
use futures_util::FutureExt;
use log::*;
use netpod::timeunits::SEC;
use netpod::{ByteOrder, ScalarType, Shape};
use scylla::batch::{Batch, BatchType};
use scylla::frame::value::{BatchValues, ValueList};
use scylla::prepared_statement::PreparedStatement;
use scylla::transport::errors::QueryError;
use scylla::{BatchResult, QueryResult, Session as ScySession};
use std::mem;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Instant;
use std::time::{Duration, Instant};
pub struct ScyQueryFut<V> {
#[allow(unused)]
@@ -134,6 +137,75 @@ impl<V> Future for ScyBatchFut<V> {
}
}
pub struct ScyBatchFutGen {
#[allow(unused)]
scy: Arc<ScySession>,
#[allow(unused)]
batch: Box<Batch>,
fut: Pin<Box<dyn Future<Output = Result<BatchResult, QueryError>>>>,
polled: usize,
ts_create: Instant,
ts_poll_start: Instant,
}
impl ScyBatchFutGen {
pub fn new<V>(scy: Arc<ScySession>, batch: Batch, values: V) -> Self
where
V: BatchValues + 'static,
{
let batch = Box::new(batch);
let scy_ref = unsafe { &*(&scy as &_ as *const _) } as &ScySession;
let batch_ref = unsafe { &*(&batch as &_ as *const _) } as &Batch;
let fut = scy_ref.batch(batch_ref, values);
let tsnow = Instant::now();
Self {
scy,
batch,
fut: Box::pin(fut),
polled: 0,
ts_create: tsnow,
ts_poll_start: tsnow,
}
}
}
impl Future for ScyBatchFutGen {
type Output = Result<(), Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
use Poll::*;
if self.polled == 0 {
self.ts_poll_start = Instant::now();
}
self.polled += 1;
match self.fut.poll_unpin(cx) {
Ready(k) => match k {
Ok(_) => {
trace!("ScyBatchFutGen done Ok");
Ready(Ok(()))
}
Err(e) => {
let tsnow = Instant::now();
let dt_created = tsnow.duration_since(self.ts_create).as_secs_f32() * 1e3;
let dt_polled = tsnow.duration_since(self.ts_poll_start).as_secs_f32() * 1e3;
warn!(
"ScyBatchFutGen polled {} dt_created {:6.2} ms dt_polled {:6.2} ms",
self.polled, dt_created, dt_polled
);
warn!("ScyBatchFutGen done Err {e:?}");
Ready(Err(e).err_conv())
}
},
Pending => Pending,
}
}
}
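The constructor above extends the borrows of scy and batch through raw-pointer casts: the references point into the Arc and Box heap allocations, which do not move when the struct itself moves, and the struct keeps both alive for as long as the boxed future exists. A minimal safe sketch of the same idea (illustrative, not from this commit; it uses only the imports already present in this file): moving the owned values into an async block ties the borrow to the future's own lifetime.
fn batch_fut_sketch<V>(
    scy: Arc<ScySession>,
    batch: Batch,
    values: V,
) -> impl Future<Output = Result<BatchResult, QueryError>>
where
    V: BatchValues + 'static,
{
    async move {
        // The async block owns `scy` and `batch`, so the `&batch` passed to
        // Session::batch borrows data that lives exactly as long as the future.
        scy.batch(&batch, values).await
    }
}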
pub struct ChannelWriteRes {
pub nrows: u32,
pub dt: Duration,
}
pub struct ChannelWriteFut {
nn: usize,
fut1: Option<Pin<Box<dyn Future<Output = Result<(), Error>>>>>,
@@ -143,7 +215,7 @@ pub struct ChannelWriteFut {
}
impl Future for ChannelWriteFut {
type Output = Result<(), Error>;
type Output = Result<ChannelWriteRes, Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
use Poll::*;
@@ -180,10 +252,26 @@ impl Future for ChannelWriteFut {
} else {
if self.mask != 0 {
let ts2 = Instant::now();
let dt = ts2.duration_since(self.ts1.unwrap()).as_secs_f32() * 1e3;
info!("inserted nn {} dt {:6.2} ms", self.nn, dt);
let dt = ts2.duration_since(self.ts1.unwrap());
if false {
trace!(
"ChannelWriteFut inserted nn {} dt {:6.2} ms",
self.nn,
dt.as_secs_f32() * 1e3
);
}
let res = ChannelWriteRes {
nrows: self.nn as u32,
dt,
};
Ready(Ok(res))
} else {
let res = ChannelWriteRes {
nrows: 0,
dt: Duration::from_millis(0),
};
Ready(Ok(res))
}
Ready(Ok(()))
};
}
}
@@ -193,90 +281,318 @@ pub trait ChannelWriter {
fn write_msg(&mut self, ts: u64, pulse: u64, fr: &ZmtpFrame) -> Result<ChannelWriteFut, Error>;
}
pub struct WriteFutF64 {
nn: usize,
fut1: Pin<Box<dyn Future<Output = Result<(), Error>>>>,
fut2: Pin<Box<dyn Future<Output = Result<(), Error>>>>,
trait MsgAcceptor {
fn len(&self) -> usize;
fn accept(&mut self, ts_msp: i64, ts_lsp: i64, pulse: i64, fr: &ZmtpFrame) -> Result<(), Error>;
fn should_flush(&self) -> bool;
fn flush_batch(&mut self, scy: Arc<ScySession>) -> Result<ScyBatchFutGen, Error>;
}
impl Future for WriteFutF64 {
type Output = Result<(), Error>;
macro_rules! impl_msg_acceptor_scalar {
($sname:ident, $st:ty, $qu_id:ident, $from_bytes:ident) => {
struct $sname {
query: PreparedStatement,
values: Vec<(i32, i64, i64, i64, $st)>,
series: i32,
}
fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
todo!()
}
impl $sname {
#[allow(unused)]
pub fn new(series: i32, cq: &CommonQueries) -> Self {
Self {
query: cq.$qu_id.clone(),
values: vec![],
series,
}
}
}
impl MsgAcceptor for $sname {
fn len(&self) -> usize {
self.values.len()
}
fn accept(&mut self, ts_msp: i64, ts_lsp: i64, pulse: i64, fr: &ZmtpFrame) -> Result<(), Error> {
type ST = $st;
const STL: usize = std::mem::size_of::<ST>();
let value = ST::$from_bytes(fr.data()[0..STL].try_into()?);
self.values.push((self.series, ts_msp, ts_lsp, pulse, value));
Ok(())
}
fn should_flush(&self) -> bool {
self.len() >= 140 + ((self.series as usize) & 0x1f)
}
fn flush_batch(&mut self, scy: Arc<ScySession>) -> Result<ScyBatchFutGen, Error> {
let vt = mem::replace(&mut self.values, vec![]);
let nn = vt.len();
let mut batch = Batch::new(BatchType::Unlogged);
for _ in 0..nn {
batch.append_statement(self.query.clone());
}
let ret = ScyBatchFutGen::new(scy, batch, vt);
Ok(ret)
}
}
};
}
pub async fn run_write_fut(fut: ChannelWriteFut) -> Result<(), Error> {
err::todo();
let ts1 = Instant::now();
if let Some(f) = fut.fut1 {
f.await?;
}
if let Some(f) = fut.fut2 {
f.await?;
}
let ts2 = Instant::now();
let dt = ts2.duration_since(ts1).as_secs_f32() * 1e3;
info!("insert f64 nn {} dt {:6.2} ms", fut.nn, dt);
Ok(())
macro_rules! impl_msg_acceptor_array {
($sname:ident, $st:ty, $qu_id:ident, $from_bytes:ident) => {
struct $sname {
query: PreparedStatement,
values: Vec<(i32, i64, i64, i64, Vec<$st>)>,
series: i32,
array_truncate: usize,
}
impl $sname {
#[allow(unused)]
pub fn new(series: i32, array_truncate: usize, cq: &CommonQueries) -> Self {
Self {
query: cq.$qu_id.clone(),
values: vec![],
series,
array_truncate,
}
}
}
impl MsgAcceptor for $sname {
fn len(&self) -> usize {
self.values.len()
}
fn accept(&mut self, ts_msp: i64, ts_lsp: i64, pulse: i64, fr: &ZmtpFrame) -> Result<(), Error> {
type ST = $st;
const STL: usize = std::mem::size_of::<ST>();
let vc = fr.data().len() / STL;
let mut values = Vec::with_capacity(vc);
for i in 0..vc {
let h = i * STL;
let value = ST::$from_bytes(fr.data()[h..h + STL].try_into()?);
values.push(value);
}
values.truncate(self.array_truncate);
self.values.push((self.series, ts_msp, ts_lsp, pulse, values));
Ok(())
}
fn should_flush(&self) -> bool {
self.len() >= 40 + ((self.series as usize) & 0x7)
}
fn flush_batch(&mut self, scy: Arc<ScySession>) -> Result<ScyBatchFutGen, Error> {
let vt = mem::replace(&mut self.values, vec![]);
let nn = vt.len();
let mut batch = Batch::new(BatchType::Unlogged);
for _ in 0..nn {
batch.append_statement(self.query.clone());
}
let ret = ScyBatchFutGen::new(scy, batch, vt);
Ok(ret)
}
}
};
}
pub async fn run_write_fut_f64(fut: WriteFutF64) -> Result<(), Error> {
err::todo();
let ts1 = Instant::now();
fut.fut1.await?;
fut.fut2.await?;
let ts2 = Instant::now();
let dt = ts2.duration_since(ts1).as_secs_f32() * 1e3;
info!("insert f64 nn {} dt {:6.2} ms", fut.nn, dt);
Ok(())
}
impl_msg_acceptor_scalar!(MsgAcceptorScalarU16LE, i16, qu_insert_scalar_i16, from_le_bytes);
impl_msg_acceptor_scalar!(MsgAcceptorScalarU16BE, i16, qu_insert_scalar_i16, from_be_bytes);
impl_msg_acceptor_scalar!(MsgAcceptorScalarU32LE, i32, qu_insert_scalar_i32, from_le_bytes);
impl_msg_acceptor_scalar!(MsgAcceptorScalarU32BE, i32, qu_insert_scalar_i32, from_be_bytes);
impl_msg_acceptor_scalar!(MsgAcceptorScalarI16LE, i16, qu_insert_scalar_i16, from_le_bytes);
impl_msg_acceptor_scalar!(MsgAcceptorScalarI16BE, i16, qu_insert_scalar_i16, from_be_bytes);
impl_msg_acceptor_scalar!(MsgAcceptorScalarF32LE, f32, qu_insert_scalar_f32, from_le_bytes);
impl_msg_acceptor_scalar!(MsgAcceptorScalarF32BE, f32, qu_insert_scalar_f32, from_be_bytes);
impl_msg_acceptor_scalar!(MsgAcceptorScalarF64LE, f64, qu_insert_scalar_f64, from_le_bytes);
impl_msg_acceptor_scalar!(MsgAcceptorScalarF64BE, f64, qu_insert_scalar_f64, from_be_bytes);
pub struct ChannelWriterScalarF64 {
impl_msg_acceptor_array!(MsgAcceptorArrayU16LE, i16, qu_insert_array_u16, from_le_bytes);
impl_msg_acceptor_array!(MsgAcceptorArrayU16BE, i16, qu_insert_array_u16, from_be_bytes);
impl_msg_acceptor_array!(MsgAcceptorArrayI16LE, i16, qu_insert_array_i16, from_le_bytes);
impl_msg_acceptor_array!(MsgAcceptorArrayI16BE, i16, qu_insert_array_i16, from_be_bytes);
impl_msg_acceptor_array!(MsgAcceptorArrayF32LE, f32, qu_insert_array_f32, from_le_bytes);
impl_msg_acceptor_array!(MsgAcceptorArrayF32BE, f32, qu_insert_array_f32, from_be_bytes);
impl_msg_acceptor_array!(MsgAcceptorArrayF64LE, f64, qu_insert_array_f64, from_le_bytes);
impl_msg_acceptor_array!(MsgAcceptorArrayF64BE, f64, qu_insert_array_f64, from_be_bytes);
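Every acceptor generated above follows the same flow: accept buffers one row per incoming frame, and once should_flush reports that the per-series threshold is reached, flush_batch drains the buffered rows into a single unlogged batch. An illustrative driver (a sketch, not from this commit, using only names introduced in this file):
fn drive_acceptor(
    acc: &mut dyn MsgAcceptor,
    scy: Arc<ScySession>,
    ts_msp: i64,
    ts_lsp: i64,
    pulse: i64,
    fr: &ZmtpFrame,
) -> Result<Option<ScyBatchFutGen>, Error> {
    // Buffer this frame as one row for the prepared insert statement.
    acc.accept(ts_msp, ts_lsp, pulse, fr)?;
    if acc.should_flush() {
        // Drain the buffered rows into one unlogged batch and hand back its future.
        Ok(Some(acc.flush_batch(scy)?))
    } else {
        Ok(None)
    }
}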
pub struct ChannelWriterAll {
series: u32,
scy: Arc<ScySession>,
common_queries: Arc<CommonQueries>,
ts_msp_lsp: fn(u64, u32) -> (u64, u64),
ts_msp_last: u64,
tmp_vals: Vec<(i32, i64, i64, i64, f64)>,
acceptor: Box<dyn MsgAcceptor>,
dtype_mark: u32,
}
impl ChannelWriterScalarF64 {
pub fn new(series: u32, common_queries: Arc<CommonQueries>, scy: Arc<ScySession>) -> Self {
Self {
impl ChannelWriterAll {
pub fn new(
series: u32,
common_queries: Arc<CommonQueries>,
scy: Arc<ScySession>,
scalar_type: ScalarType,
shape: Shape,
byte_order: ByteOrder,
array_truncate: usize,
) -> Result<Self, Error> {
let dtype_mark = scalar_type.index() as u32;
let dtype_mark = match &shape {
Shape::Scalar => dtype_mark,
Shape::Wave(_) => 1000 + dtype_mark,
Shape::Image(_, _) => 2000 + dtype_mark,
};
let (ts_msp_lsp, acc): (fn(u64, u32) -> (u64, u64), Box<dyn MsgAcceptor>) = match &shape {
Shape::Scalar => match &scalar_type {
ScalarType::U16 => match &byte_order {
ByteOrder::BE => {
let acc = MsgAcceptorScalarU16BE::new(series as i32, &common_queries);
(ts_msp_lsp_1, Box::new(acc) as _)
}
ByteOrder::LE => {
return Err(Error::with_msg_no_trace(format!(
"TODO {:?} {:?} {:?}",
scalar_type, shape, byte_order
)));
}
},
ScalarType::U32 => match &byte_order {
ByteOrder::BE => {
let acc = MsgAcceptorScalarU32BE::new(series as i32, &common_queries);
(ts_msp_lsp_1, Box::new(acc) as _)
}
ByteOrder::LE => {
return Err(Error::with_msg_no_trace(format!(
"TODO {:?} {:?} {:?}",
scalar_type, shape, byte_order
)));
}
},
ScalarType::F32 => match &byte_order {
ByteOrder::BE => {
let acc = MsgAcceptorScalarF32BE::new(series as i32, &common_queries);
(ts_msp_lsp_1, Box::new(acc) as _)
}
ByteOrder::LE => {
return Err(Error::with_msg_no_trace(format!(
"TODO {:?} {:?} {:?}",
scalar_type, shape, byte_order
)));
}
},
ScalarType::F64 => match &byte_order {
ByteOrder::BE => {
let acc = MsgAcceptorScalarF64BE::new(series as i32, &common_queries);
(ts_msp_lsp_1, Box::new(acc) as _)
}
ByteOrder::LE => {
return Err(Error::with_msg_no_trace(format!(
"TODO {:?} {:?} {:?}",
scalar_type, shape, byte_order
)));
}
},
_ => {
return Err(Error::with_msg_no_trace(format!(
"TODO {:?} {:?} {:?}",
scalar_type, shape, byte_order
)));
}
},
Shape::Wave(nele) => {
info!("set up wave acceptor nele {nele}");
match &scalar_type {
ScalarType::U16 => match &byte_order {
ByteOrder::LE => {
let acc = MsgAcceptorArrayU16LE::new(series as i32, array_truncate, &common_queries);
(ts_msp_lsp_2, Box::new(acc) as _)
}
ByteOrder::BE => {
let acc = MsgAcceptorArrayU16BE::new(series as i32, array_truncate, &common_queries);
(ts_msp_lsp_2, Box::new(acc) as _)
}
},
ScalarType::I16 => match &byte_order {
ByteOrder::LE => {
let acc = MsgAcceptorArrayI16LE::new(series as i32, array_truncate, &common_queries);
(ts_msp_lsp_2, Box::new(acc) as _)
}
ByteOrder::BE => {
let acc = MsgAcceptorArrayI16BE::new(series as i32, array_truncate, &common_queries);
(ts_msp_lsp_2, Box::new(acc) as _)
}
},
ScalarType::F32 => match &byte_order {
ByteOrder::LE => {
let acc = MsgAcceptorArrayF32LE::new(series as i32, array_truncate, &common_queries);
(ts_msp_lsp_2, Box::new(acc) as _)
}
ByteOrder::BE => {
let acc = MsgAcceptorArrayF32BE::new(series as i32, array_truncate, &common_queries);
(ts_msp_lsp_2, Box::new(acc) as _)
}
},
ScalarType::F64 => match &byte_order {
ByteOrder::LE => {
let acc = MsgAcceptorArrayF64LE::new(series as i32, array_truncate, &common_queries);
(ts_msp_lsp_2, Box::new(acc) as _)
}
ByteOrder::BE => {
let acc = MsgAcceptorArrayF64BE::new(series as i32, array_truncate, &common_queries);
(ts_msp_lsp_2, Box::new(acc) as _)
}
},
_ => {
return Err(Error::with_msg_no_trace(format!(
"TODO {:?} {:?} {:?}",
scalar_type, shape, byte_order
)));
}
}
}
_ => {
return Err(Error::with_msg_no_trace(format!(
"TODO {:?} {:?} {:?}",
scalar_type, shape, byte_order
)));
}
};
let ret = Self {
series,
scy,
ts_msp_last: 0,
common_queries,
tmp_vals: vec![],
}
ts_msp_lsp,
ts_msp_last: 0,
acceptor: acc,
dtype_mark,
};
Ok(ret)
}
pub fn dtype_mark(&self) -> u32 {
self.dtype_mark
}
pub fn write_msg_impl(&mut self, ts: u64, pulse: u64, fr: &ZmtpFrame) -> Result<ChannelWriteFut, Error> {
let (ts_msp, ts_lsp) = ts_msp_lsp_1(ts);
let (ts_msp, ts_lsp) = (self.ts_msp_lsp)(ts, self.series);
let fut1 = if ts_msp != self.ts_msp_last {
info!("write_msg_impl TS MSP CHANGED ts {} pulse {}", ts, pulse);
info!("ts_msp changed ts {ts} pulse {pulse} ts_msp {ts_msp} ts_lsp {ts_lsp}");
self.ts_msp_last = ts_msp;
// TODO make the passing of the query parameters type safe.
let fut = ScyQueryFut::new(
self.scy.clone(),
self.common_queries.qu_insert_ts_msp.clone(),
(self.series as i32, ts_msp as i64),
(self.series as i32, ts_msp as i64, self.dtype_mark as i32),
);
Some(Box::pin(fut) as _)
} else {
None
};
let value = f64::from_be_bytes(fr.data().try_into()?);
self.tmp_vals
.push((self.series as i32, ts_msp as i64, ts_lsp as i64, pulse as i64, value));
if self.tmp_vals.len() >= 100 + ((self.series as usize) & 0xf) {
let vt = std::mem::replace(&mut self.tmp_vals, vec![]);
let nn = vt.len();
let mut batch = Batch::new(BatchType::Unlogged);
for _ in 0..nn {
batch.append_statement(self.common_queries.qu_insert_scalar_f64.clone());
}
let fut = ScyBatchFut::new(self.scy.clone(), batch, vt);
self.acceptor.accept(ts_msp as i64, ts_lsp as i64, pulse as i64, fr)?;
if self.acceptor.should_flush() {
let nn = self.acceptor.len();
let fut = self.acceptor.flush_batch(self.scy.clone())?;
let fut2 = Some(Box::pin(fut) as _);
let ret = ChannelWriteFut {
ts1: None,
@@ -299,105 +615,28 @@ impl ChannelWriterScalarF64 {
}
}
impl ChannelWriter for ChannelWriterScalarF64 {
impl ChannelWriter for ChannelWriterAll {
fn write_msg(&mut self, ts: u64, pulse: u64, fr: &ZmtpFrame) -> Result<ChannelWriteFut, Error> {
self.write_msg_impl(ts, pulse, fr)
}
}
pub struct ChannelWriterArrayF64 {
series: u32,
scy: Arc<ScySession>,
common_queries: Arc<CommonQueries>,
ts_msp_last: u64,
tmp_vals: Vec<(i32, i64, i64, i64, Vec<f64>)>,
truncate: usize,
fn ts_msp_lsp_1(ts: u64, series: u32) -> (u64, u64) {
ts_msp_lsp_gen(ts, series, 100 * SEC)
}
impl ChannelWriterArrayF64 {
pub fn new(series: u32, common_queries: Arc<CommonQueries>, scy: Arc<ScySession>, truncate: usize) -> Self {
Self {
series,
scy,
ts_msp_last: 0,
common_queries,
tmp_vals: vec![],
truncate,
}
}
pub fn write_msg_impl(&mut self, ts: u64, pulse: u64, fr: &ZmtpFrame) -> Result<ChannelWriteFut, Error> {
let (ts_msp, ts_lsp) = ts_msp_lsp_2(ts);
let fut1 = if ts_msp != self.ts_msp_last {
info!("write_msg_impl TS MSP CHANGED ts {} pulse {}", ts, pulse);
self.ts_msp_last = ts_msp;
let fut = ScyQueryFut::new(
self.scy.clone(),
self.common_queries.qu_insert_ts_msp.clone(),
(self.series as i32, ts_msp as i64),
);
Some(Box::pin(fut) as _)
} else {
None
};
type ST = f64;
const STL: usize = std::mem::size_of::<ST>();
let vc = fr.data().len() / STL;
let mut values = Vec::with_capacity(vc);
for i in 0..vc {
let h = i * STL;
let value = f64::from_be_bytes(fr.data()[h..h + STL].try_into()?);
values.push(value);
}
values.truncate(self.truncate);
self.tmp_vals
.push((self.series as i32, ts_msp as i64, ts_lsp as i64, pulse as i64, values));
if self.tmp_vals.len() >= 40 + ((self.series as usize) & 0x7) {
let vt = std::mem::replace(&mut self.tmp_vals, vec![]);
let nn = vt.len();
let mut batch = Batch::new(BatchType::Unlogged);
for _ in 0..nn {
batch.append_statement(self.common_queries.qu_insert_array_f64.clone());
}
let fut = ScyBatchFut::new(self.scy.clone(), batch, vt);
let fut2 = Some(Box::pin(fut) as _);
let ret = ChannelWriteFut {
ts1: None,
mask: 0,
nn,
fut1,
fut2,
};
Ok(ret)
} else {
let ret = ChannelWriteFut {
ts1: None,
mask: 0,
nn: 0,
fut1: fut1,
fut2: None,
};
Ok(ret)
}
}
fn ts_msp_lsp_2(ts: u64, series: u32) -> (u64, u64) {
ts_msp_lsp_gen(ts, series, 10 * SEC)
}
impl ChannelWriter for ChannelWriterArrayF64 {
fn write_msg(&mut self, ts: u64, pulse: u64, fr: &ZmtpFrame) -> Result<ChannelWriteFut, Error> {
self.write_msg_impl(ts, pulse, fr)
fn ts_msp_lsp_gen(ts: u64, series: u32, fak: u64) -> (u64, u64) {
if ts < u32::MAX as u64 {
return (0, 0);
}
}
fn ts_msp_lsp_1(ts: u64) -> (u64, u64) {
const MASK: u64 = u64::MAX >> 23;
let ts_msp = ts & (!MASK);
let ts_lsp = ts & MASK;
(ts_msp, ts_lsp)
}
fn ts_msp_lsp_2(ts: u64) -> (u64, u64) {
const MASK: u64 = u64::MAX >> 16;
let ts_msp = ts & (!MASK);
let ts_lsp = ts & MASK;
let off = series as u64;
let ts_a = ts - off;
let ts_b = ts_a / fak;
let ts_lsp = ts_a % fak;
let ts_msp = ts_b * fak + off;
(ts_msp, ts_lsp)
}
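ts_msp_lsp_gen splits a nanosecond timestamp into a coarse partition key (ts_msp) and the remainder within that partition (ts_lsp); the series id is used as an offset so that partition boundaries do not line up across all series at the same instant, and scalar series (fak = 100 * SEC) roll over less often than array series (fak = 10 * SEC). A small invariant check, illustrative and not from this commit, assuming SEC from netpod::timeunits is nanoseconds per second:
#[cfg(test)]
mod ts_split_sketch {
    use super::*;

    #[test]
    fn ts_msp_lsp_gen_roundtrip() {
        let series = 7u32;
        let fak = 100 * SEC;
        let ts = 1_649_000_000_123_456_789u64;
        let (msp, lsp) = ts_msp_lsp_gen(ts, series, fak);
        // The two parts recombine into the original timestamp,
        assert_eq!(msp + lsp, ts);
        // the remainder stays within one partition,
        assert!(lsp < fak);
        // and the partition key is a multiple of fak shifted by the series id.
        assert_eq!((msp - series as u64) % fak, 0);
    }
}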

@@ -1,6 +1,6 @@
use crate::bsread::{parse_zmtp_message, BsreadMessage};
use crate::bsread::{ChannelDesc, GlobalTimestamp, HeadA, HeadB};
use crate::channelwriter::{ChannelWriter, ChannelWriterArrayF64, ChannelWriterScalarF64};
use crate::channelwriter::{ChannelWriter, ChannelWriterAll};
use crate::netbuf::NetBuf;
use async_channel::{Receiver, Sender};
#[allow(unused)]
@@ -10,11 +10,13 @@ use futures_core::{Future, Stream};
use futures_util::{pin_mut, FutureExt, StreamExt};
use log::*;
use netpod::timeunits::*;
use netpod::{ByteOrder, ScalarType, Shape};
use scylla::batch::{Batch, BatchType, Consistency};
use scylla::prepared_statement::PreparedStatement;
use scylla::transport::errors::QueryError;
use scylla::{Session as ScySession, SessionBuilder};
use serde_json::Value as JsVal;
use stats::CheckEvery;
use std::collections::BTreeMap;
use std::ffi::CStr;
use std::fmt;
@@ -82,7 +84,15 @@ pub struct CommonQueries {
pub qu1: PreparedStatement,
pub qu2: PreparedStatement,
pub qu_insert_ts_msp: PreparedStatement,
pub qu_insert_scalar_u16: PreparedStatement,
pub qu_insert_scalar_u32: PreparedStatement,
pub qu_insert_scalar_i16: PreparedStatement,
pub qu_insert_scalar_i32: PreparedStatement,
pub qu_insert_scalar_f32: PreparedStatement,
pub qu_insert_scalar_f64: PreparedStatement,
pub qu_insert_array_u16: PreparedStatement,
pub qu_insert_array_i16: PreparedStatement,
pub qu_insert_array_f32: PreparedStatement,
pub qu_insert_array_f64: PreparedStatement,
}
@@ -128,6 +138,7 @@ struct BsreadClient {
scy: Arc<ScySession>,
channel_writers: BTreeMap<u32, Box<dyn ChannelWriter>>,
common_queries: Arc<CommonQueries>,
print_stats: CheckEvery,
}
impl BsreadClient {
@@ -146,6 +157,7 @@ impl BsreadClient {
scy,
channel_writers: Default::default(),
common_queries,
print_stats: CheckEvery::new(Duration::from_millis(2000)),
};
Ok(ret)
}
@@ -162,8 +174,9 @@ impl BsreadClient {
let mut frame_diff_count = 0u64;
let mut hash_mismatch_count = 0u64;
let mut head_b = HeadB::empty();
let mut status_last = Instant::now();
let mut bytes_payload = 0u64;
let mut rows_inserted = 0u32;
let mut time_spent_inserting = Duration::from_millis(0);
while let Some(item) = zmtp.next().await {
match item {
Ok(ev) => match ev {
@@ -200,7 +213,12 @@ impl BsreadClient {
dh_md5_last = bm.head_b_md5.clone();
for chn in &head_b.channels {
info!("Setup writer for {}", chn.name);
self.setup_channel_writers(chn)?;
match self.setup_channel_writers(chn).await {
Ok(_) => {}
Err(e) => {
warn!("can not set up writer for {} {e:?}", chn.name);
}
}
}
} else {
error!("changed data header hash {} mh {}", bm.head_b_md5, bm.head_a.hash);
@@ -241,11 +259,13 @@ impl BsreadClient {
let series = get_series_id(chn);
if !self.channel_writers.contains_key(&series) {}
if let Some(cw) = self.channel_writers.get_mut(&series) {
cw.write_msg(ts, pulse, fr)?.await?;
let res = cw.write_msg(ts, pulse, fr)?.await?;
rows_inserted += res.nrows;
time_spent_inserting = time_spent_inserting + res.dt;
bytes_payload += fr.data().len() as u64;
} else {
// TODO check for missing writers.
//warn!("no writer for {}", chn.name);
warn!("no writer for {}", chn.name);
}
}
}
@@ -272,19 +292,21 @@ impl BsreadClient {
if false && msgc > 10000 {
break;
}
let tsnow = Instant::now();
let dt = tsnow.duration_since(status_last);
if dt >= Duration::from_millis(2000) {
let r = bytes_payload as f32 / dt.as_secs_f32() * 1e-3;
info!("rate: {r:8.3} kB/s");
status_last = tsnow;
let dt = self.print_stats.is_elapsed_now();
if dt > 0. {
let nrs = rows_inserted as f32 / dt;
let dt_ins = time_spent_inserting.as_secs_f32() * 1e3;
let r = bytes_payload as f32 / dt * 1e-3;
info!("insert {nrs:.0} 1/s dt-ins {dt_ins:4.0} ms payload {r:8.3} kB/s");
rows_inserted = 0;
time_spent_inserting = Duration::from_millis(0);
bytes_payload = 0;
}
}
Ok(())
}
fn setup_channel_writers(&mut self, chn: &ChannelDesc) -> Result<(), Error> {
async fn setup_channel_writers(&mut self, chn: &ChannelDesc) -> Result<(), Error> {
let series = get_series_id(chn);
let has_comp = match &chn.compression {
Some(s) => s != "none",
@@ -294,48 +316,29 @@ impl BsreadClient {
warn!("Compression not yet supported [{}]", chn.name);
return Ok(());
}
match chn.ty.as_str() {
"float64" => match &chn.shape {
JsVal::Array(a) => {
if a.len() == 1 {
if let Some(n) = a[0].as_u64() {
if n == 1 {
if chn.encoding == "big" {
let cw = ChannelWriterScalarF64::new(
series,
self.common_queries.clone(),
self.scy.clone(),
);
self.channel_writers.insert(series, Box::new(cw));
} else {
warn!("TODO scalar f64 LE");
}
} else {
if chn.encoding == "big" {
let cw = ChannelWriterArrayF64::new(
series,
self.common_queries.clone(),
self.scy.clone(),
self.opts.array_truncate.unwrap_or(64),
);
self.channel_writers.insert(series, Box::new(cw));
} else {
warn!("TODO array f64 LE");
}
}
}
} else {
warn!("TODO writer f64 shape {:?}", a);
}
}
s => {
warn!("TODO writer f64 shape {:?}", s);
}
},
k => {
warn!("TODO writer dtype {:?}", k);
}
}
let scalar_type = ScalarType::from_bsread_str(&chn.ty)?;
let shape = Shape::from_bsread_jsval(&chn.shape)?;
let byte_order = ByteOrder::from_bsread_str(&chn.encoding)?;
let trunc = self.opts.array_truncate.unwrap_or(64);
let cw = ChannelWriterAll::new(
series,
self.common_queries.clone(),
self.scy.clone(),
scalar_type.clone(),
shape.clone(),
byte_order.clone(),
trunc,
)?;
let dtype_mark = cw.dtype_mark();
self.channel_writers.insert(series, Box::new(cw));
// TODO insert correct facility name
self.scy
.query(
"insert into series_by_channel (facility, channel_name, dtype, series) values (?, ?, ?, ?)",
("scylla", &chn.name, dtype_mark as i32, series as i32),
)
.await
.err_conv()?;
Ok(())
}
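With this change, setup_channel_writers derives the writer configuration entirely from the bsread data-header fields of a channel: type, shape and encoding. The sketch below is illustrative and not from this commit; the header field values follow the bsread convention, and the behavior of the netpod helpers named above is assumed from their names.
fn header_mapping_sketch() {
    // A channel declared in the bsread data header as
    //   { "name": "SOME:CHANNEL", "type": "float64", "shape": [2048], "encoding": "little" }
    // is resolved by the netpod helpers used above to roughly:
    let scalar_type = ScalarType::F64;
    let shape = Shape::Wave(2048);
    let byte_order = ByteOrder::LE;
    // With these, ChannelWriterAll::new selects MsgAcceptorArrayF64LE, truncates each
    // frame to array_truncate elements (default 64) and batches rows into events_array_f64.
    let _ = (scalar_type, shape, byte_order);
}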
@@ -404,13 +407,45 @@ pub async fn zmtp_client(opts: ZmtpClientOpts) -> Result<(), Error> {
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
let qu_insert_ts_msp = scy
.prepare("insert into ts_msp (series, ts_msp) values (?, ?)")
.prepare("insert into ts_msp (series, ts_msp, dtype) values (?, ?, ?)")
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
let qu_insert_scalar_u16 = scy
.prepare("insert into events_scalar_u16 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)")
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
let qu_insert_scalar_u32 = scy
.prepare("insert into events_scalar_u32 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)")
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
let qu_insert_scalar_i16 = scy
.prepare("insert into events_scalar_i16 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)")
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
let qu_insert_scalar_i32 = scy
.prepare("insert into events_scalar_i32 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)")
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
let qu_insert_scalar_f32 = scy
.prepare("insert into events_scalar_f32 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)")
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
let qu_insert_scalar_f64 = scy
.prepare("insert into events_scalar_f64 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)")
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
let qu_insert_array_u16 = scy
.prepare("insert into events_array_u16 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)")
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
let qu_insert_array_i16 = scy
.prepare("insert into events_array_i16 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)")
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
let qu_insert_array_f32 = scy
.prepare("insert into events_array_f32 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)")
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
let qu_insert_array_f64 = scy
.prepare("insert into events_array_f64 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)")
.await
@@ -419,7 +454,15 @@ pub async fn zmtp_client(opts: ZmtpClientOpts) -> Result<(), Error> {
qu1,
qu2,
qu_insert_ts_msp,
qu_insert_scalar_u16,
qu_insert_scalar_u32,
qu_insert_scalar_i16,
qu_insert_scalar_i32,
qu_insert_scalar_f32,
qu_insert_scalar_f64,
qu_insert_array_u16,
qu_insert_array_i16,
qu_insert_array_f32,
qu_insert_array_f64,
};
let common_queries = Arc::new(common_queries);
@@ -427,7 +470,6 @@ pub async fn zmtp_client(opts: ZmtpClientOpts) -> Result<(), Error> {
for source_addr in &opts.sources {
let client = BsreadClient::new(opts.clone(), source_addr.into(), scy.clone(), common_queries.clone()).await?;
let fut = ClientRun::new(client);
//let client = Box::pin(client) as Pin<Box<dyn Future<Output = Result<(), Error>>>>;
clients.push(fut);
}
futures_util::future::join_all(clients).await;

stats/Cargo.toml

@@ -0,0 +1,12 @@
[package]
name = "stats"
version = "0.0.1-a.0"
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
edition = "2021"
[lib]
path = "src/stats.rs"
[dependencies]
log = { path = "../log" }
err = { path = "../../daqbuffer/err" }

stats/src/stats.rs

@@ -0,0 +1,52 @@
use std::time::{Duration, Instant};
pub struct EMA {
ema: f32,
emv: f32,
k: f32,
}
impl EMA {
pub fn default() -> Self {
Self {
ema: 0.0,
emv: 0.0,
k: 0.05,
}
}
pub fn update<V>(&mut self, v: V)
where
V: Into<f32>,
{
let k = self.k;
let dv = v.into() - self.ema;
self.ema += k * dv;
self.emv = (1f32 - k) * (self.emv + k * dv * dv);
}
}
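EMA keeps an exponentially weighted mean (ema) and an exponentially weighted spread estimate (emv) with smoothing factor k; with k = 0.05 the effective averaging window is roughly 1/k = 20 samples. A tiny usage sketch, illustrative and not from this commit:
fn ema_sketch() {
    let mut stat = EMA::default();
    for v in [1.0f32, 2.0, 3.0, 2.0, 1.0] {
        // Each update pulls ema a fraction k toward v and folds the squared
        // deviation into emv.
        stat.update(v);
    }
}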
pub struct CheckEvery {
ts_last: Instant,
dt: Duration,
}
impl CheckEvery {
pub fn new(dt: Duration) -> Self {
Self {
ts_last: Instant::now(),
dt,
}
}
pub fn is_elapsed_now(&mut self) -> f32 {
let now = Instant::now();
let dt = now.duration_since(self.ts_last);
if dt >= self.dt {
self.ts_last = now;
dt.as_secs_f32()
} else {
-16f32
}
}
}
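CheckEvery is the throttle behind the new stats printout in the client loop: call is_elapsed_now() on every pass and act only when it returns a positive value, which is the actually elapsed time in seconds (otherwise it returns the -16.0 sentinel). A usage sketch, illustrative and not from this commit; the surrounding names are made up:
fn rate_print_sketch() {
    let mut every = CheckEvery::new(std::time::Duration::from_millis(2000));
    let mut bytes_payload = 0u64;
    for _ in 0..100_000 {
        // ... handle one frame, then account for its payload:
        bytes_payload += 1024;
        let dt = every.is_elapsed_now();
        if dt > 0.0 {
            // Interval elapsed: report the rate and reset the counter.
            let rate_kb_s = bytes_payload as f32 / dt * 1e-3;
            println!("payload {rate_kb_s:8.3} kB/s");
            bytes_payload = 0;
        }
    }
}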