From 0905ee6dfc3ec06a6adcadc2899735762b940d27 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Thu, 15 Sep 2022 15:35:20 +0200 Subject: [PATCH] Refactor --- netfetch/Cargo.toml | 2 + netfetch/src/ca.rs | 88 ++------------ netfetch/src/ca/conn.rs | 6 + netfetch/src/ca/proto.rs | 6 +- netfetch/src/channelwriter.rs | 214 ++++++++++++---------------------- netfetch/src/linuxhelper.rs | 121 +++++++++++++++++++ netfetch/src/metrics.rs | 4 +- netfetch/src/netfetch.rs | 1 + netfetch/src/store.rs | 21 +--- netfetch/src/zmtp.rs | 60 ++-------- 10 files changed, 229 insertions(+), 294 deletions(-) create mode 100644 netfetch/src/linuxhelper.rs diff --git a/netfetch/Cargo.toml b/netfetch/Cargo.toml index ffacb0d..72a72b3 100644 --- a/netfetch/Cargo.toml +++ b/netfetch/Cargo.toml @@ -38,3 +38,5 @@ err = { path = "../../daqbuffer/err" } netpod = { path = "../../daqbuffer/netpod" } taskrun = { path = "../../daqbuffer/taskrun" } bitshuffle = { path = "../../daqbuffer/bitshuffle" } +pin-project = "1" +lazy_static = "1" diff --git a/netfetch/src/ca.rs b/netfetch/src/ca.rs index ffc8418..cf1a659 100644 --- a/netfetch/src/ca.rs +++ b/netfetch/src/ca.rs @@ -7,6 +7,7 @@ pub mod store; use self::store::DataStore; use crate::ca::conn::ConnCommand; use crate::errconv::ErrConv; +use crate::linuxhelper::local_hostname; use crate::store::{CommonInsertItemQueue, QueryItem}; use async_channel::Sender; use conn::CaConn; @@ -19,12 +20,10 @@ use netpod::{Database, ScyllaConfig}; use serde::{Deserialize, Serialize}; use stats::{CaConnStats, CaConnStatsAgg, CaConnStatsAggDiff}; use std::collections::{BTreeMap, VecDeque}; -use std::ffi::CStr; -use std::mem::MaybeUninit; use std::net::{IpAddr, Ipv4Addr, SocketAddrV4}; use std::path::PathBuf; use std::sync::atomic::{AtomicU32, AtomicU64, Ordering}; -use std::sync::{Arc, Mutex, Once}; +use std::sync::{Arc, Mutex}; use std::time::Duration; use tokio::fs::OpenOptions; use tokio::io::AsyncReadExt; @@ -32,17 +31,10 @@ use tokio::sync::Mutex as TokMx; use tokio::task::JoinHandle; use tokio_postgres::Client as PgClient; -static mut METRICS: Option>> = None; -static METRICS_ONCE: Once = Once::new(); +pub static SIGINT: AtomicU32 = AtomicU32::new(0); -pub fn get_metrics() -> &'static mut Option { - METRICS_ONCE.call_once(|| unsafe { - METRICS = Some(Mutex::new(None)); - }); - let mut g = unsafe { METRICS.as_mut().unwrap().lock().unwrap() }; - let ret: &mut Option = &mut *g; - let ret = unsafe { &mut *(ret as *mut _) }; - ret +lazy_static::lazy_static! { + pub static ref METRICS: Mutex> = Mutex::new(None); } #[derive(Debug, Serialize, Deserialize)] @@ -107,24 +99,6 @@ pub struct ListenFromFileOpts { pub config: PathBuf, } -fn local_hostname() -> String { - let mut buf = vec![0u8; 128]; - let hostname = unsafe { - let ec = libc::gethostname(buf.as_mut_ptr() as _, buf.len() - 2); - if ec != 0 { - panic!(); - } - let hostname = CStr::from_ptr(&buf[0] as *const _ as _); - hostname.to_str().unwrap() - }; - hostname.into() -} - -#[test] -fn test_get_local_hostname() { - assert_ne!(local_hostname().len(), 0); -} - pub async fn parse_config(config: PathBuf) -> Result { let mut file = OpenOptions::new().read(true).open(config).await?; let mut buf = vec![]; @@ -457,56 +431,8 @@ pub async fn create_ca_conn( Ok(jh) } -pub static SIGINT: AtomicU32 = AtomicU32::new(0); - -fn handler_sigaction(_a: libc::c_int, _b: *const libc::siginfo_t, _c: *const libc::c_void) { - SIGINT.store(1, Ordering::Release); - let _ = unset_signal_handler(); -} - -fn set_signal_handler() -> Result<(), Error> { - let mask: libc::sigset_t = unsafe { MaybeUninit::zeroed().assume_init() }; - let handler: libc::sighandler_t = handler_sigaction as *const libc::c_void as _; - let act = libc::sigaction { - sa_sigaction: handler, - sa_mask: mask, - sa_flags: 0, - sa_restorer: None, - }; - unsafe { - let ec = libc::sigaction(libc::SIGINT, &act, std::ptr::null_mut()); - if ec != 0 { - let errno = *libc::__errno_location(); - let msg = CStr::from_ptr(libc::strerror(errno)); - error!("error: {:?}", msg); - return Err(Error::with_msg_no_trace(format!("can not set signal handler"))); - } - } - Ok(()) -} - -fn unset_signal_handler() -> Result<(), Error> { - let mask: libc::sigset_t = unsafe { MaybeUninit::zeroed().assume_init() }; - let act = libc::sigaction { - sa_sigaction: libc::SIG_DFL, - sa_mask: mask, - sa_flags: 0, - sa_restorer: None, - }; - unsafe { - let ec = libc::sigaction(libc::SIGINT, &act, std::ptr::null_mut()); - if ec != 0 { - let errno = *libc::__errno_location(); - let msg = CStr::from_ptr(libc::strerror(errno)); - error!("error: {:?}", msg); - return Err(Error::with_msg_no_trace(format!("can not set signal handler"))); - } - } - Ok(()) -} - pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> { - set_signal_handler()?; + crate::linuxhelper::set_signal_handler()?; let insert_frac = Arc::new(AtomicU64::new(1000)); let insert_ivl_min = Arc::new(AtomicU64::new(8800)); let opts = parse_config(opts.config).await?; @@ -609,7 +535,7 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> { for g in conn_stats.lock().await.iter() { agg.push(&g); } - let m = get_metrics(); + let mut m = METRICS.lock().unwrap(); *m = Some(agg.clone()); if false { let diff = CaConnStatsAggDiff::diff_from(&agg_last, &agg); diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index 4c65295..54b98d5 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -39,6 +39,7 @@ pub enum ChannelConnectedInfo { pub struct ChannelStateInfo { pub name: String, pub addr: SocketAddrV4, + pub series: Option, pub channel_connected_info: ChannelConnectedInfo, pub scalar_type: Option, pub shape: Option, @@ -169,10 +170,15 @@ impl ChannelState { } _ => None, }; + let series = match self { + ChannelState::Created(s) => s.series.clone(), + _ => None, + }; let interest_score = 1. / item_recv_ivl_ema.unwrap_or(1e10).max(1e-6).min(1e10); ChannelStateInfo { name, addr, + series, channel_connected_info, scalar_type, shape, diff --git a/netfetch/src/ca/proto.rs b/netfetch/src/ca/proto.rs index 06158e3..474885a 100644 --- a/netfetch/src/ca/proto.rs +++ b/netfetch/src/ca/proto.rs @@ -385,7 +385,7 @@ impl CaMsgTy { error!("bad buffer given for search payload {} vs {}", buf.len(), d.len()); panic!(); } - unsafe { std::ptr::copy(&d[0] as _, &mut buf[0] as _, d.len()) }; + buf[0..d.len()].copy_from_slice(&d[0..d.len()]); } SearchRes(_) => { error!("should not attempt to write SearchRes"); @@ -400,7 +400,7 @@ impl CaMsgTy { error!("bad buffer given for create chan payload {} vs {}", buf.len(), d.len()); panic!(); } - unsafe { std::ptr::copy(&d[0] as _, &mut buf[0] as _, d.len()) }; + buf[0..d.len()].copy_from_slice(&d[0..d.len()]); } CreateChanRes(_) => {} CreateChanFail(_) => {} @@ -438,7 +438,7 @@ macro_rules! convert_wave_value { const STL: usize = std::mem::size_of::(); let nn = $n.min($buf.len() / STL); let mut a = Vec::with_capacity(nn); - // TODO optimize with unsafe? + // TODO should optimize? let mut bb = &$buf[..]; for _ in 0..nn { let v = ST::from_be_bytes(bb[..STL].try_into()?); diff --git a/netfetch/src/channelwriter.rs b/netfetch/src/channelwriter.rs index 9e021e9..4a23658 100644 --- a/netfetch/src/channelwriter.rs +++ b/netfetch/src/channelwriter.rs @@ -17,37 +17,21 @@ use std::sync::Arc; use std::task::{Context, Poll}; use std::time::{Duration, Instant}; -pub struct ScyQueryFut { - #[allow(unused)] - scy: Arc, - #[allow(unused)] - query: Box, - #[allow(unused)] - values: Box, - fut: Pin> + Send>>, +pub struct ScyQueryFut<'a> { + fut: Pin> + Send + 'a>>, } -impl ScyQueryFut { - pub fn new(scy: Arc, query: PreparedStatement, values: V) -> Self +impl<'a> ScyQueryFut<'a> { + pub fn new(scy: &'a ScySession, query: &'a PreparedStatement, values: V) -> Self where - V: ValueList + Sync + 'static, + V: ValueList + Send + 'static, { - let query = Box::new(query); - let values = Box::new(values); - let scy2 = unsafe { &*(&scy as &_ as *const _) } as &ScySession; - let query2 = unsafe { &*(&query as &_ as *const _) } as &PreparedStatement; - let v2 = unsafe { &*(&values as &_ as *const _) } as &V; - let fut = scy2.execute(query2, v2); - Self { - scy, - query, - values, - fut: Box::pin(fut), - } + let fut = scy.execute(query, values); + Self { fut: Box::pin(fut) } } } -impl Future for ScyQueryFut { +impl<'a> Future for ScyQueryFut<'a> { type Output = Result<(), Error>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { @@ -55,45 +39,28 @@ impl Future for ScyQueryFut { match self.fut.poll_unpin(cx) { Ready(k) => match k { Ok(_) => Ready(Ok(())), - Err(e) => { - warn!("ScyQueryFut done Err"); - Ready(Err(e).err_conv()) - } + Err(e) => Ready(Err(e).err_conv()), }, Pending => Pending, } } } -pub struct ScyBatchFut { - #[allow(unused)] - scy: Arc, - #[allow(unused)] - batch: Box, - #[allow(unused)] - values: Box, - fut: Pin>>>, +pub struct ScyBatchFut<'a> { + fut: Pin> + 'a>>, polled: usize, ts_create: Instant, ts_poll_start: Instant, } -impl ScyBatchFut { - pub fn new(scy: Arc, batch: Batch, values: V) -> Self +impl<'a> ScyBatchFut<'a> { + pub fn new(scy: &'a ScySession, batch: &'a Batch, values: V) -> Self where - V: BatchValues + 'static, + V: BatchValues + Send + Sync + 'static, { - let batch = Box::new(batch); - let values = Box::new(values); - let scy2 = unsafe { &*(&scy as &_ as *const _) } as &ScySession; - let batch2 = unsafe { &*(&batch as &_ as *const _) } as &Batch; - let v2 = unsafe { &*(&values as &_ as *const _) } as &V; - let fut = scy2.batch(batch2, v2); + let fut = scy.batch(batch, values); let tsnow = Instant::now(); Self { - scy, - batch, - values, fut: Box::pin(fut), polled: 0, ts_create: tsnow, @@ -102,7 +69,7 @@ impl ScyBatchFut { } } -impl Future for ScyBatchFut { +impl<'a> Future for ScyBatchFut<'a> { type Output = Result<(), Error>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { @@ -134,30 +101,21 @@ impl Future for ScyBatchFut { } } -pub struct ScyBatchFutGen { - #[allow(unused)] - scy: Arc, - #[allow(unused)] - batch: Box, - fut: Pin> + Send>>, +pub struct ScyBatchFutGen<'a> { + fut: Pin> + Send + 'a>>, polled: usize, ts_create: Instant, ts_poll_start: Instant, } -impl ScyBatchFutGen { - pub fn new(scy: Arc, batch: Batch, values: V) -> Self +impl<'a> ScyBatchFutGen<'a> { + pub fn new(scy: &'a ScySession, batch: &'a Batch, values: V) -> Self where - V: BatchValues + Sync + Send + 'static, + V: BatchValues + Send + Sync + 'static, { - let batch = Box::new(batch); - let scy_ref = unsafe { &*(&scy as &_ as *const _) } as &ScySession; - let batch_ref = unsafe { &*(&batch as &_ as *const _) } as &Batch; - let fut = scy_ref.batch(batch_ref, values); + let fut = scy.batch(batch, values); let tsnow = Instant::now(); Self { - scy, - batch, fut: Box::pin(fut), polled: 0, ts_create: tsnow, @@ -166,7 +124,7 @@ impl ScyBatchFutGen { } } -impl Future for ScyBatchFutGen { +impl<'a> Future for ScyBatchFutGen<'a> { type Output = Result<(), Error>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { @@ -198,44 +156,35 @@ impl Future for ScyBatchFutGen { } } -pub struct InsertLoopFut { - #[allow(unused)] - scy: Arc, - #[allow(unused)] - query: Arc, - futs: Vec> + Send>>>, +pub struct InsertLoopFut<'a> { + futs: Vec> + Send + 'a>>>, fut_ix: usize, polled: usize, ts_create: Instant, ts_poll_start: Instant, } -impl InsertLoopFut { - pub fn new(scy: Arc, query: PreparedStatement, values: Vec, skip_insert: bool) -> Self +impl<'a> InsertLoopFut<'a> { + pub fn new(scy: &'a ScySession, query: &'a PreparedStatement, values: Vec, skip_insert: bool) -> Self where - V: ValueList + Send + 'static, + V: ValueList + Send + Sync + 'static, { let mut values = values; if skip_insert { values.clear(); } - let query = Arc::new(query); - let scy_ref = unsafe { &*(&scy as &_ as *const _) } as &ScySession; - let query_ref = unsafe { &*(&query as &_ as *const _) } as &PreparedStatement; // TODO // Can I store the values in some better generic form? // Or is it acceptable to generate all insert futures right here and poll them later? let futs: Vec<_> = values .into_iter() .map(|vs| { - let fut = scy_ref.execute(query_ref, vs); + let fut = scy.execute(query, vs); Box::pin(fut) as _ }) .collect(); let tsnow = Instant::now(); Self { - scy, - query, futs, fut_ix: 0, polled: 0, @@ -245,7 +194,7 @@ impl InsertLoopFut { } } -impl Future for InsertLoopFut { +impl<'a> Future for InsertLoopFut<'a> { type Output = Result<(), Error>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { @@ -301,15 +250,15 @@ pub struct ChannelWriteRes { pub dt: Duration, } -pub struct ChannelWriteFut { +pub struct ChannelWriteFut<'a> { nn: usize, - fut1: Option> + Send>>>, - fut2: Option> + Send>>>, + fut1: Option> + Send + 'a>>>, + fut2: Option> + Send + 'a>>>, ts1: Option, mask: u8, } -impl Future for ChannelWriteFut { +impl<'a> Future for ChannelWriteFut<'a> { type Output = Result; fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { @@ -386,8 +335,8 @@ trait MsgAcceptor { fn len(&self) -> usize; fn accept(&mut self, ts_msp: i64, ts_lsp: i64, pulse: i64, fr: &ZmtpFrame) -> Result<(), Error>; fn should_flush(&self) -> bool; - fn flush_loop(&mut self, scy: Arc) -> Result; - fn flush_batch(&mut self, scy: Arc) -> Result; + fn flush_loop<'a>(&'a mut self, scy: &'a ScySession) -> Result, Error>; + fn flush_batch<'a>(&'a mut self, scy: &'a ScySession) -> Result, Error>; } macro_rules! impl_msg_acceptor_scalar { @@ -397,6 +346,7 @@ macro_rules! impl_msg_acceptor_scalar { values: Vec<(i64, i64, i64, i64, $st)>, series: i64, opts: MsgAcceptorOptions, + batch: Batch, } impl $sname { @@ -406,6 +356,7 @@ macro_rules! impl_msg_acceptor_scalar { values: vec![], series, opts, + batch: Batch::new((BatchType::Unlogged)), } } } @@ -436,20 +387,21 @@ macro_rules! impl_msg_acceptor_scalar { self.len() >= 140 + ((self.series as usize) & 0x1f) } - fn flush_batch(&mut self, scy: Arc) -> Result { + fn flush_batch<'a>(&'a mut self, scy: &'a ScySession) -> Result, Error> { let vt = mem::replace(&mut self.values, vec![]); let nn = vt.len(); - let mut batch = Batch::new(BatchType::Unlogged); + self.batch = Batch::new(BatchType::Unlogged); + let batch = &mut self.batch; for _ in 0..nn { batch.append_statement(self.query.clone()); } - let ret = ScyBatchFutGen::new(scy, batch, vt); + let ret = ScyBatchFutGen::new(&scy, batch, vt); Ok(ret) } - fn flush_loop(&mut self, scy: Arc) -> Result { + fn flush_loop<'a>(&'a mut self, scy: &'a ScySession) -> Result, Error> { let vt = mem::replace(&mut self.values, vec![]); - let ret = InsertLoopFut::new(scy, self.query.clone(), vt, self.opts.skip_insert); + let ret = InsertLoopFut::new(scy, &self.query, vt, self.opts.skip_insert); Ok(ret) } } @@ -465,6 +417,7 @@ macro_rules! impl_msg_acceptor_array { array_truncate: usize, truncated: usize, opts: MsgAcceptorOptions, + batch: Batch, } impl $sname { @@ -476,6 +429,7 @@ macro_rules! impl_msg_acceptor_array { array_truncate: opts.array_truncate, truncated: 0, opts, + batch: Batch::new(BatchType::Unlogged), } } } @@ -488,41 +442,17 @@ macro_rules! impl_msg_acceptor_array { fn accept(&mut self, ts_msp: i64, ts_lsp: i64, pulse: i64, fr: &ZmtpFrame) -> Result<(), Error> { type ST = $st; const STL: usize = std::mem::size_of::(); - let vc = fr.data().len() / STL; - let mut values = Vec::with_capacity(vc); - if false { - for i in 0..vc { - let h = i * STL; - let value = ST::$from_bytes(fr.data()[h..h + STL].try_into()?); - values.push(value); - } - } else { - let mut ptr: *const u8 = fr.data().as_ptr(); - let mut ptr2: *mut ST = values.as_mut_ptr(); - for _ in 0..vc { - unsafe { - let a: &[u8; STL] = &*(ptr as *const [u8; STL]); - *ptr2 = ST::$from_bytes(*a); - } - ptr = ptr.wrapping_offset(STL as isize); - ptr2 = ptr2.wrapping_offset(1); - } - unsafe { - values.set_len(vc); - } - } - if values.len() > self.array_truncate { - if self.truncated < 10 { - warn!( - "truncate {} to {} for series {}", - values.len(), - self.array_truncate, - self.series - ); - } - values.truncate(self.array_truncate); + let vc2 = (fr.data().len() / STL); + let vc = vc2.min(self.array_truncate); + if vc != vc2 { self.truncated = self.truncated.saturating_add(1); } + let mut values = Vec::with_capacity(vc); + for i in 0..vc { + let h = i * STL; + let value = ST::$from_bytes(fr.data()[h..h + STL].try_into()?); + values.push(value); + } self.values.push((self.series, ts_msp, ts_lsp, pulse, values)); Ok(()) } @@ -531,20 +461,21 @@ macro_rules! impl_msg_acceptor_array { self.len() >= 40 + ((self.series as usize) & 0x7) } - fn flush_batch(&mut self, scy: Arc) -> Result { + fn flush_batch<'a>(&'a mut self, scy: &'a ScySession) -> Result, Error> { let vt = mem::replace(&mut self.values, vec![]); let nn = vt.len(); - let mut batch = Batch::new(BatchType::Unlogged); + self.batch = Batch::new(BatchType::Unlogged); + let batch = &mut self.batch; for _ in 0..nn { batch.append_statement(self.query.clone()); } - let ret = ScyBatchFutGen::new(scy, batch, vt); + let ret = ScyBatchFutGen::new(&scy, batch, vt); Ok(ret) } - fn flush_loop(&mut self, scy: Arc) -> Result { + fn flush_loop<'a>(&'a mut self, scy: &'a ScySession) -> Result, Error> { let vt = mem::replace(&mut self.values, vec![]); - let ret = InsertLoopFut::new(scy, self.query.clone(), vt, self.opts.skip_insert); + let ret = InsertLoopFut::new(scy, &self.query, vt, self.opts.skip_insert); Ok(ret) } } @@ -582,6 +513,7 @@ struct MsgAcceptorArrayBool { array_truncate: usize, truncated: usize, opts: MsgAcceptorOptions, + batch: Batch, } impl MsgAcceptorArrayBool { @@ -593,6 +525,7 @@ impl MsgAcceptorArrayBool { array_truncate: opts.array_truncate, truncated: 0, opts, + batch: Batch::new(BatchType::Unlogged), } } } @@ -633,20 +566,21 @@ impl MsgAcceptor for MsgAcceptorArrayBool { self.len() >= 40 + ((self.series as usize) & 0x7) } - fn flush_batch(&mut self, scy: Arc) -> Result { + fn flush_batch<'a>(&'a mut self, scy: &'a ScySession) -> Result { let vt = mem::replace(&mut self.values, vec![]); let nn = vt.len(); - let mut batch = Batch::new(BatchType::Unlogged); + self.batch = Batch::new(BatchType::Unlogged); + let batch = &mut self.batch; for _ in 0..nn { batch.append_statement(self.query.clone()); } - let ret = ScyBatchFutGen::new(scy, batch, vt); + let ret = ScyBatchFutGen::new(&scy, batch, vt); Ok(ret) } - fn flush_loop(&mut self, scy: Arc) -> Result { + fn flush_loop<'a>(&'a mut self, scy: &'a ScySession) -> Result, Error> { let vt = mem::replace(&mut self.values, vec![]); - let ret = InsertLoopFut::new(scy, self.query.clone(), vt, self.opts.skip_insert); + let ret = InsertLoopFut::new(scy, &self.query, vt, self.opts.skip_insert); Ok(ret) } } @@ -854,11 +788,9 @@ impl ChannelWriterAll { debug!("ts_msp changed ts {ts} pulse {pulse} ts_msp {ts_msp} ts_lsp {ts_lsp}"); self.ts_msp_last = ts_msp; if !self.skip_insert { - // TODO make the passing of the query parameters type safe. - // TODO the "dtype" table field is not needed here. Drop from database. let fut = ScyQueryFut::new( - self.scy.clone(), - self.common_queries.qu_insert_ts_msp.clone(), + &self.scy, + &self.common_queries.qu_insert_ts_msp, (self.series as i64, ts_msp as i64), ); Some(Box::pin(fut) as _) @@ -871,7 +803,7 @@ impl ChannelWriterAll { self.acceptor.accept(ts_msp as i64, ts_lsp as i64, pulse as i64, fr)?; if self.acceptor.should_flush() { let nn = self.acceptor.len(); - let fut = self.acceptor.flush_loop(self.scy.clone())?; + let fut = self.acceptor.flush_loop(&self.scy)?; let fut2 = Some(Box::pin(fut) as _); let ret = ChannelWriteFut { ts1: None, @@ -886,7 +818,7 @@ impl ChannelWriterAll { ts1: None, mask: 0, nn: 0, - fut1: fut1, + fut1, fut2: None, }; Ok(ret) diff --git a/netfetch/src/linuxhelper.rs b/netfetch/src/linuxhelper.rs new file mode 100644 index 0000000..a5983c8 --- /dev/null +++ b/netfetch/src/linuxhelper.rs @@ -0,0 +1,121 @@ +use err::Error; +use log::*; +use std::ffi::CStr; +use std::mem::MaybeUninit; +use std::sync::atomic::Ordering; +use tokio::net::TcpStream; + +pub fn local_hostname() -> String { + let mut buf = vec![0u8; 128]; + let hostname = unsafe { + let ec = libc::gethostname(buf.as_mut_ptr() as _, buf.len() - 2); + if ec != 0 { + panic!(); + } + let hostname = CStr::from_ptr(&buf[0] as *const _ as _); + hostname.to_str().unwrap() + }; + hostname.into() +} + +#[test] +fn test_get_local_hostname() { + assert_ne!(local_hostname().len(), 0); +} + +pub fn set_signal_handler() -> Result<(), Error> { + // Safe because it creates a valid value: + let mask: libc::sigset_t = unsafe { MaybeUninit::zeroed().assume_init() }; + let handler: libc::sighandler_t = handler_sigaction as *const libc::c_void as _; + let act = libc::sigaction { + sa_sigaction: handler, + sa_mask: mask, + sa_flags: 0, + sa_restorer: None, + }; + let (ec, msg) = unsafe { + let ec = libc::sigaction(libc::SIGINT, &act, std::ptr::null_mut()); + let errno = *libc::__errno_location(); + (ec, CStr::from_ptr(libc::strerror(errno))) + }; + if ec != 0 { + // Not valid to print here, but we will panic anyways after that. + eprintln!("error: {:?}", msg); + panic!(); + } + Ok(()) +} + +fn unset_signal_handler() -> Result<(), Error> { + // Safe because it creates a valid value: + let mask: libc::sigset_t = unsafe { MaybeUninit::zeroed().assume_init() }; + let act = libc::sigaction { + sa_sigaction: libc::SIG_DFL, + sa_mask: mask, + sa_flags: 0, + sa_restorer: None, + }; + let (ec, msg) = unsafe { + let ec = libc::sigaction(libc::SIGINT, &act, std::ptr::null_mut()); + let errno = *libc::__errno_location(); + (ec, CStr::from_ptr(libc::strerror(errno))) + }; + if ec != 0 { + // Not valid to print here, but we will panic anyways after that. + eprintln!("error: {:?}", msg); + panic!(); + } + Ok(()) +} + +fn handler_sigaction(_a: libc::c_int, _b: *const libc::siginfo_t, _c: *const libc::c_void) { + crate::ca::SIGINT.store(1, Ordering::Release); + let _ = unset_signal_handler(); +} + +pub fn set_rcv_sock_opts(conn: &mut TcpStream, rcvbuf: u32) -> Result<(), Error> { + use std::mem::size_of; + use std::os::unix::prelude::AsRawFd; + let fd = conn.as_raw_fd(); + unsafe { + type N = libc::c_int; + let n: N = rcvbuf as _; + let ec = libc::setsockopt( + fd, + libc::SOL_SOCKET, + libc::SO_RCVBUF, + &n as *const N as _, + size_of::() as _, + ); + if ec != 0 { + error!("ec {ec}"); + if ec != 0 { + return Err(Error::with_msg_no_trace(format!("can not set socket option"))); + } + } + } + unsafe { + type N = libc::c_int; + let mut n: N = -1; + let mut l = size_of::() as libc::socklen_t; + let ec = libc::getsockopt( + fd, + libc::SOL_SOCKET, + libc::SO_RCVBUF, + &mut n as *mut N as _, + &mut l as _, + ); + if ec != 0 { + let errno = *libc::__errno_location(); + let es = CStr::from_ptr(libc::strerror(errno)); + error!("can not query socket option ec {ec} errno {errno} es {es:?}"); + } else { + if (n as u32) < rcvbuf * 5 / 6 { + warn!("SO_RCVBUF {n} smaller than requested {rcvbuf}"); + } else { + info!("SO_RCVBUF {n}"); + } + } + } + Ok(()) +} diff --git a/netfetch/src/metrics.rs b/netfetch/src/metrics.rs index 4f10134..1ff8b3c 100644 --- a/netfetch/src/metrics.rs +++ b/netfetch/src/metrics.rs @@ -226,8 +226,8 @@ pub async fn start_metrics_service( .route( "/metrics", get(|| async { - let stats = crate::ca::get_metrics(); - match stats { + let stats = crate::ca::METRICS.lock().unwrap(); + match stats.as_ref() { Some(s) => { trace!("Metrics"); s.prometheus() diff --git a/netfetch/src/netfetch.rs b/netfetch/src/netfetch.rs index c8c5ea5..ed0b429 100644 --- a/netfetch/src/netfetch.rs +++ b/netfetch/src/netfetch.rs @@ -2,6 +2,7 @@ pub mod bsread; pub mod ca; pub mod channelwriter; pub mod errconv; +pub mod linuxhelper; pub mod metrics; pub mod netbuf; pub mod series; diff --git a/netfetch/src/store.rs b/netfetch/src/store.rs index 57d625f..eb1e44c 100644 --- a/netfetch/src/store.rs +++ b/netfetch/src/store.rs @@ -13,36 +13,27 @@ use scylla::{QueryResult, Session as ScySession}; use stats::CaConnStats; use std::net::SocketAddrV4; use std::pin::Pin; -use std::sync::Arc; use std::task::{Context, Poll}; use std::time::{Instant, SystemTime}; -pub struct ScyInsertFut { - #[allow(unused)] - scy: Arc, - #[allow(unused)] - query: Arc, - fut: Pin> + Send>>, +pub struct ScyInsertFut<'a> { + fut: Pin> + Send + 'a>>, polled: usize, ts_create: Instant, ts_poll_first: Instant, } -impl ScyInsertFut { +impl<'a> ScyInsertFut<'a> { const NAME: &'static str = "ScyInsertFut"; - pub fn new(scy: Arc, query: Arc, values: V) -> Self + pub fn new(scy: &'a ScySession, query: &'a PreparedStatement, values: V) -> Self where V: ValueList + Send + 'static, { - let scy_ref: &ScySession = unsafe { &*(scy.as_ref() as &_ as *const _) }; - let query_ref = unsafe { &*(query.as_ref() as &_ as *const _) }; - let fut = scy_ref.execute(query_ref, values); + let fut = scy.execute(query, values); let fut = Box::pin(fut) as _; let tsnow = Instant::now(); Self { - scy, - query, fut, polled: 0, ts_create: tsnow, @@ -51,7 +42,7 @@ impl ScyInsertFut { } } -impl Future for ScyInsertFut { +impl<'a> Future for ScyInsertFut<'a> { type Output = Result<(), Error>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { diff --git a/netfetch/src/zmtp.rs b/netfetch/src/zmtp.rs index e61425a..56d96c6 100644 --- a/netfetch/src/zmtp.rs +++ b/netfetch/src/zmtp.rs @@ -17,7 +17,6 @@ use scylla::{Session as ScySession, SessionBuilder}; use serde_json::Value as JsVal; use stats::CheckEvery; use std::collections::BTreeMap; -use std::ffi::CStr; use std::fmt; use std::mem; use std::pin::Pin; @@ -166,7 +165,7 @@ impl BsreadClient { pub async fn run(&mut self) -> Result<(), Error> { let mut conn = tokio::net::TcpStream::connect(&self.source_addr).await?; if let Some(v) = self.rcvbuf { - set_rcv_sock_opts(&mut conn, v as u32)?; + crate::linuxhelper::set_rcv_sock_opts(&mut conn, v as u32)?; } let mut zmtp = Zmtp::new(conn, SocketType::PULL); let mut i1 = 0u64; @@ -290,11 +289,15 @@ impl BsreadClient { series_ids[i1].1 = series; } let series = series_ids[i1].1; - if let Some(cw) = self.channel_writers.get_mut(&series) { - let res = cw.write_msg(ts, pulse, fr)?.await?; + if let Some(_cw) = self.channel_writers.get_mut(&series) { + let _ = ts; + let _ = fr; + // TODO hand off item to a writer item queue. + err::todo(); + /*let res = cw.write_msg(ts, pulse, fr)?.await?; rows_inserted += res.nrows; time_spent_inserting = time_spent_inserting + res.dt; - bytes_payload += fr.data().len() as u64; + bytes_payload += fr.data().len() as u64;*/ } else { // TODO check for missing writers. warn!("no writer for {}", chn.name); @@ -526,53 +529,6 @@ pub async fn zmtp_client(opts: ZmtpClientOpts) -> Result<(), Error> { Ok(()) } -fn set_rcv_sock_opts(conn: &mut TcpStream, rcvbuf: u32) -> Result<(), Error> { - use std::mem::size_of; - use std::os::unix::prelude::AsRawFd; - let fd = conn.as_raw_fd(); - unsafe { - type N = libc::c_int; - let n: N = rcvbuf as _; - let ec = libc::setsockopt( - fd, - libc::SOL_SOCKET, - libc::SO_RCVBUF, - &n as *const N as _, - size_of::() as _, - ); - if ec != 0 { - error!("ec {ec}"); - if ec != 0 { - return Err(Error::with_msg_no_trace(format!("can not set socket option"))); - } - } - } - unsafe { - type N = libc::c_int; - let mut n: N = -1; - let mut l = size_of::() as libc::socklen_t; - let ec = libc::getsockopt( - fd, - libc::SOL_SOCKET, - libc::SO_RCVBUF, - &mut n as *mut N as _, - &mut l as _, - ); - if ec != 0 { - let errno = *libc::__errno_location(); - let es = CStr::from_ptr(libc::strerror(errno)); - error!("can not query socket option ec {ec} errno {errno} es {es:?}"); - } else { - if (n as u32) < rcvbuf * 5 / 6 { - warn!("SO_RCVBUF {n} smaller than requested {rcvbuf}"); - } else { - info!("SO_RCVBUF {n}"); - } - } - } - Ok(()) -} - pub struct BsreadDumper { source_addr: String, parser: Parser,