Add options and test insert

This commit is contained in:
Dominik Werder
2022-04-24 18:30:16 +02:00
parent e965845ba6
commit 77efc38323
8 changed files with 221 additions and 95 deletions
+11
View File
@@ -0,0 +1,11 @@
[build]
rustflags = [
"-C", "target-cpu=sandybridge",
"-C", "force-frame-pointers=yes",
"-C", "force-unwind-tables=yes",
#"-C", "relocation-model=static",
#"-C", "embed-bitcode=no",
#"-C", "inline-threshold=1000",
#"-Z", "time-passes=yes",
#"-Z", "time-llvm-passes=yes",
]
+3 -3
View File
@@ -2,11 +2,11 @@
members = ["log", "netfetch", "daqingest"]
[profile.release]
opt-level = 1
debug = 0
opt-level = 2
debug = 1
overflow-checks = false
debug-assertions = false
lto = false
lto = "thin"
codegen-units = 32
incremental = true
+1
View File
@@ -4,6 +4,7 @@ use err::Error;
pub fn main() -> Result<(), Error> {
taskrun::run(async {
log::info!("daqingest version {}", clap::crate_version!());
if false {
return Err(Error::with_msg_no_trace(format!("unknown command")));
} else {
+6
View File
@@ -36,6 +36,10 @@ pub struct Bsread {
pub array_truncate: Option<usize>,
#[clap(long)]
pub do_pulse_id: bool,
#[clap(long)]
pub skip_insert: bool,
#[clap(long)]
pub process_channel_count_limit: Option<usize>,
}
impl From<Bsread> for ZmtpClientOpts {
@@ -46,6 +50,8 @@ impl From<Bsread> for ZmtpClientOpts {
rcvbuf: k.rcvbuf,
array_truncate: k.array_truncate,
do_pulse_id: k.do_pulse_id,
process_channel_count_limit: k.process_channel_count_limit,
skip_insert: k.skip_insert,
}
}
}
+102 -59
View File
@@ -24,13 +24,13 @@ pub struct ScyQueryFut<V> {
query: Box<PreparedStatement>,
#[allow(unused)]
values: Box<V>,
fut: Pin<Box<dyn Future<Output = Result<QueryResult, QueryError>>>>,
fut: Pin<Box<dyn Future<Output = Result<QueryResult, QueryError>> + Send>>,
}
impl<V> ScyQueryFut<V> {
pub fn new(scy: Arc<ScySession>, query: PreparedStatement, values: V) -> Self
where
V: ValueList + 'static,
V: ValueList + Sync + 'static,
{
let query = Box::new(query);
let values = Box::new(values);
@@ -139,7 +139,7 @@ pub struct ScyBatchFutGen {
scy: Arc<ScySession>,
#[allow(unused)]
batch: Box<Batch>,
fut: Pin<Box<dyn Future<Output = Result<BatchResult, QueryError>>>>,
fut: Pin<Box<dyn Future<Output = Result<BatchResult, QueryError>> + Send>>,
polled: usize,
ts_create: Instant,
ts_poll_start: Instant,
@@ -148,7 +148,7 @@ pub struct ScyBatchFutGen {
impl ScyBatchFutGen {
pub fn new<V>(scy: Arc<ScySession>, batch: Batch, values: V) -> Self
where
V: BatchValues + 'static,
V: BatchValues + Sync + Send + 'static,
{
let batch = Box::new(batch);
let scy_ref = unsafe { &*(&scy as &_ as *const _) } as &ScySession;
@@ -203,7 +203,7 @@ pub struct InsertLoopFut {
scy: Arc<ScySession>,
#[allow(unused)]
query: Arc<PreparedStatement>,
futs: Vec<Pin<Box<dyn Future<Output = Result<QueryResult, QueryError>>>>>,
futs: Vec<Pin<Box<dyn Future<Output = Result<QueryResult, QueryError>> + Send>>>,
fut_ix: usize,
polled: usize,
ts_create: Instant,
@@ -211,11 +211,14 @@ pub struct InsertLoopFut {
}
impl InsertLoopFut {
pub fn new<V>(scy: Arc<ScySession>, query: PreparedStatement, values: Vec<V>) -> Self
pub fn new<V>(scy: Arc<ScySession>, query: PreparedStatement, values: Vec<V>, skip_insert: bool) -> Self
where
V: ValueList + 'static,
V: ValueList + Send + 'static,
{
//values.clear();
let mut values = values;
if skip_insert {
values.clear();
}
let query = Arc::new(query);
let scy_ref = unsafe { &*(&scy as &_ as *const _) } as &ScySession;
let query_ref = unsafe { &*(&query as &_ as *const _) } as &PreparedStatement;
@@ -225,7 +228,6 @@ impl InsertLoopFut {
let futs: Vec<_> = values
.into_iter()
.map(|vs| {
//
let fut = scy_ref.execute(query_ref, vs);
Box::pin(fut) as _
})
@@ -301,8 +303,8 @@ pub struct ChannelWriteRes {
pub struct ChannelWriteFut {
nn: usize,
fut1: Option<Pin<Box<dyn Future<Output = Result<(), Error>>>>>,
fut2: Option<Pin<Box<dyn Future<Output = Result<(), Error>>>>>,
fut1: Option<Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>>,
fut2: Option<Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>>,
ts1: Option<Instant>,
mask: u8,
}
@@ -374,6 +376,12 @@ pub trait ChannelWriter {
fn write_msg(&mut self, ts: u64, pulse: u64, fr: &ZmtpFrame) -> Result<ChannelWriteFut, Error>;
}
struct MsgAcceptorOptions {
cq: Arc<CommonQueries>,
skip_insert: bool,
array_truncate: usize,
}
trait MsgAcceptor {
fn len(&self) -> usize;
fn accept(&mut self, ts_msp: i64, ts_lsp: i64, pulse: i64, fr: &ZmtpFrame) -> Result<(), Error>;
@@ -388,14 +396,16 @@ macro_rules! impl_msg_acceptor_scalar {
query: PreparedStatement,
values: Vec<(i64, i64, i64, i64, $st)>,
series: i64,
opts: MsgAcceptorOptions,
}
impl $sname {
pub fn new(series: i64, cq: &CommonQueries) -> Self {
pub fn new(series: i64, opts: MsgAcceptorOptions) -> Self {
Self {
query: cq.$qu_id.clone(),
query: opts.cq.$qu_id.clone(),
values: vec![],
series,
opts,
}
}
}
@@ -439,7 +449,7 @@ macro_rules! impl_msg_acceptor_scalar {
fn flush_loop(&mut self, scy: Arc<ScySession>) -> Result<InsertLoopFut, Error> {
let vt = mem::replace(&mut self.values, vec![]);
let ret = InsertLoopFut::new(scy, self.query.clone(), vt);
let ret = InsertLoopFut::new(scy, self.query.clone(), vt, self.opts.skip_insert);
Ok(ret)
}
}
@@ -454,16 +464,18 @@ macro_rules! impl_msg_acceptor_array {
series: i64,
array_truncate: usize,
truncated: usize,
opts: MsgAcceptorOptions,
}
impl $sname {
pub fn new(series: i64, array_truncate: usize, cq: &CommonQueries) -> Self {
pub fn new(series: i64, opts: MsgAcceptorOptions) -> Self {
Self {
query: cq.$qu_id.clone(),
query: opts.cq.$qu_id.clone(),
values: vec![],
series,
array_truncate,
array_truncate: opts.array_truncate,
truncated: 0,
opts,
}
}
}
@@ -478,10 +490,26 @@ macro_rules! impl_msg_acceptor_array {
const STL: usize = std::mem::size_of::<ST>();
let vc = fr.data().len() / STL;
let mut values = Vec::with_capacity(vc);
for i in 0..vc {
let h = i * STL;
let value = ST::$from_bytes(fr.data()[h..h + STL].try_into()?);
values.push(value);
if false {
for i in 0..vc {
let h = i * STL;
let value = ST::$from_bytes(fr.data()[h..h + STL].try_into()?);
values.push(value);
}
} else {
let mut ptr: *const u8 = fr.data().as_ptr();
let mut ptr2: *mut ST = values.as_mut_ptr();
for _ in 0..vc {
unsafe {
let a: &[u8; STL] = &*(ptr as *const [u8; STL]);
*ptr2 = ST::$from_bytes(*a);
}
ptr = ptr.wrapping_offset(STL as isize);
ptr2 = ptr2.wrapping_offset(1);
}
unsafe {
values.set_len(vc);
}
}
if values.len() > self.array_truncate {
if self.truncated < 10 {
@@ -516,7 +544,7 @@ macro_rules! impl_msg_acceptor_array {
fn flush_loop(&mut self, scy: Arc<ScySession>) -> Result<InsertLoopFut, Error> {
let vt = mem::replace(&mut self.values, vec![]);
let ret = InsertLoopFut::new(scy, self.query.clone(), vt);
let ret = InsertLoopFut::new(scy, self.query.clone(), vt, self.opts.skip_insert);
Ok(ret)
}
}
@@ -553,16 +581,18 @@ struct MsgAcceptorArrayBool {
series: i64,
array_truncate: usize,
truncated: usize,
opts: MsgAcceptorOptions,
}
impl MsgAcceptorArrayBool {
pub fn new(series: i64, array_truncate: usize, cq: &CommonQueries) -> Self {
pub fn new(series: i64, opts: MsgAcceptorOptions) -> Self {
Self {
query: cq.qu_insert_array_bool.clone(),
query: opts.cq.qu_insert_array_bool.clone(),
values: vec![],
series,
array_truncate,
array_truncate: opts.array_truncate,
truncated: 0,
opts,
}
}
}
@@ -616,7 +646,7 @@ impl MsgAcceptor for MsgAcceptorArrayBool {
fn flush_loop(&mut self, scy: Arc<ScySession>) -> Result<InsertLoopFut, Error> {
let vt = mem::replace(&mut self.values, vec![]);
let ret = InsertLoopFut::new(scy, self.query.clone(), vt);
let ret = InsertLoopFut::new(scy, self.query.clone(), vt, self.opts.skip_insert);
Ok(ret)
}
}
@@ -627,12 +657,14 @@ pub struct ChannelWriterAll {
common_queries: Arc<CommonQueries>,
ts_msp_lsp: fn(u64, u64) -> (u64, u64),
ts_msp_last: u64,
acceptor: Box<dyn MsgAcceptor>,
acceptor: Box<dyn MsgAcceptor + Send>,
#[allow(unused)]
scalar_type: ScalarType,
#[allow(unused)]
shape: Shape,
pulse_last: u64,
#[allow(unused)]
skip_insert: bool,
}
impl ChannelWriterAll {
@@ -644,66 +676,72 @@ impl ChannelWriterAll {
shape: Shape,
byte_order: ByteOrder,
array_truncate: usize,
skip_insert: bool,
) -> Result<Self, Error> {
let (ts_msp_lsp, acc): (fn(u64, u64) -> (u64, u64), Box<dyn MsgAcceptor>) = match &shape {
let opts = MsgAcceptorOptions {
cq: common_queries.clone(),
skip_insert,
array_truncate,
};
let (ts_msp_lsp, acc): (fn(u64, u64) -> (u64, u64), Box<dyn MsgAcceptor + Send>) = match &shape {
Shape::Scalar => match &scalar_type {
ScalarType::U16 => match &byte_order {
ByteOrder::LE => {
let acc = MsgAcceptorScalarU16LE::new(series as i64, &common_queries);
let acc = MsgAcceptorScalarU16LE::new(series as i64, opts);
(ts_msp_lsp_1, Box::new(acc) as _)
}
ByteOrder::BE => {
let acc = MsgAcceptorScalarU16BE::new(series as i64, &common_queries);
let acc = MsgAcceptorScalarU16BE::new(series as i64, opts);
(ts_msp_lsp_1, Box::new(acc) as _)
}
},
ScalarType::U32 => match &byte_order {
ByteOrder::LE => {
let acc = MsgAcceptorScalarU32LE::new(series as i64, &common_queries);
let acc = MsgAcceptorScalarU32LE::new(series as i64, opts);
(ts_msp_lsp_1, Box::new(acc) as _)
}
ByteOrder::BE => {
let acc = MsgAcceptorScalarU32BE::new(series as i64, &common_queries);
let acc = MsgAcceptorScalarU32BE::new(series as i64, opts);
(ts_msp_lsp_1, Box::new(acc) as _)
}
},
ScalarType::I16 => match &byte_order {
ByteOrder::LE => {
let acc = MsgAcceptorScalarI16LE::new(series as i64, &common_queries);
let acc = MsgAcceptorScalarI16LE::new(series as i64, opts);
(ts_msp_lsp_1, Box::new(acc) as _)
}
ByteOrder::BE => {
let acc = MsgAcceptorScalarI16BE::new(series as i64, &common_queries);
let acc = MsgAcceptorScalarI16BE::new(series as i64, opts);
(ts_msp_lsp_1, Box::new(acc) as _)
}
},
ScalarType::I32 => match &byte_order {
ByteOrder::LE => {
let acc = MsgAcceptorScalarI32LE::new(series as i64, &common_queries);
let acc = MsgAcceptorScalarI32LE::new(series as i64, opts);
(ts_msp_lsp_1, Box::new(acc) as _)
}
ByteOrder::BE => {
let acc = MsgAcceptorScalarI32BE::new(series as i64, &common_queries);
let acc = MsgAcceptorScalarI32BE::new(series as i64, opts);
(ts_msp_lsp_1, Box::new(acc) as _)
}
},
ScalarType::F32 => match &byte_order {
ByteOrder::LE => {
let acc = MsgAcceptorScalarF32LE::new(series as i64, &common_queries);
let acc = MsgAcceptorScalarF32LE::new(series as i64, opts);
(ts_msp_lsp_1, Box::new(acc) as _)
}
ByteOrder::BE => {
let acc = MsgAcceptorScalarF32BE::new(series as i64, &common_queries);
let acc = MsgAcceptorScalarF32BE::new(series as i64, opts);
(ts_msp_lsp_1, Box::new(acc) as _)
}
},
ScalarType::F64 => match &byte_order {
ByteOrder::LE => {
let acc = MsgAcceptorScalarF64LE::new(series as i64, &common_queries);
let acc = MsgAcceptorScalarF64LE::new(series as i64, opts);
(ts_msp_lsp_1, Box::new(acc) as _)
}
ByteOrder::BE => {
let acc = MsgAcceptorScalarF64BE::new(series as i64, &common_queries);
let acc = MsgAcceptorScalarF64BE::new(series as i64, opts);
(ts_msp_lsp_1, Box::new(acc) as _)
}
},
@@ -719,57 +757,57 @@ impl ChannelWriterAll {
match &scalar_type {
ScalarType::BOOL => match &byte_order {
_ => {
let acc = MsgAcceptorArrayBool::new(series as i64, array_truncate, &common_queries);
let acc = MsgAcceptorArrayBool::new(series as i64, opts);
(ts_msp_lsp_2, Box::new(acc) as _)
}
},
ScalarType::U16 => match &byte_order {
ByteOrder::LE => {
let acc = MsgAcceptorArrayU16LE::new(series as i64, array_truncate, &common_queries);
let acc = MsgAcceptorArrayU16LE::new(series as i64, opts);
(ts_msp_lsp_2, Box::new(acc) as _)
}
ByteOrder::BE => {
let acc = MsgAcceptorArrayU16BE::new(series as i64, array_truncate, &common_queries);
let acc = MsgAcceptorArrayU16BE::new(series as i64, opts);
(ts_msp_lsp_2, Box::new(acc) as _)
}
},
ScalarType::I16 => match &byte_order {
ByteOrder::LE => {
let acc = MsgAcceptorArrayI16LE::new(series as i64, array_truncate, &common_queries);
let acc = MsgAcceptorArrayI16LE::new(series as i64, opts);
(ts_msp_lsp_2, Box::new(acc) as _)
}
ByteOrder::BE => {
let acc = MsgAcceptorArrayI16BE::new(series as i64, array_truncate, &common_queries);
let acc = MsgAcceptorArrayI16BE::new(series as i64, opts);
(ts_msp_lsp_2, Box::new(acc) as _)
}
},
ScalarType::I32 => match &byte_order {
ByteOrder::LE => {
let acc = MsgAcceptorArrayI32LE::new(series as i64, array_truncate, &common_queries);
let acc = MsgAcceptorArrayI32LE::new(series as i64, opts);
(ts_msp_lsp_2, Box::new(acc) as _)
}
ByteOrder::BE => {
let acc = MsgAcceptorArrayI32BE::new(series as i64, array_truncate, &common_queries);
let acc = MsgAcceptorArrayI32BE::new(series as i64, opts);
(ts_msp_lsp_2, Box::new(acc) as _)
}
},
ScalarType::F32 => match &byte_order {
ByteOrder::LE => {
let acc = MsgAcceptorArrayF32LE::new(series as i64, array_truncate, &common_queries);
let acc = MsgAcceptorArrayF32LE::new(series as i64, opts);
(ts_msp_lsp_2, Box::new(acc) as _)
}
ByteOrder::BE => {
let acc = MsgAcceptorArrayF32BE::new(series as i64, array_truncate, &common_queries);
let acc = MsgAcceptorArrayF32BE::new(series as i64, opts);
(ts_msp_lsp_2, Box::new(acc) as _)
}
},
ScalarType::F64 => match &byte_order {
ByteOrder::LE => {
let acc = MsgAcceptorArrayF64LE::new(series as i64, array_truncate, &common_queries);
let acc = MsgAcceptorArrayF64LE::new(series as i64, opts);
(ts_msp_lsp_2, Box::new(acc) as _)
}
ByteOrder::BE => {
let acc = MsgAcceptorArrayF64BE::new(series as i64, array_truncate, &common_queries);
let acc = MsgAcceptorArrayF64BE::new(series as i64, opts);
(ts_msp_lsp_2, Box::new(acc) as _)
}
},
@@ -798,6 +836,7 @@ impl ChannelWriterAll {
scalar_type,
shape,
pulse_last: 0,
skip_insert,
};
Ok(ret)
}
@@ -814,14 +853,18 @@ impl ChannelWriterAll {
let fut1 = if ts_msp != self.ts_msp_last {
debug!("ts_msp changed ts {ts} pulse {pulse} ts_msp {ts_msp} ts_lsp {ts_lsp}");
self.ts_msp_last = ts_msp;
// TODO make the passing of the query parameters type safe.
// TODO the "dtype" table field is not needed here. Drop from database.
let fut = ScyQueryFut::new(
self.scy.clone(),
self.common_queries.qu_insert_ts_msp.clone(),
(self.series as i64, ts_msp as i64),
);
Some(Box::pin(fut) as _)
if !self.skip_insert {
// TODO make the passing of the query parameters type safe.
// TODO the "dtype" table field is not needed here. Drop from database.
let fut = ScyQueryFut::new(
self.scy.clone(),
self.common_queries.qu_insert_ts_msp.clone(),
(self.series as i64, ts_msp as i64),
);
Some(Box::pin(fut) as _)
} else {
None
}
} else {
None
};
+21 -20
View File
@@ -7,6 +7,12 @@ pub struct NetBuf {
rp: usize,
}
macro_rules! check_invariants {
($self:expr) => {
//$self.check_invariants()
};
}
impl NetBuf {
pub fn new(cap: usize) -> Self {
Self {
@@ -21,27 +27,27 @@ impl NetBuf {
}
pub fn len(&self) -> usize {
self.check_invariant();
check_invariants!(self);
self.wp - self.rp
}
pub fn cap(&self) -> usize {
self.check_invariant();
check_invariants!(self);
self.buf.len()
}
pub fn wcap(&self) -> usize {
self.check_invariant();
check_invariants!(self);
self.buf.len() - self.wp
}
pub fn data(&self) -> &[u8] {
self.check_invariant();
check_invariants!(self);
&self.buf[self.rp..self.wp]
}
pub fn adv(&mut self, x: usize) -> Result<(), Error> {
self.check_invariant();
check_invariants!(self);
if self.len() < x {
return Err(Error::with_msg_no_trace("not enough bytes"));
} else {
@@ -51,7 +57,7 @@ impl NetBuf {
}
pub fn wadv(&mut self, x: usize) -> Result<(), Error> {
self.check_invariant();
check_invariants!(self);
if self.wcap() < x {
return Err(Error::with_msg_no_trace("not enough space"));
} else {
@@ -61,7 +67,7 @@ impl NetBuf {
}
pub fn read_u8(&mut self) -> Result<u8, Error> {
self.check_invariant();
check_invariants!(self);
type T = u8;
const TS: usize = std::mem::size_of::<T>();
if self.len() < TS {
@@ -74,7 +80,7 @@ impl NetBuf {
}
pub fn read_u64(&mut self) -> Result<u64, Error> {
self.check_invariant();
check_invariants!(self);
type T = u64;
const TS: usize = std::mem::size_of::<T>();
if self.len() < TS {
@@ -87,7 +93,7 @@ impl NetBuf {
}
pub fn read_bytes(&mut self, n: usize) -> Result<&[u8], Error> {
self.check_invariant();
check_invariants!(self);
if self.len() < n {
return Err(Error::with_msg_no_trace("not enough bytes"));
} else {
@@ -98,14 +104,14 @@ impl NetBuf {
}
pub fn read_buf_for_fill(&mut self) -> ReadBuf {
self.check_invariant();
check_invariants!(self);
self.rewind_if_needed();
let read_buf = ReadBuf::new(&mut self.buf[self.wp..]);
read_buf
}
pub fn rewind_if_needed(&mut self) {
self.check_invariant();
check_invariants!(self);
if self.rp != 0 && self.rp == self.wp {
self.rp = 0;
self.wp = 0;
@@ -117,7 +123,7 @@ impl NetBuf {
}
pub fn put_slice(&mut self, buf: &[u8]) -> Result<(), Error> {
self.check_invariant();
check_invariants!(self);
self.rewind_if_needed();
if self.wcap() < buf.len() {
return Err(Error::with_msg_no_trace("not enough space"));
@@ -129,7 +135,7 @@ impl NetBuf {
}
pub fn put_u8(&mut self, v: u8) -> Result<(), Error> {
self.check_invariant();
check_invariants!(self);
type T = u8;
const TS: usize = std::mem::size_of::<T>();
self.rewind_if_needed();
@@ -143,7 +149,7 @@ impl NetBuf {
}
pub fn put_u64(&mut self, v: u64) -> Result<(), Error> {
self.check_invariant();
check_invariants!(self);
type T = u64;
const TS: usize = std::mem::size_of::<T>();
self.rewind_if_needed();
@@ -157,12 +163,7 @@ impl NetBuf {
}
#[allow(unused)]
#[inline(always)]
fn check_invariant(&self) {}
#[allow(unused)]
#[inline(always)]
fn check_invariant2(&self) {
fn check_invariants(&self) {
if self.wp > self.buf.len() {
eprintln!("ERROR netbuf wp {} rp {}", self.wp, self.rp);
std::process::exit(87);
+52 -13
View File
@@ -109,12 +109,14 @@ pub struct ZmtpClientOpts {
pub do_pulse_id: bool,
pub rcvbuf: Option<usize>,
pub array_truncate: Option<usize>,
pub process_channel_count_limit: Option<usize>,
pub skip_insert: bool,
}
struct ClientRun {
#[allow(unused)]
client: Pin<Box<BsreadClient>>,
fut: Pin<Box<dyn Future<Output = Result<(), Error>>>>,
fut: Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>,
}
impl ClientRun {
@@ -142,7 +144,7 @@ struct BsreadClient {
rcvbuf: Option<usize>,
tmp_vals_pulse_map: Vec<(i64, i32, i64, i32)>,
scy: Arc<ScySession>,
channel_writers: BTreeMap<u64, Box<dyn ChannelWriter>>,
channel_writers: BTreeMap<u64, Box<dyn ChannelWriter + Send>>,
common_queries: Arc<CommonQueries>,
print_stats: CheckEvery,
parser: Parser,
@@ -186,12 +188,20 @@ impl BsreadClient {
let mut rows_inserted = 0u32;
let mut time_spent_inserting = Duration::from_millis(0);
let mut series_ids = Vec::new();
let mut msg_dt_ema = stats::EMA::with_k(0.01);
let mut msg_ts_last = Instant::now();
while let Some(item) = zmtp.next().await {
let tsnow = Instant::now();
match item {
Ok(ev) => match ev {
ZmtpEvent::ZmtpCommand(_) => (),
ZmtpEvent::ZmtpMessage(msg) => {
msgc += 1;
{
let dt = tsnow.duration_since(msg_ts_last);
msg_dt_ema.update(dt.as_secs_f32());
msg_ts_last = tsnow;
}
match self.parser.parse_zmtp_message(&msg) {
Ok(bm) => {
if msg.frames().len() - 2 * bm.head_b.channels.len() != 2 {
@@ -268,7 +278,11 @@ impl BsreadClient {
// TODO limit log rate
warn!("Bad pulse {} for {}", pulse, self.source_addr);
}
for i1 in 0..head_b.channels.len() {
for i1 in 0..head_b
.channels
.len()
.min(self.opts.process_channel_count_limit.unwrap_or(4000))
{
let chn = &head_b.channels[i1];
let fr = &msg.frames[2 + 2 * i1];
if i1 >= series_ids.len() {
@@ -323,6 +337,13 @@ impl BsreadClient {
rows_inserted = 0;
time_spent_inserting = Duration::from_millis(0);
bytes_payload = 0;
if msg_dt_ema.update_count() > 100 {
let ema = msg_dt_ema.ema();
if ema < 0.005 {
let emv = msg_dt_ema.emv().sqrt();
warn!("MSG FREQ {} {:9.5} {:9.5}", self.source_addr, ema, emv);
}
}
}
}
Ok(())
@@ -350,17 +371,20 @@ impl BsreadClient {
shape.clone(),
byte_order.clone(),
trunc,
self.opts.skip_insert,
)?;
let shape_dims = shape.to_scylla_vec();
self.channel_writers.insert(series, Box::new(cw));
// TODO insert correct facility name
self.scy
if !self.opts.skip_insert {
// TODO insert correct facility name
self.scy
.query(
"insert into series_by_channel (facility, channel_name, series, scalar_type, shape_dims) values (?, ?, ?, ?, ?)",
("scylla", &chn.name, series as i64, scalar_type.to_scylla_i32(), &shape_dims),
)
.await
.err_conv()?;
}
Ok(())
}
@@ -498,13 +522,15 @@ pub async fn zmtp_client(opts: ZmtpClientOpts) -> Result<(), Error> {
qu_insert_array_bool,
};
let common_queries = Arc::new(common_queries);
let mut clients = vec![];
let mut jhs = vec![];
for source_addr in &opts.sources {
let client = BsreadClient::new(opts.clone(), source_addr.into(), scy.clone(), common_queries.clone()).await?;
let fut = ClientRun::new(client);
clients.push(fut);
//clients.push(fut);
let jh = tokio::spawn(fut);
jhs.push(jh);
}
futures_util::future::join_all(clients).await;
futures_util::future::join_all(jhs).await;
Ok(())
}
@@ -543,10 +569,13 @@ fn set_rcv_sock_opts(conn: &mut TcpStream, rcvbuf: u32) -> Result<(), Error> {
if ec != 0 {
let errno = *libc::__errno_location();
let es = CStr::from_ptr(libc::strerror(errno));
warn!("can not query socket option ec {ec} errno {errno} es {es:?}");
error!("can not query socket option");
error!("can not query socket option ec {ec} errno {errno} es {es:?}");
} else {
info!("SO_RCVBUF {n}");
if (n as u32) < rcvbuf * 5 / 6 {
warn!("SO_RCVBUF {n} smaller than requested {rcvbuf}");
} else {
info!("SO_RCVBUF {n}");
}
}
}
Ok(())
@@ -792,13 +821,23 @@ impl Zmtp {
(&mut self.conn, self.outbuf.data())
}
fn record_input_state(&mut self) {
#[allow(unused)]
#[inline(always)]
fn record_input_state(&mut self) {}
#[allow(unused)]
fn record_input_state_2(&mut self) {
let st = self.buf.state();
self.input_state[self.input_state_ix] = InpState::Netbuf(st.0, st.1, self.buf.cap() - st.1);
self.input_state_ix = (1 + self.input_state_ix) % self.input_state.len();
}
fn record_conn_state(&mut self) {
#[allow(unused)]
#[inline(always)]
fn record_conn_state(&mut self) {}
#[allow(unused)]
fn record_conn_state_2(&mut self) {
self.conn_state_log[self.conn_state_log_ix] = self.conn_state.clone();
self.conn_state_log_ix = (1 + self.conn_state_log_ix) % self.conn_state_log.len();
}
+25
View File
@@ -4,26 +4,51 @@ pub struct EMA {
ema: f32,
emv: f32,
k: f32,
update_count: u64,
}
impl EMA {
pub fn with_k(k: f32) -> Self {
Self {
ema: 0.0,
emv: 0.0,
k,
update_count: 0,
}
}
pub fn default() -> Self {
Self {
ema: 0.0,
emv: 0.0,
k: 0.05,
update_count: 0,
}
}
#[inline(always)]
pub fn update<V>(&mut self, v: V)
where
V: Into<f32>,
{
self.update_count += 1;
let k = self.k;
let dv = v.into() - self.ema;
self.ema += k * dv;
self.emv = (1f32 - k) * (self.emv + k * dv * dv);
}
pub fn update_count(&self) -> u64 {
self.update_count
}
pub fn ema(&self) -> f32 {
self.ema
}
pub fn emv(&self) -> f32 {
self.emv
}
}
pub struct CheckEvery {