diff --git a/.cargo/config.toml b/.cargo/config.toml new file mode 100644 index 0000000..946c525 --- /dev/null +++ b/.cargo/config.toml @@ -0,0 +1,11 @@ +[build] +rustflags = [ + "-C", "target-cpu=sandybridge", + "-C", "force-frame-pointers=yes", + "-C", "force-unwind-tables=yes", + #"-C", "relocation-model=static", + #"-C", "embed-bitcode=no", + #"-C", "inline-threshold=1000", + #"-Z", "time-passes=yes", + #"-Z", "time-llvm-passes=yes", +] diff --git a/Cargo.toml b/Cargo.toml index e782558..2c3e537 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,11 +2,11 @@ members = ["log", "netfetch", "daqingest"] [profile.release] -opt-level = 1 -debug = 0 +opt-level = 2 +debug = 1 overflow-checks = false debug-assertions = false -lto = false +lto = "thin" codegen-units = 32 incremental = true diff --git a/daqingest/src/bin/daqingest.rs b/daqingest/src/bin/daqingest.rs index 5b91be9..9a08855 100644 --- a/daqingest/src/bin/daqingest.rs +++ b/daqingest/src/bin/daqingest.rs @@ -4,6 +4,7 @@ use err::Error; pub fn main() -> Result<(), Error> { taskrun::run(async { + log::info!("daqingest version {}", clap::crate_version!()); if false { return Err(Error::with_msg_no_trace(format!("unknown command"))); } else { diff --git a/daqingest/src/daqingest.rs b/daqingest/src/daqingest.rs index 2d24195..63e67c4 100644 --- a/daqingest/src/daqingest.rs +++ b/daqingest/src/daqingest.rs @@ -36,6 +36,10 @@ pub struct Bsread { pub array_truncate: Option, #[clap(long)] pub do_pulse_id: bool, + #[clap(long)] + pub skip_insert: bool, + #[clap(long)] + pub process_channel_count_limit: Option, } impl From for ZmtpClientOpts { @@ -46,6 +50,8 @@ impl From for ZmtpClientOpts { rcvbuf: k.rcvbuf, array_truncate: k.array_truncate, do_pulse_id: k.do_pulse_id, + process_channel_count_limit: k.process_channel_count_limit, + skip_insert: k.skip_insert, } } } diff --git a/netfetch/src/channelwriter.rs b/netfetch/src/channelwriter.rs index 62c9351..7b1ac62 100644 --- a/netfetch/src/channelwriter.rs +++ b/netfetch/src/channelwriter.rs @@ -24,13 +24,13 @@ pub struct ScyQueryFut { query: Box, #[allow(unused)] values: Box, - fut: Pin>>>, + fut: Pin> + Send>>, } impl ScyQueryFut { pub fn new(scy: Arc, query: PreparedStatement, values: V) -> Self where - V: ValueList + 'static, + V: ValueList + Sync + 'static, { let query = Box::new(query); let values = Box::new(values); @@ -139,7 +139,7 @@ pub struct ScyBatchFutGen { scy: Arc, #[allow(unused)] batch: Box, - fut: Pin>>>, + fut: Pin> + Send>>, polled: usize, ts_create: Instant, ts_poll_start: Instant, @@ -148,7 +148,7 @@ pub struct ScyBatchFutGen { impl ScyBatchFutGen { pub fn new(scy: Arc, batch: Batch, values: V) -> Self where - V: BatchValues + 'static, + V: BatchValues + Sync + Send + 'static, { let batch = Box::new(batch); let scy_ref = unsafe { &*(&scy as &_ as *const _) } as &ScySession; @@ -203,7 +203,7 @@ pub struct InsertLoopFut { scy: Arc, #[allow(unused)] query: Arc, - futs: Vec>>>>, + futs: Vec> + Send>>>, fut_ix: usize, polled: usize, ts_create: Instant, @@ -211,11 +211,14 @@ pub struct InsertLoopFut { } impl InsertLoopFut { - pub fn new(scy: Arc, query: PreparedStatement, values: Vec) -> Self + pub fn new(scy: Arc, query: PreparedStatement, values: Vec, skip_insert: bool) -> Self where - V: ValueList + 'static, + V: ValueList + Send + 'static, { - //values.clear(); + let mut values = values; + if skip_insert { + values.clear(); + } let query = Arc::new(query); let scy_ref = unsafe { &*(&scy as &_ as *const _) } as &ScySession; let query_ref = unsafe { &*(&query as &_ as *const _) } as &PreparedStatement; @@ -225,7 +228,6 @@ impl InsertLoopFut { let futs: Vec<_> = values .into_iter() .map(|vs| { - // let fut = scy_ref.execute(query_ref, vs); Box::pin(fut) as _ }) @@ -301,8 +303,8 @@ pub struct ChannelWriteRes { pub struct ChannelWriteFut { nn: usize, - fut1: Option>>>>, - fut2: Option>>>>, + fut1: Option> + Send>>>, + fut2: Option> + Send>>>, ts1: Option, mask: u8, } @@ -374,6 +376,12 @@ pub trait ChannelWriter { fn write_msg(&mut self, ts: u64, pulse: u64, fr: &ZmtpFrame) -> Result; } +struct MsgAcceptorOptions { + cq: Arc, + skip_insert: bool, + array_truncate: usize, +} + trait MsgAcceptor { fn len(&self) -> usize; fn accept(&mut self, ts_msp: i64, ts_lsp: i64, pulse: i64, fr: &ZmtpFrame) -> Result<(), Error>; @@ -388,14 +396,16 @@ macro_rules! impl_msg_acceptor_scalar { query: PreparedStatement, values: Vec<(i64, i64, i64, i64, $st)>, series: i64, + opts: MsgAcceptorOptions, } impl $sname { - pub fn new(series: i64, cq: &CommonQueries) -> Self { + pub fn new(series: i64, opts: MsgAcceptorOptions) -> Self { Self { - query: cq.$qu_id.clone(), + query: opts.cq.$qu_id.clone(), values: vec![], series, + opts, } } } @@ -439,7 +449,7 @@ macro_rules! impl_msg_acceptor_scalar { fn flush_loop(&mut self, scy: Arc) -> Result { let vt = mem::replace(&mut self.values, vec![]); - let ret = InsertLoopFut::new(scy, self.query.clone(), vt); + let ret = InsertLoopFut::new(scy, self.query.clone(), vt, self.opts.skip_insert); Ok(ret) } } @@ -454,16 +464,18 @@ macro_rules! impl_msg_acceptor_array { series: i64, array_truncate: usize, truncated: usize, + opts: MsgAcceptorOptions, } impl $sname { - pub fn new(series: i64, array_truncate: usize, cq: &CommonQueries) -> Self { + pub fn new(series: i64, opts: MsgAcceptorOptions) -> Self { Self { - query: cq.$qu_id.clone(), + query: opts.cq.$qu_id.clone(), values: vec![], series, - array_truncate, + array_truncate: opts.array_truncate, truncated: 0, + opts, } } } @@ -478,10 +490,26 @@ macro_rules! impl_msg_acceptor_array { const STL: usize = std::mem::size_of::(); let vc = fr.data().len() / STL; let mut values = Vec::with_capacity(vc); - for i in 0..vc { - let h = i * STL; - let value = ST::$from_bytes(fr.data()[h..h + STL].try_into()?); - values.push(value); + if false { + for i in 0..vc { + let h = i * STL; + let value = ST::$from_bytes(fr.data()[h..h + STL].try_into()?); + values.push(value); + } + } else { + let mut ptr: *const u8 = fr.data().as_ptr(); + let mut ptr2: *mut ST = values.as_mut_ptr(); + for _ in 0..vc { + unsafe { + let a: &[u8; STL] = &*(ptr as *const [u8; STL]); + *ptr2 = ST::$from_bytes(*a); + } + ptr = ptr.wrapping_offset(STL as isize); + ptr2 = ptr2.wrapping_offset(1); + } + unsafe { + values.set_len(vc); + } } if values.len() > self.array_truncate { if self.truncated < 10 { @@ -516,7 +544,7 @@ macro_rules! impl_msg_acceptor_array { fn flush_loop(&mut self, scy: Arc) -> Result { let vt = mem::replace(&mut self.values, vec![]); - let ret = InsertLoopFut::new(scy, self.query.clone(), vt); + let ret = InsertLoopFut::new(scy, self.query.clone(), vt, self.opts.skip_insert); Ok(ret) } } @@ -553,16 +581,18 @@ struct MsgAcceptorArrayBool { series: i64, array_truncate: usize, truncated: usize, + opts: MsgAcceptorOptions, } impl MsgAcceptorArrayBool { - pub fn new(series: i64, array_truncate: usize, cq: &CommonQueries) -> Self { + pub fn new(series: i64, opts: MsgAcceptorOptions) -> Self { Self { - query: cq.qu_insert_array_bool.clone(), + query: opts.cq.qu_insert_array_bool.clone(), values: vec![], series, - array_truncate, + array_truncate: opts.array_truncate, truncated: 0, + opts, } } } @@ -616,7 +646,7 @@ impl MsgAcceptor for MsgAcceptorArrayBool { fn flush_loop(&mut self, scy: Arc) -> Result { let vt = mem::replace(&mut self.values, vec![]); - let ret = InsertLoopFut::new(scy, self.query.clone(), vt); + let ret = InsertLoopFut::new(scy, self.query.clone(), vt, self.opts.skip_insert); Ok(ret) } } @@ -627,12 +657,14 @@ pub struct ChannelWriterAll { common_queries: Arc, ts_msp_lsp: fn(u64, u64) -> (u64, u64), ts_msp_last: u64, - acceptor: Box, + acceptor: Box, #[allow(unused)] scalar_type: ScalarType, #[allow(unused)] shape: Shape, pulse_last: u64, + #[allow(unused)] + skip_insert: bool, } impl ChannelWriterAll { @@ -644,66 +676,72 @@ impl ChannelWriterAll { shape: Shape, byte_order: ByteOrder, array_truncate: usize, + skip_insert: bool, ) -> Result { - let (ts_msp_lsp, acc): (fn(u64, u64) -> (u64, u64), Box) = match &shape { + let opts = MsgAcceptorOptions { + cq: common_queries.clone(), + skip_insert, + array_truncate, + }; + let (ts_msp_lsp, acc): (fn(u64, u64) -> (u64, u64), Box) = match &shape { Shape::Scalar => match &scalar_type { ScalarType::U16 => match &byte_order { ByteOrder::LE => { - let acc = MsgAcceptorScalarU16LE::new(series as i64, &common_queries); + let acc = MsgAcceptorScalarU16LE::new(series as i64, opts); (ts_msp_lsp_1, Box::new(acc) as _) } ByteOrder::BE => { - let acc = MsgAcceptorScalarU16BE::new(series as i64, &common_queries); + let acc = MsgAcceptorScalarU16BE::new(series as i64, opts); (ts_msp_lsp_1, Box::new(acc) as _) } }, ScalarType::U32 => match &byte_order { ByteOrder::LE => { - let acc = MsgAcceptorScalarU32LE::new(series as i64, &common_queries); + let acc = MsgAcceptorScalarU32LE::new(series as i64, opts); (ts_msp_lsp_1, Box::new(acc) as _) } ByteOrder::BE => { - let acc = MsgAcceptorScalarU32BE::new(series as i64, &common_queries); + let acc = MsgAcceptorScalarU32BE::new(series as i64, opts); (ts_msp_lsp_1, Box::new(acc) as _) } }, ScalarType::I16 => match &byte_order { ByteOrder::LE => { - let acc = MsgAcceptorScalarI16LE::new(series as i64, &common_queries); + let acc = MsgAcceptorScalarI16LE::new(series as i64, opts); (ts_msp_lsp_1, Box::new(acc) as _) } ByteOrder::BE => { - let acc = MsgAcceptorScalarI16BE::new(series as i64, &common_queries); + let acc = MsgAcceptorScalarI16BE::new(series as i64, opts); (ts_msp_lsp_1, Box::new(acc) as _) } }, ScalarType::I32 => match &byte_order { ByteOrder::LE => { - let acc = MsgAcceptorScalarI32LE::new(series as i64, &common_queries); + let acc = MsgAcceptorScalarI32LE::new(series as i64, opts); (ts_msp_lsp_1, Box::new(acc) as _) } ByteOrder::BE => { - let acc = MsgAcceptorScalarI32BE::new(series as i64, &common_queries); + let acc = MsgAcceptorScalarI32BE::new(series as i64, opts); (ts_msp_lsp_1, Box::new(acc) as _) } }, ScalarType::F32 => match &byte_order { ByteOrder::LE => { - let acc = MsgAcceptorScalarF32LE::new(series as i64, &common_queries); + let acc = MsgAcceptorScalarF32LE::new(series as i64, opts); (ts_msp_lsp_1, Box::new(acc) as _) } ByteOrder::BE => { - let acc = MsgAcceptorScalarF32BE::new(series as i64, &common_queries); + let acc = MsgAcceptorScalarF32BE::new(series as i64, opts); (ts_msp_lsp_1, Box::new(acc) as _) } }, ScalarType::F64 => match &byte_order { ByteOrder::LE => { - let acc = MsgAcceptorScalarF64LE::new(series as i64, &common_queries); + let acc = MsgAcceptorScalarF64LE::new(series as i64, opts); (ts_msp_lsp_1, Box::new(acc) as _) } ByteOrder::BE => { - let acc = MsgAcceptorScalarF64BE::new(series as i64, &common_queries); + let acc = MsgAcceptorScalarF64BE::new(series as i64, opts); (ts_msp_lsp_1, Box::new(acc) as _) } }, @@ -719,57 +757,57 @@ impl ChannelWriterAll { match &scalar_type { ScalarType::BOOL => match &byte_order { _ => { - let acc = MsgAcceptorArrayBool::new(series as i64, array_truncate, &common_queries); + let acc = MsgAcceptorArrayBool::new(series as i64, opts); (ts_msp_lsp_2, Box::new(acc) as _) } }, ScalarType::U16 => match &byte_order { ByteOrder::LE => { - let acc = MsgAcceptorArrayU16LE::new(series as i64, array_truncate, &common_queries); + let acc = MsgAcceptorArrayU16LE::new(series as i64, opts); (ts_msp_lsp_2, Box::new(acc) as _) } ByteOrder::BE => { - let acc = MsgAcceptorArrayU16BE::new(series as i64, array_truncate, &common_queries); + let acc = MsgAcceptorArrayU16BE::new(series as i64, opts); (ts_msp_lsp_2, Box::new(acc) as _) } }, ScalarType::I16 => match &byte_order { ByteOrder::LE => { - let acc = MsgAcceptorArrayI16LE::new(series as i64, array_truncate, &common_queries); + let acc = MsgAcceptorArrayI16LE::new(series as i64, opts); (ts_msp_lsp_2, Box::new(acc) as _) } ByteOrder::BE => { - let acc = MsgAcceptorArrayI16BE::new(series as i64, array_truncate, &common_queries); + let acc = MsgAcceptorArrayI16BE::new(series as i64, opts); (ts_msp_lsp_2, Box::new(acc) as _) } }, ScalarType::I32 => match &byte_order { ByteOrder::LE => { - let acc = MsgAcceptorArrayI32LE::new(series as i64, array_truncate, &common_queries); + let acc = MsgAcceptorArrayI32LE::new(series as i64, opts); (ts_msp_lsp_2, Box::new(acc) as _) } ByteOrder::BE => { - let acc = MsgAcceptorArrayI32BE::new(series as i64, array_truncate, &common_queries); + let acc = MsgAcceptorArrayI32BE::new(series as i64, opts); (ts_msp_lsp_2, Box::new(acc) as _) } }, ScalarType::F32 => match &byte_order { ByteOrder::LE => { - let acc = MsgAcceptorArrayF32LE::new(series as i64, array_truncate, &common_queries); + let acc = MsgAcceptorArrayF32LE::new(series as i64, opts); (ts_msp_lsp_2, Box::new(acc) as _) } ByteOrder::BE => { - let acc = MsgAcceptorArrayF32BE::new(series as i64, array_truncate, &common_queries); + let acc = MsgAcceptorArrayF32BE::new(series as i64, opts); (ts_msp_lsp_2, Box::new(acc) as _) } }, ScalarType::F64 => match &byte_order { ByteOrder::LE => { - let acc = MsgAcceptorArrayF64LE::new(series as i64, array_truncate, &common_queries); + let acc = MsgAcceptorArrayF64LE::new(series as i64, opts); (ts_msp_lsp_2, Box::new(acc) as _) } ByteOrder::BE => { - let acc = MsgAcceptorArrayF64BE::new(series as i64, array_truncate, &common_queries); + let acc = MsgAcceptorArrayF64BE::new(series as i64, opts); (ts_msp_lsp_2, Box::new(acc) as _) } }, @@ -798,6 +836,7 @@ impl ChannelWriterAll { scalar_type, shape, pulse_last: 0, + skip_insert, }; Ok(ret) } @@ -814,14 +853,18 @@ impl ChannelWriterAll { let fut1 = if ts_msp != self.ts_msp_last { debug!("ts_msp changed ts {ts} pulse {pulse} ts_msp {ts_msp} ts_lsp {ts_lsp}"); self.ts_msp_last = ts_msp; - // TODO make the passing of the query parameters type safe. - // TODO the "dtype" table field is not needed here. Drop from database. - let fut = ScyQueryFut::new( - self.scy.clone(), - self.common_queries.qu_insert_ts_msp.clone(), - (self.series as i64, ts_msp as i64), - ); - Some(Box::pin(fut) as _) + if !self.skip_insert { + // TODO make the passing of the query parameters type safe. + // TODO the "dtype" table field is not needed here. Drop from database. + let fut = ScyQueryFut::new( + self.scy.clone(), + self.common_queries.qu_insert_ts_msp.clone(), + (self.series as i64, ts_msp as i64), + ); + Some(Box::pin(fut) as _) + } else { + None + } } else { None }; diff --git a/netfetch/src/netbuf.rs b/netfetch/src/netbuf.rs index b7bd80c..2dfd3a3 100644 --- a/netfetch/src/netbuf.rs +++ b/netfetch/src/netbuf.rs @@ -7,6 +7,12 @@ pub struct NetBuf { rp: usize, } +macro_rules! check_invariants { + ($self:expr) => { + //$self.check_invariants() + }; +} + impl NetBuf { pub fn new(cap: usize) -> Self { Self { @@ -21,27 +27,27 @@ impl NetBuf { } pub fn len(&self) -> usize { - self.check_invariant(); + check_invariants!(self); self.wp - self.rp } pub fn cap(&self) -> usize { - self.check_invariant(); + check_invariants!(self); self.buf.len() } pub fn wcap(&self) -> usize { - self.check_invariant(); + check_invariants!(self); self.buf.len() - self.wp } pub fn data(&self) -> &[u8] { - self.check_invariant(); + check_invariants!(self); &self.buf[self.rp..self.wp] } pub fn adv(&mut self, x: usize) -> Result<(), Error> { - self.check_invariant(); + check_invariants!(self); if self.len() < x { return Err(Error::with_msg_no_trace("not enough bytes")); } else { @@ -51,7 +57,7 @@ impl NetBuf { } pub fn wadv(&mut self, x: usize) -> Result<(), Error> { - self.check_invariant(); + check_invariants!(self); if self.wcap() < x { return Err(Error::with_msg_no_trace("not enough space")); } else { @@ -61,7 +67,7 @@ impl NetBuf { } pub fn read_u8(&mut self) -> Result { - self.check_invariant(); + check_invariants!(self); type T = u8; const TS: usize = std::mem::size_of::(); if self.len() < TS { @@ -74,7 +80,7 @@ impl NetBuf { } pub fn read_u64(&mut self) -> Result { - self.check_invariant(); + check_invariants!(self); type T = u64; const TS: usize = std::mem::size_of::(); if self.len() < TS { @@ -87,7 +93,7 @@ impl NetBuf { } pub fn read_bytes(&mut self, n: usize) -> Result<&[u8], Error> { - self.check_invariant(); + check_invariants!(self); if self.len() < n { return Err(Error::with_msg_no_trace("not enough bytes")); } else { @@ -98,14 +104,14 @@ impl NetBuf { } pub fn read_buf_for_fill(&mut self) -> ReadBuf { - self.check_invariant(); + check_invariants!(self); self.rewind_if_needed(); let read_buf = ReadBuf::new(&mut self.buf[self.wp..]); read_buf } pub fn rewind_if_needed(&mut self) { - self.check_invariant(); + check_invariants!(self); if self.rp != 0 && self.rp == self.wp { self.rp = 0; self.wp = 0; @@ -117,7 +123,7 @@ impl NetBuf { } pub fn put_slice(&mut self, buf: &[u8]) -> Result<(), Error> { - self.check_invariant(); + check_invariants!(self); self.rewind_if_needed(); if self.wcap() < buf.len() { return Err(Error::with_msg_no_trace("not enough space")); @@ -129,7 +135,7 @@ impl NetBuf { } pub fn put_u8(&mut self, v: u8) -> Result<(), Error> { - self.check_invariant(); + check_invariants!(self); type T = u8; const TS: usize = std::mem::size_of::(); self.rewind_if_needed(); @@ -143,7 +149,7 @@ impl NetBuf { } pub fn put_u64(&mut self, v: u64) -> Result<(), Error> { - self.check_invariant(); + check_invariants!(self); type T = u64; const TS: usize = std::mem::size_of::(); self.rewind_if_needed(); @@ -157,12 +163,7 @@ impl NetBuf { } #[allow(unused)] - #[inline(always)] - fn check_invariant(&self) {} - - #[allow(unused)] - #[inline(always)] - fn check_invariant2(&self) { + fn check_invariants(&self) { if self.wp > self.buf.len() { eprintln!("ERROR netbuf wp {} rp {}", self.wp, self.rp); std::process::exit(87); diff --git a/netfetch/src/zmtp.rs b/netfetch/src/zmtp.rs index 4920699..51da923 100644 --- a/netfetch/src/zmtp.rs +++ b/netfetch/src/zmtp.rs @@ -109,12 +109,14 @@ pub struct ZmtpClientOpts { pub do_pulse_id: bool, pub rcvbuf: Option, pub array_truncate: Option, + pub process_channel_count_limit: Option, + pub skip_insert: bool, } struct ClientRun { #[allow(unused)] client: Pin>, - fut: Pin>>>, + fut: Pin> + Send>>, } impl ClientRun { @@ -142,7 +144,7 @@ struct BsreadClient { rcvbuf: Option, tmp_vals_pulse_map: Vec<(i64, i32, i64, i32)>, scy: Arc, - channel_writers: BTreeMap>, + channel_writers: BTreeMap>, common_queries: Arc, print_stats: CheckEvery, parser: Parser, @@ -186,12 +188,20 @@ impl BsreadClient { let mut rows_inserted = 0u32; let mut time_spent_inserting = Duration::from_millis(0); let mut series_ids = Vec::new(); + let mut msg_dt_ema = stats::EMA::with_k(0.01); + let mut msg_ts_last = Instant::now(); while let Some(item) = zmtp.next().await { + let tsnow = Instant::now(); match item { Ok(ev) => match ev { ZmtpEvent::ZmtpCommand(_) => (), ZmtpEvent::ZmtpMessage(msg) => { msgc += 1; + { + let dt = tsnow.duration_since(msg_ts_last); + msg_dt_ema.update(dt.as_secs_f32()); + msg_ts_last = tsnow; + } match self.parser.parse_zmtp_message(&msg) { Ok(bm) => { if msg.frames().len() - 2 * bm.head_b.channels.len() != 2 { @@ -268,7 +278,11 @@ impl BsreadClient { // TODO limit log rate warn!("Bad pulse {} for {}", pulse, self.source_addr); } - for i1 in 0..head_b.channels.len() { + for i1 in 0..head_b + .channels + .len() + .min(self.opts.process_channel_count_limit.unwrap_or(4000)) + { let chn = &head_b.channels[i1]; let fr = &msg.frames[2 + 2 * i1]; if i1 >= series_ids.len() { @@ -323,6 +337,13 @@ impl BsreadClient { rows_inserted = 0; time_spent_inserting = Duration::from_millis(0); bytes_payload = 0; + if msg_dt_ema.update_count() > 100 { + let ema = msg_dt_ema.ema(); + if ema < 0.005 { + let emv = msg_dt_ema.emv().sqrt(); + warn!("MSG FREQ {} {:9.5} {:9.5}", self.source_addr, ema, emv); + } + } } } Ok(()) @@ -350,17 +371,20 @@ impl BsreadClient { shape.clone(), byte_order.clone(), trunc, + self.opts.skip_insert, )?; let shape_dims = shape.to_scylla_vec(); self.channel_writers.insert(series, Box::new(cw)); - // TODO insert correct facility name - self.scy + if !self.opts.skip_insert { + // TODO insert correct facility name + self.scy .query( "insert into series_by_channel (facility, channel_name, series, scalar_type, shape_dims) values (?, ?, ?, ?, ?)", ("scylla", &chn.name, series as i64, scalar_type.to_scylla_i32(), &shape_dims), ) .await .err_conv()?; + } Ok(()) } @@ -498,13 +522,15 @@ pub async fn zmtp_client(opts: ZmtpClientOpts) -> Result<(), Error> { qu_insert_array_bool, }; let common_queries = Arc::new(common_queries); - let mut clients = vec![]; + let mut jhs = vec![]; for source_addr in &opts.sources { let client = BsreadClient::new(opts.clone(), source_addr.into(), scy.clone(), common_queries.clone()).await?; let fut = ClientRun::new(client); - clients.push(fut); + //clients.push(fut); + let jh = tokio::spawn(fut); + jhs.push(jh); } - futures_util::future::join_all(clients).await; + futures_util::future::join_all(jhs).await; Ok(()) } @@ -543,10 +569,13 @@ fn set_rcv_sock_opts(conn: &mut TcpStream, rcvbuf: u32) -> Result<(), Error> { if ec != 0 { let errno = *libc::__errno_location(); let es = CStr::from_ptr(libc::strerror(errno)); - warn!("can not query socket option ec {ec} errno {errno} es {es:?}"); - error!("can not query socket option"); + error!("can not query socket option ec {ec} errno {errno} es {es:?}"); } else { - info!("SO_RCVBUF {n}"); + if (n as u32) < rcvbuf * 5 / 6 { + warn!("SO_RCVBUF {n} smaller than requested {rcvbuf}"); + } else { + info!("SO_RCVBUF {n}"); + } } } Ok(()) @@ -792,13 +821,23 @@ impl Zmtp { (&mut self.conn, self.outbuf.data()) } - fn record_input_state(&mut self) { + #[allow(unused)] + #[inline(always)] + fn record_input_state(&mut self) {} + + #[allow(unused)] + fn record_input_state_2(&mut self) { let st = self.buf.state(); self.input_state[self.input_state_ix] = InpState::Netbuf(st.0, st.1, self.buf.cap() - st.1); self.input_state_ix = (1 + self.input_state_ix) % self.input_state.len(); } - fn record_conn_state(&mut self) { + #[allow(unused)] + #[inline(always)] + fn record_conn_state(&mut self) {} + + #[allow(unused)] + fn record_conn_state_2(&mut self) { self.conn_state_log[self.conn_state_log_ix] = self.conn_state.clone(); self.conn_state_log_ix = (1 + self.conn_state_log_ix) % self.conn_state_log.len(); } diff --git a/stats/src/stats.rs b/stats/src/stats.rs index 1d68428..5ce7e6d 100644 --- a/stats/src/stats.rs +++ b/stats/src/stats.rs @@ -4,26 +4,51 @@ pub struct EMA { ema: f32, emv: f32, k: f32, + update_count: u64, } impl EMA { + pub fn with_k(k: f32) -> Self { + Self { + ema: 0.0, + emv: 0.0, + k, + update_count: 0, + } + } + pub fn default() -> Self { Self { ema: 0.0, emv: 0.0, k: 0.05, + update_count: 0, } } + #[inline(always)] pub fn update(&mut self, v: V) where V: Into, { + self.update_count += 1; let k = self.k; let dv = v.into() - self.ema; self.ema += k * dv; self.emv = (1f32 - k) * (self.emv + k * dv * dv); } + + pub fn update_count(&self) -> u64 { + self.update_count + } + + pub fn ema(&self) -> f32 { + self.ema + } + + pub fn emv(&self) -> f32 { + self.emv + } } pub struct CheckEvery {