From de26b5b7c427d32f2da8272a99fe63de898ca6b2 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Mon, 11 Apr 2022 17:27:05 +0200 Subject: [PATCH] Support more types --- netfetch/Cargo.toml | 1 + netfetch/src/channelwriter.rs | 537 ++++++++++++++++++++++++---------- netfetch/src/zmtp.rs | 154 ++++++---- stats/Cargo.toml | 12 + stats/src/stats.rs | 52 ++++ 5 files changed, 551 insertions(+), 205 deletions(-) create mode 100644 stats/Cargo.toml create mode 100644 stats/src/stats.rs diff --git a/netfetch/Cargo.toml b/netfetch/Cargo.toml index b71151e..7c7e2e5 100644 --- a/netfetch/Cargo.toml +++ b/netfetch/Cargo.toml @@ -25,6 +25,7 @@ md-5 = "0.9" hex = "0.4" libc = "0.2" log = { path = "../log" } +stats = { path = "../stats" } err = { path = "../../daqbuffer/err" } netpod = { path = "../../daqbuffer/netpod" } taskrun = { path = "../../daqbuffer/taskrun" } diff --git a/netfetch/src/channelwriter.rs b/netfetch/src/channelwriter.rs index 7096378..15b9d95 100644 --- a/netfetch/src/channelwriter.rs +++ b/netfetch/src/channelwriter.rs @@ -4,15 +4,18 @@ use err::Error; use futures_core::Future; use futures_util::FutureExt; use log::*; +use netpod::timeunits::SEC; +use netpod::{ByteOrder, ScalarType, Shape}; use scylla::batch::{Batch, BatchType}; use scylla::frame::value::{BatchValues, ValueList}; use scylla::prepared_statement::PreparedStatement; use scylla::transport::errors::QueryError; use scylla::{BatchResult, QueryResult, Session as ScySession}; +use std::mem; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; -use std::time::Instant; +use std::time::{Duration, Instant}; pub struct ScyQueryFut { #[allow(unused)] @@ -134,6 +137,75 @@ impl Future for ScyBatchFut { } } +pub struct ScyBatchFutGen { + #[allow(unused)] + scy: Arc, + #[allow(unused)] + batch: Box, + fut: Pin>>>, + polled: usize, + ts_create: Instant, + ts_poll_start: Instant, +} + +impl ScyBatchFutGen { + pub fn new(scy: Arc, batch: Batch, values: V) -> Self + where + V: BatchValues + 'static, + { + let batch = Box::new(batch); + let scy_ref = unsafe { &*(&scy as &_ as *const _) } as &ScySession; + let batch_ref = unsafe { &*(&batch as &_ as *const _) } as &Batch; + let fut = scy_ref.batch(batch_ref, values); + let tsnow = Instant::now(); + Self { + scy, + batch, + fut: Box::pin(fut), + polled: 0, + ts_create: tsnow, + ts_poll_start: tsnow, + } + } +} + +impl Future for ScyBatchFutGen { + type Output = Result<(), Error>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { + use Poll::*; + if self.polled == 0 { + self.ts_poll_start = Instant::now(); + } + self.polled += 1; + match self.fut.poll_unpin(cx) { + Ready(k) => match k { + Ok(_) => { + trace!("ScyBatchFutGen done Ok"); + Ready(Ok(())) + } + Err(e) => { + let tsnow = Instant::now(); + let dt_created = tsnow.duration_since(self.ts_create).as_secs_f32() * 1e3; + let dt_polled = tsnow.duration_since(self.ts_poll_start).as_secs_f32() * 1e3; + warn!( + "ScyBatchFutGen polled {} dt_created {:6.2} ms dt_polled {:6.2} ms", + self.polled, dt_created, dt_polled + ); + warn!("ScyBatchFutGen done Err {e:?}"); + Ready(Err(e).err_conv()) + } + }, + Pending => Pending, + } + } +} + +pub struct ChannelWriteRes { + pub nrows: u32, + pub dt: Duration, +} + pub struct ChannelWriteFut { nn: usize, fut1: Option>>>>, @@ -143,7 +215,7 @@ pub struct ChannelWriteFut { } impl Future for ChannelWriteFut { - type Output = Result<(), Error>; + type Output = Result; fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { use Poll::*; @@ -180,10 +252,26 @@ impl Future for ChannelWriteFut { } else { if self.mask != 0 { let ts2 = Instant::now(); - let dt = ts2.duration_since(self.ts1.unwrap()).as_secs_f32() * 1e3; - info!("inserted nn {} dt {:6.2} ms", self.nn, dt); + let dt = ts2.duration_since(self.ts1.unwrap()); + if false { + trace!( + "ChannelWriteFut inserted nn {} dt {:6.2} ms", + self.nn, + dt.as_secs_f32() * 1e3 + ); + } + let res = ChannelWriteRes { + nrows: self.nn as u32, + dt, + }; + Ready(Ok(res)) + } else { + let res = ChannelWriteRes { + nrows: 0, + dt: Duration::from_millis(0), + }; + Ready(Ok(res)) } - Ready(Ok(())) }; } } @@ -193,90 +281,318 @@ pub trait ChannelWriter { fn write_msg(&mut self, ts: u64, pulse: u64, fr: &ZmtpFrame) -> Result; } -pub struct WriteFutF64 { - nn: usize, - fut1: Pin>>>, - fut2: Pin>>>, +trait MsgAcceptor { + fn len(&self) -> usize; + fn accept(&mut self, ts_msp: i64, ts_lsp: i64, pulse: i64, fr: &ZmtpFrame) -> Result<(), Error>; + fn should_flush(&self) -> bool; + fn flush_batch(&mut self, scy: Arc) -> Result; } -impl Future for WriteFutF64 { - type Output = Result<(), Error>; +macro_rules! impl_msg_acceptor_scalar { + ($sname:ident, $st:ty, $qu_id:ident, $from_bytes:ident) => { + struct $sname { + query: PreparedStatement, + values: Vec<(i32, i64, i64, i64, $st)>, + series: i32, + } - fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll { - todo!() - } + impl $sname { + #[allow(unused)] + pub fn new(series: i32, cq: &CommonQueries) -> Self { + Self { + query: cq.$qu_id.clone(), + values: vec![], + series, + } + } + } + + impl MsgAcceptor for $sname { + fn len(&self) -> usize { + self.values.len() + } + + fn accept(&mut self, ts_msp: i64, ts_lsp: i64, pulse: i64, fr: &ZmtpFrame) -> Result<(), Error> { + type ST = $st; + const STL: usize = std::mem::size_of::(); + let value = ST::$from_bytes(fr.data()[0..STL].try_into()?); + self.values.push((self.series, ts_msp, ts_lsp, pulse, value)); + Ok(()) + } + + fn should_flush(&self) -> bool { + self.len() >= 140 + ((self.series as usize) & 0x1f) + } + + fn flush_batch(&mut self, scy: Arc) -> Result { + let vt = mem::replace(&mut self.values, vec![]); + let nn = vt.len(); + let mut batch = Batch::new(BatchType::Unlogged); + for _ in 0..nn { + batch.append_statement(self.query.clone()); + } + let ret = ScyBatchFutGen::new(scy, batch, vt); + Ok(ret) + } + } + }; } -pub async fn run_write_fut(fut: ChannelWriteFut) -> Result<(), Error> { - err::todo(); - let ts1 = Instant::now(); - if let Some(f) = fut.fut1 { - f.await?; - } - if let Some(f) = fut.fut2 { - f.await?; - } - let ts2 = Instant::now(); - let dt = ts2.duration_since(ts1).as_secs_f32() * 1e3; - info!("insert f64 nn {} dt {:6.2} ms", fut.nn, dt); - Ok(()) +macro_rules! impl_msg_acceptor_array { + ($sname:ident, $st:ty, $qu_id:ident, $from_bytes:ident) => { + struct $sname { + query: PreparedStatement, + values: Vec<(i32, i64, i64, i64, Vec<$st>)>, + series: i32, + array_truncate: usize, + } + + impl $sname { + #[allow(unused)] + pub fn new(series: i32, array_truncate: usize, cq: &CommonQueries) -> Self { + Self { + query: cq.$qu_id.clone(), + values: vec![], + series, + array_truncate, + } + } + } + + impl MsgAcceptor for $sname { + fn len(&self) -> usize { + self.values.len() + } + + fn accept(&mut self, ts_msp: i64, ts_lsp: i64, pulse: i64, fr: &ZmtpFrame) -> Result<(), Error> { + type ST = $st; + const STL: usize = std::mem::size_of::(); + let vc = fr.data().len() / STL; + let mut values = Vec::with_capacity(vc); + for i in 0..vc { + let h = i * STL; + let value = ST::$from_bytes(fr.data()[h..h + STL].try_into()?); + values.push(value); + } + values.truncate(self.array_truncate); + self.values.push((self.series, ts_msp, ts_lsp, pulse, values)); + Ok(()) + } + + fn should_flush(&self) -> bool { + self.len() >= 40 + ((self.series as usize) & 0x7) + } + + fn flush_batch(&mut self, scy: Arc) -> Result { + let vt = mem::replace(&mut self.values, vec![]); + let nn = vt.len(); + let mut batch = Batch::new(BatchType::Unlogged); + for _ in 0..nn { + batch.append_statement(self.query.clone()); + } + let ret = ScyBatchFutGen::new(scy, batch, vt); + Ok(ret) + } + } + }; } -pub async fn run_write_fut_f64(fut: WriteFutF64) -> Result<(), Error> { - err::todo(); - let ts1 = Instant::now(); - fut.fut1.await?; - fut.fut2.await?; - let ts2 = Instant::now(); - let dt = ts2.duration_since(ts1).as_secs_f32() * 1e3; - info!("insert f64 nn {} dt {:6.2} ms", fut.nn, dt); - Ok(()) -} +impl_msg_acceptor_scalar!(MsgAcceptorScalarU16LE, i16, qu_insert_scalar_i16, from_le_bytes); +impl_msg_acceptor_scalar!(MsgAcceptorScalarU16BE, i16, qu_insert_scalar_i16, from_be_bytes); +impl_msg_acceptor_scalar!(MsgAcceptorScalarU32LE, i32, qu_insert_scalar_i32, from_le_bytes); +impl_msg_acceptor_scalar!(MsgAcceptorScalarU32BE, i32, qu_insert_scalar_i32, from_be_bytes); +impl_msg_acceptor_scalar!(MsgAcceptorScalarI16LE, i16, qu_insert_scalar_i16, from_le_bytes); +impl_msg_acceptor_scalar!(MsgAcceptorScalarI16BE, i16, qu_insert_scalar_i16, from_be_bytes); +impl_msg_acceptor_scalar!(MsgAcceptorScalarF32LE, f32, qu_insert_scalar_f32, from_le_bytes); +impl_msg_acceptor_scalar!(MsgAcceptorScalarF32BE, f32, qu_insert_scalar_f32, from_be_bytes); +impl_msg_acceptor_scalar!(MsgAcceptorScalarF64LE, f64, qu_insert_scalar_f64, from_le_bytes); +impl_msg_acceptor_scalar!(MsgAcceptorScalarF64BE, f64, qu_insert_scalar_f64, from_be_bytes); -pub struct ChannelWriterScalarF64 { +impl_msg_acceptor_array!(MsgAcceptorArrayU16LE, i16, qu_insert_array_u16, from_le_bytes); +impl_msg_acceptor_array!(MsgAcceptorArrayU16BE, i16, qu_insert_array_u16, from_be_bytes); +impl_msg_acceptor_array!(MsgAcceptorArrayI16LE, i16, qu_insert_array_i16, from_le_bytes); +impl_msg_acceptor_array!(MsgAcceptorArrayI16BE, i16, qu_insert_array_i16, from_be_bytes); +impl_msg_acceptor_array!(MsgAcceptorArrayF32LE, f32, qu_insert_array_f32, from_le_bytes); +impl_msg_acceptor_array!(MsgAcceptorArrayF32BE, f32, qu_insert_array_f32, from_be_bytes); +impl_msg_acceptor_array!(MsgAcceptorArrayF64LE, f64, qu_insert_array_f64, from_le_bytes); +impl_msg_acceptor_array!(MsgAcceptorArrayF64BE, f64, qu_insert_array_f64, from_be_bytes); + +pub struct ChannelWriterAll { series: u32, scy: Arc, common_queries: Arc, + ts_msp_lsp: fn(u64, u32) -> (u64, u64), ts_msp_last: u64, - tmp_vals: Vec<(i32, i64, i64, i64, f64)>, + acceptor: Box, + dtype_mark: u32, } -impl ChannelWriterScalarF64 { - pub fn new(series: u32, common_queries: Arc, scy: Arc) -> Self { - Self { +impl ChannelWriterAll { + pub fn new( + series: u32, + common_queries: Arc, + scy: Arc, + scalar_type: ScalarType, + shape: Shape, + byte_order: ByteOrder, + array_truncate: usize, + ) -> Result { + let dtype_mark = scalar_type.index() as u32; + let dtype_mark = match &shape { + Shape::Scalar => dtype_mark, + Shape::Wave(_) => 1000 + dtype_mark, + Shape::Image(_, _) => 2000 + dtype_mark, + }; + let (ts_msp_lsp, acc): (fn(u64, u32) -> (u64, u64), Box) = match &shape { + Shape::Scalar => match &scalar_type { + ScalarType::U16 => match &byte_order { + ByteOrder::BE => { + let acc = MsgAcceptorScalarU16BE::new(series as i32, &common_queries); + (ts_msp_lsp_1, Box::new(acc) as _) + } + ByteOrder::LE => { + return Err(Error::with_msg_no_trace(format!( + "TODO {:?} {:?} {:?}", + scalar_type, shape, byte_order + ))); + } + }, + ScalarType::U32 => match &byte_order { + ByteOrder::BE => { + let acc = MsgAcceptorScalarU32BE::new(series as i32, &common_queries); + (ts_msp_lsp_1, Box::new(acc) as _) + } + ByteOrder::LE => { + return Err(Error::with_msg_no_trace(format!( + "TODO {:?} {:?} {:?}", + scalar_type, shape, byte_order + ))); + } + }, + ScalarType::F32 => match &byte_order { + ByteOrder::BE => { + let acc = MsgAcceptorScalarF32BE::new(series as i32, &common_queries); + (ts_msp_lsp_1, Box::new(acc) as _) + } + ByteOrder::LE => { + return Err(Error::with_msg_no_trace(format!( + "TODO {:?} {:?} {:?}", + scalar_type, shape, byte_order + ))); + } + }, + ScalarType::F64 => match &byte_order { + ByteOrder::BE => { + let acc = MsgAcceptorScalarF64BE::new(series as i32, &common_queries); + (ts_msp_lsp_1, Box::new(acc) as _) + } + ByteOrder::LE => { + return Err(Error::with_msg_no_trace(format!( + "TODO {:?} {:?} {:?}", + scalar_type, shape, byte_order + ))); + } + }, + _ => { + return Err(Error::with_msg_no_trace(format!( + "TODO {:?} {:?} {:?}", + scalar_type, shape, byte_order + ))); + } + }, + Shape::Wave(nele) => { + info!("set up wave acceptor nele {nele}"); + match &scalar_type { + ScalarType::U16 => match &byte_order { + ByteOrder::LE => { + let acc = MsgAcceptorArrayU16LE::new(series as i32, array_truncate, &common_queries); + (ts_msp_lsp_2, Box::new(acc) as _) + } + ByteOrder::BE => { + let acc = MsgAcceptorArrayU16BE::new(series as i32, array_truncate, &common_queries); + (ts_msp_lsp_2, Box::new(acc) as _) + } + }, + ScalarType::I16 => match &byte_order { + ByteOrder::LE => { + let acc = MsgAcceptorArrayI16LE::new(series as i32, array_truncate, &common_queries); + (ts_msp_lsp_2, Box::new(acc) as _) + } + ByteOrder::BE => { + let acc = MsgAcceptorArrayI16BE::new(series as i32, array_truncate, &common_queries); + (ts_msp_lsp_2, Box::new(acc) as _) + } + }, + ScalarType::F32 => match &byte_order { + ByteOrder::LE => { + let acc = MsgAcceptorArrayF32LE::new(series as i32, array_truncate, &common_queries); + (ts_msp_lsp_2, Box::new(acc) as _) + } + ByteOrder::BE => { + let acc = MsgAcceptorArrayF32BE::new(series as i32, array_truncate, &common_queries); + (ts_msp_lsp_2, Box::new(acc) as _) + } + }, + ScalarType::F64 => match &byte_order { + ByteOrder::LE => { + let acc = MsgAcceptorArrayF64LE::new(series as i32, array_truncate, &common_queries); + (ts_msp_lsp_2, Box::new(acc) as _) + } + ByteOrder::BE => { + let acc = MsgAcceptorArrayF64BE::new(series as i32, array_truncate, &common_queries); + (ts_msp_lsp_2, Box::new(acc) as _) + } + }, + _ => { + return Err(Error::with_msg_no_trace(format!( + "TODO {:?} {:?} {:?}", + scalar_type, shape, byte_order + ))); + } + } + } + _ => { + return Err(Error::with_msg_no_trace(format!( + "TODO {:?} {:?} {:?}", + scalar_type, shape, byte_order + ))); + } + }; + let ret = Self { series, scy, - ts_msp_last: 0, common_queries, - tmp_vals: vec![], - } + ts_msp_lsp, + ts_msp_last: 0, + acceptor: acc, + dtype_mark, + }; + Ok(ret) + } + + pub fn dtype_mark(&self) -> u32 { + self.dtype_mark } pub fn write_msg_impl(&mut self, ts: u64, pulse: u64, fr: &ZmtpFrame) -> Result { - let (ts_msp, ts_lsp) = ts_msp_lsp_1(ts); + let (ts_msp, ts_lsp) = (self.ts_msp_lsp)(ts, self.series); let fut1 = if ts_msp != self.ts_msp_last { - info!("write_msg_impl TS MSP CHANGED ts {} pulse {}", ts, pulse); + info!("ts_msp changed ts {ts} pulse {pulse} ts_msp {ts_msp} ts_lsp {ts_lsp}"); self.ts_msp_last = ts_msp; + // TODO make the passing of the query parameters type safe. let fut = ScyQueryFut::new( self.scy.clone(), self.common_queries.qu_insert_ts_msp.clone(), - (self.series as i32, ts_msp as i64), + (self.series as i32, ts_msp as i64, self.dtype_mark as i32), ); Some(Box::pin(fut) as _) } else { None }; - let value = f64::from_be_bytes(fr.data().try_into()?); - self.tmp_vals - .push((self.series as i32, ts_msp as i64, ts_lsp as i64, pulse as i64, value)); - if self.tmp_vals.len() >= 100 + ((self.series as usize) & 0xf) { - let vt = std::mem::replace(&mut self.tmp_vals, vec![]); - let nn = vt.len(); - let mut batch = Batch::new(BatchType::Unlogged); - for _ in 0..nn { - batch.append_statement(self.common_queries.qu_insert_scalar_f64.clone()); - } - let fut = ScyBatchFut::new(self.scy.clone(), batch, vt); + self.acceptor.accept(ts_msp as i64, ts_lsp as i64, pulse as i64, fr)?; + if self.acceptor.should_flush() { + let nn = self.acceptor.len(); + let fut = self.acceptor.flush_batch(self.scy.clone())?; let fut2 = Some(Box::pin(fut) as _); let ret = ChannelWriteFut { ts1: None, @@ -299,105 +615,28 @@ impl ChannelWriterScalarF64 { } } -impl ChannelWriter for ChannelWriterScalarF64 { +impl ChannelWriter for ChannelWriterAll { fn write_msg(&mut self, ts: u64, pulse: u64, fr: &ZmtpFrame) -> Result { self.write_msg_impl(ts, pulse, fr) } } -pub struct ChannelWriterArrayF64 { - series: u32, - scy: Arc, - common_queries: Arc, - ts_msp_last: u64, - tmp_vals: Vec<(i32, i64, i64, i64, Vec)>, - truncate: usize, +fn ts_msp_lsp_1(ts: u64, series: u32) -> (u64, u64) { + ts_msp_lsp_gen(ts, series, 100 * SEC) } -impl ChannelWriterArrayF64 { - pub fn new(series: u32, common_queries: Arc, scy: Arc, truncate: usize) -> Self { - Self { - series, - scy, - ts_msp_last: 0, - common_queries, - tmp_vals: vec![], - truncate, - } - } - - pub fn write_msg_impl(&mut self, ts: u64, pulse: u64, fr: &ZmtpFrame) -> Result { - let (ts_msp, ts_lsp) = ts_msp_lsp_2(ts); - let fut1 = if ts_msp != self.ts_msp_last { - info!("write_msg_impl TS MSP CHANGED ts {} pulse {}", ts, pulse); - self.ts_msp_last = ts_msp; - let fut = ScyQueryFut::new( - self.scy.clone(), - self.common_queries.qu_insert_ts_msp.clone(), - (self.series as i32, ts_msp as i64), - ); - Some(Box::pin(fut) as _) - } else { - None - }; - type ST = f64; - const STL: usize = std::mem::size_of::(); - let vc = fr.data().len() / STL; - let mut values = Vec::with_capacity(vc); - for i in 0..vc { - let h = i * STL; - let value = f64::from_be_bytes(fr.data()[h..h + STL].try_into()?); - values.push(value); - } - values.truncate(self.truncate); - self.tmp_vals - .push((self.series as i32, ts_msp as i64, ts_lsp as i64, pulse as i64, values)); - if self.tmp_vals.len() >= 40 + ((self.series as usize) & 0x7) { - let vt = std::mem::replace(&mut self.tmp_vals, vec![]); - let nn = vt.len(); - let mut batch = Batch::new(BatchType::Unlogged); - for _ in 0..nn { - batch.append_statement(self.common_queries.qu_insert_array_f64.clone()); - } - let fut = ScyBatchFut::new(self.scy.clone(), batch, vt); - let fut2 = Some(Box::pin(fut) as _); - let ret = ChannelWriteFut { - ts1: None, - mask: 0, - nn, - fut1, - fut2, - }; - Ok(ret) - } else { - let ret = ChannelWriteFut { - ts1: None, - mask: 0, - nn: 0, - fut1: fut1, - fut2: None, - }; - Ok(ret) - } - } +fn ts_msp_lsp_2(ts: u64, series: u32) -> (u64, u64) { + ts_msp_lsp_gen(ts, series, 10 * SEC) } -impl ChannelWriter for ChannelWriterArrayF64 { - fn write_msg(&mut self, ts: u64, pulse: u64, fr: &ZmtpFrame) -> Result { - self.write_msg_impl(ts, pulse, fr) +fn ts_msp_lsp_gen(ts: u64, series: u32, fak: u64) -> (u64, u64) { + if ts < u32::MAX as u64 { + return (0, 0); } -} - -fn ts_msp_lsp_1(ts: u64) -> (u64, u64) { - const MASK: u64 = u64::MAX >> 23; - let ts_msp = ts & (!MASK); - let ts_lsp = ts & MASK; - (ts_msp, ts_lsp) -} - -fn ts_msp_lsp_2(ts: u64) -> (u64, u64) { - const MASK: u64 = u64::MAX >> 16; - let ts_msp = ts & (!MASK); - let ts_lsp = ts & MASK; + let off = series as u64; + let ts_a = ts - off; + let ts_b = ts_a / fak; + let ts_lsp = ts_a % fak; + let ts_msp = ts_b * fak + off; (ts_msp, ts_lsp) } diff --git a/netfetch/src/zmtp.rs b/netfetch/src/zmtp.rs index 4e1db4f..b3d4701 100644 --- a/netfetch/src/zmtp.rs +++ b/netfetch/src/zmtp.rs @@ -1,6 +1,6 @@ use crate::bsread::{parse_zmtp_message, BsreadMessage}; use crate::bsread::{ChannelDesc, GlobalTimestamp, HeadA, HeadB}; -use crate::channelwriter::{ChannelWriter, ChannelWriterArrayF64, ChannelWriterScalarF64}; +use crate::channelwriter::{ChannelWriter, ChannelWriterAll}; use crate::netbuf::NetBuf; use async_channel::{Receiver, Sender}; #[allow(unused)] @@ -10,11 +10,13 @@ use futures_core::{Future, Stream}; use futures_util::{pin_mut, FutureExt, StreamExt}; use log::*; use netpod::timeunits::*; +use netpod::{ByteOrder, ScalarType, Shape}; use scylla::batch::{Batch, BatchType, Consistency}; use scylla::prepared_statement::PreparedStatement; use scylla::transport::errors::QueryError; use scylla::{Session as ScySession, SessionBuilder}; use serde_json::Value as JsVal; +use stats::CheckEvery; use std::collections::BTreeMap; use std::ffi::CStr; use std::fmt; @@ -82,7 +84,15 @@ pub struct CommonQueries { pub qu1: PreparedStatement, pub qu2: PreparedStatement, pub qu_insert_ts_msp: PreparedStatement, + pub qu_insert_scalar_u16: PreparedStatement, + pub qu_insert_scalar_u32: PreparedStatement, + pub qu_insert_scalar_i16: PreparedStatement, + pub qu_insert_scalar_i32: PreparedStatement, + pub qu_insert_scalar_f32: PreparedStatement, pub qu_insert_scalar_f64: PreparedStatement, + pub qu_insert_array_u16: PreparedStatement, + pub qu_insert_array_i16: PreparedStatement, + pub qu_insert_array_f32: PreparedStatement, pub qu_insert_array_f64: PreparedStatement, } @@ -128,6 +138,7 @@ struct BsreadClient { scy: Arc, channel_writers: BTreeMap>, common_queries: Arc, + print_stats: CheckEvery, } impl BsreadClient { @@ -146,6 +157,7 @@ impl BsreadClient { scy, channel_writers: Default::default(), common_queries, + print_stats: CheckEvery::new(Duration::from_millis(2000)), }; Ok(ret) } @@ -162,8 +174,9 @@ impl BsreadClient { let mut frame_diff_count = 0u64; let mut hash_mismatch_count = 0u64; let mut head_b = HeadB::empty(); - let mut status_last = Instant::now(); let mut bytes_payload = 0u64; + let mut rows_inserted = 0u32; + let mut time_spent_inserting = Duration::from_millis(0); while let Some(item) = zmtp.next().await { match item { Ok(ev) => match ev { @@ -200,7 +213,12 @@ impl BsreadClient { dh_md5_last = bm.head_b_md5.clone(); for chn in &head_b.channels { info!("Setup writer for {}", chn.name); - self.setup_channel_writers(chn)?; + match self.setup_channel_writers(chn).await { + Ok(_) => {} + Err(e) => { + warn!("can not set up writer for {} {e:?}", chn.name); + } + } } } else { error!("changed data header hash {} mh {}", bm.head_b_md5, bm.head_a.hash); @@ -241,11 +259,13 @@ impl BsreadClient { let series = get_series_id(chn); if !self.channel_writers.contains_key(&series) {} if let Some(cw) = self.channel_writers.get_mut(&series) { - cw.write_msg(ts, pulse, fr)?.await?; + let res = cw.write_msg(ts, pulse, fr)?.await?; + rows_inserted += res.nrows; + time_spent_inserting = time_spent_inserting + res.dt; bytes_payload += fr.data().len() as u64; } else { // TODO check for missing writers. - //warn!("no writer for {}", chn.name); + warn!("no writer for {}", chn.name); } } } @@ -272,19 +292,21 @@ impl BsreadClient { if false && msgc > 10000 { break; } - let tsnow = Instant::now(); - let dt = tsnow.duration_since(status_last); - if dt >= Duration::from_millis(2000) { - let r = bytes_payload as f32 / dt.as_secs_f32() * 1e-3; - info!("rate: {r:8.3} kB/s"); - status_last = tsnow; + let dt = self.print_stats.is_elapsed_now(); + if dt > 0. { + let nrs = rows_inserted as f32 / dt; + let dt_ins = time_spent_inserting.as_secs_f32() * 1e3; + let r = bytes_payload as f32 / dt * 1e-3; + info!("insert {nrs:.0} 1/s dt-ins {dt_ins:4.0} ms payload {r:8.3} kB/s"); + rows_inserted = 0; + time_spent_inserting = Duration::from_millis(0); bytes_payload = 0; } } Ok(()) } - fn setup_channel_writers(&mut self, chn: &ChannelDesc) -> Result<(), Error> { + async fn setup_channel_writers(&mut self, chn: &ChannelDesc) -> Result<(), Error> { let series = get_series_id(chn); let has_comp = match &chn.compression { Some(s) => s != "none", @@ -294,48 +316,29 @@ impl BsreadClient { warn!("Compression not yet supported [{}]", chn.name); return Ok(()); } - match chn.ty.as_str() { - "float64" => match &chn.shape { - JsVal::Array(a) => { - if a.len() == 1 { - if let Some(n) = a[0].as_u64() { - if n == 1 { - if chn.encoding == "big" { - let cw = ChannelWriterScalarF64::new( - series, - self.common_queries.clone(), - self.scy.clone(), - ); - self.channel_writers.insert(series, Box::new(cw)); - } else { - warn!("TODO scalar f64 LE"); - } - } else { - if chn.encoding == "big" { - let cw = ChannelWriterArrayF64::new( - series, - self.common_queries.clone(), - self.scy.clone(), - self.opts.array_truncate.unwrap_or(64), - ); - self.channel_writers.insert(series, Box::new(cw)); - } else { - warn!("TODO array f64 LE"); - } - } - } - } else { - warn!("TODO writer f64 shape {:?}", a); - } - } - s => { - warn!("TODO writer f64 shape {:?}", s); - } - }, - k => { - warn!("TODO writer dtype {:?}", k); - } - } + let scalar_type = ScalarType::from_bsread_str(&chn.ty)?; + let shape = Shape::from_bsread_jsval(&chn.shape)?; + let byte_order = ByteOrder::from_bsread_str(&chn.encoding)?; + let trunc = self.opts.array_truncate.unwrap_or(64); + let cw = ChannelWriterAll::new( + series, + self.common_queries.clone(), + self.scy.clone(), + scalar_type.clone(), + shape.clone(), + byte_order.clone(), + trunc, + )?; + let dtype_mark = cw.dtype_mark(); + self.channel_writers.insert(series, Box::new(cw)); + // TODO insert correct facility name + self.scy + .query( + "insert into series_by_channel (facility, channel_name, dtype, series) values (?, ?, ?, ?)", + ("scylla", &chn.name, dtype_mark as i32, series as i32), + ) + .await + .err_conv()?; Ok(()) } @@ -404,13 +407,45 @@ pub async fn zmtp_client(opts: ZmtpClientOpts) -> Result<(), Error> { .await .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; let qu_insert_ts_msp = scy - .prepare("insert into ts_msp (series, ts_msp) values (?, ?)") + .prepare("insert into ts_msp (series, ts_msp, dtype) values (?, ?, ?)") + .await + .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; + let qu_insert_scalar_u16 = scy + .prepare("insert into events_scalar_u16 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)") + .await + .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; + let qu_insert_scalar_u32 = scy + .prepare("insert into events_scalar_u32 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)") + .await + .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; + let qu_insert_scalar_i16 = scy + .prepare("insert into events_scalar_i16 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)") + .await + .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; + let qu_insert_scalar_i32 = scy + .prepare("insert into events_scalar_i32 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)") + .await + .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; + let qu_insert_scalar_f32 = scy + .prepare("insert into events_scalar_f32 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)") .await .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; let qu_insert_scalar_f64 = scy .prepare("insert into events_scalar_f64 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)") .await .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; + let qu_insert_array_u16 = scy + .prepare("insert into events_array_u16 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)") + .await + .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; + let qu_insert_array_i16 = scy + .prepare("insert into events_array_i16 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)") + .await + .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; + let qu_insert_array_f32 = scy + .prepare("insert into events_array_f32 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)") + .await + .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; let qu_insert_array_f64 = scy .prepare("insert into events_array_f64 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)") .await @@ -419,7 +454,15 @@ pub async fn zmtp_client(opts: ZmtpClientOpts) -> Result<(), Error> { qu1, qu2, qu_insert_ts_msp, + qu_insert_scalar_u16, + qu_insert_scalar_u32, + qu_insert_scalar_i16, + qu_insert_scalar_i32, + qu_insert_scalar_f32, qu_insert_scalar_f64, + qu_insert_array_u16, + qu_insert_array_i16, + qu_insert_array_f32, qu_insert_array_f64, }; let common_queries = Arc::new(common_queries); @@ -427,7 +470,6 @@ pub async fn zmtp_client(opts: ZmtpClientOpts) -> Result<(), Error> { for source_addr in &opts.sources { let client = BsreadClient::new(opts.clone(), source_addr.into(), scy.clone(), common_queries.clone()).await?; let fut = ClientRun::new(client); - //let client = Box::pin(client) as Pin>>>; clients.push(fut); } futures_util::future::join_all(clients).await; diff --git a/stats/Cargo.toml b/stats/Cargo.toml new file mode 100644 index 0000000..87f31b5 --- /dev/null +++ b/stats/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "stats" +version = "0.0.1-a.0" +authors = ["Dominik Werder "] +edition = "2021" + +[lib] +path = "src/stats.rs" + +[dependencies] +log = { path = "../log" } +err = { path = "../../daqbuffer/err" } diff --git a/stats/src/stats.rs b/stats/src/stats.rs new file mode 100644 index 0000000..1d68428 --- /dev/null +++ b/stats/src/stats.rs @@ -0,0 +1,52 @@ +use std::time::{Duration, Instant}; + +pub struct EMA { + ema: f32, + emv: f32, + k: f32, +} + +impl EMA { + pub fn default() -> Self { + Self { + ema: 0.0, + emv: 0.0, + k: 0.05, + } + } + + pub fn update(&mut self, v: V) + where + V: Into, + { + let k = self.k; + let dv = v.into() - self.ema; + self.ema += k * dv; + self.emv = (1f32 - k) * (self.emv + k * dv * dv); + } +} + +pub struct CheckEvery { + ts_last: Instant, + dt: Duration, +} + +impl CheckEvery { + pub fn new(dt: Duration) -> Self { + Self { + ts_last: Instant::now(), + dt, + } + } + + pub fn is_elapsed_now(&mut self) -> f32 { + let now = Instant::now(); + let dt = now.duration_since(self.ts_last); + if dt >= self.dt { + self.ts_last = now; + dt.as_secs_f32() + } else { + -16f32 + } + } +}