This commit is contained in:
Dominik Werder
2022-09-15 15:35:20 +02:00
parent b553402422
commit 0905ee6dfc
10 changed files with 229 additions and 294 deletions

View File

@@ -38,3 +38,5 @@ err = { path = "../../daqbuffer/err" }
netpod = { path = "../../daqbuffer/netpod" }
taskrun = { path = "../../daqbuffer/taskrun" }
bitshuffle = { path = "../../daqbuffer/bitshuffle" }
pin-project = "1"
lazy_static = "1"

View File

@@ -7,6 +7,7 @@ pub mod store;
use self::store::DataStore;
use crate::ca::conn::ConnCommand;
use crate::errconv::ErrConv;
use crate::linuxhelper::local_hostname;
use crate::store::{CommonInsertItemQueue, QueryItem};
use async_channel::Sender;
use conn::CaConn;
@@ -19,12 +20,10 @@ use netpod::{Database, ScyllaConfig};
use serde::{Deserialize, Serialize};
use stats::{CaConnStats, CaConnStatsAgg, CaConnStatsAggDiff};
use std::collections::{BTreeMap, VecDeque};
use std::ffi::CStr;
use std::mem::MaybeUninit;
use std::net::{IpAddr, Ipv4Addr, SocketAddrV4};
use std::path::PathBuf;
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use std::sync::{Arc, Mutex, Once};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tokio::fs::OpenOptions;
use tokio::io::AsyncReadExt;
@@ -32,17 +31,10 @@ use tokio::sync::Mutex as TokMx;
use tokio::task::JoinHandle;
use tokio_postgres::Client as PgClient;
static mut METRICS: Option<Mutex<Option<CaConnStatsAgg>>> = None;
static METRICS_ONCE: Once = Once::new();
pub static SIGINT: AtomicU32 = AtomicU32::new(0);
pub fn get_metrics() -> &'static mut Option<CaConnStatsAgg> {
METRICS_ONCE.call_once(|| unsafe {
METRICS = Some(Mutex::new(None));
});
let mut g = unsafe { METRICS.as_mut().unwrap().lock().unwrap() };
let ret: &mut Option<CaConnStatsAgg> = &mut *g;
let ret = unsafe { &mut *(ret as *mut _) };
ret
lazy_static::lazy_static! {
pub static ref METRICS: Mutex<Option<CaConnStatsAgg>> = Mutex::new(None);
}
#[derive(Debug, Serialize, Deserialize)]
@@ -107,24 +99,6 @@ pub struct ListenFromFileOpts {
pub config: PathBuf,
}
fn local_hostname() -> String {
let mut buf = vec![0u8; 128];
let hostname = unsafe {
let ec = libc::gethostname(buf.as_mut_ptr() as _, buf.len() - 2);
if ec != 0 {
panic!();
}
let hostname = CStr::from_ptr(&buf[0] as *const _ as _);
hostname.to_str().unwrap()
};
hostname.into()
}
#[test]
fn test_get_local_hostname() {
assert_ne!(local_hostname().len(), 0);
}
pub async fn parse_config(config: PathBuf) -> Result<CaConnectOpts, Error> {
let mut file = OpenOptions::new().read(true).open(config).await?;
let mut buf = vec![];
@@ -457,56 +431,8 @@ pub async fn create_ca_conn(
Ok(jh)
}
pub static SIGINT: AtomicU32 = AtomicU32::new(0);
fn handler_sigaction(_a: libc::c_int, _b: *const libc::siginfo_t, _c: *const libc::c_void) {
SIGINT.store(1, Ordering::Release);
let _ = unset_signal_handler();
}
fn set_signal_handler() -> Result<(), Error> {
let mask: libc::sigset_t = unsafe { MaybeUninit::zeroed().assume_init() };
let handler: libc::sighandler_t = handler_sigaction as *const libc::c_void as _;
let act = libc::sigaction {
sa_sigaction: handler,
sa_mask: mask,
sa_flags: 0,
sa_restorer: None,
};
unsafe {
let ec = libc::sigaction(libc::SIGINT, &act, std::ptr::null_mut());
if ec != 0 {
let errno = *libc::__errno_location();
let msg = CStr::from_ptr(libc::strerror(errno));
error!("error: {:?}", msg);
return Err(Error::with_msg_no_trace(format!("can not set signal handler")));
}
}
Ok(())
}
fn unset_signal_handler() -> Result<(), Error> {
let mask: libc::sigset_t = unsafe { MaybeUninit::zeroed().assume_init() };
let act = libc::sigaction {
sa_sigaction: libc::SIG_DFL,
sa_mask: mask,
sa_flags: 0,
sa_restorer: None,
};
unsafe {
let ec = libc::sigaction(libc::SIGINT, &act, std::ptr::null_mut());
if ec != 0 {
let errno = *libc::__errno_location();
let msg = CStr::from_ptr(libc::strerror(errno));
error!("error: {:?}", msg);
return Err(Error::with_msg_no_trace(format!("can not set signal handler")));
}
}
Ok(())
}
pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> {
set_signal_handler()?;
crate::linuxhelper::set_signal_handler()?;
let insert_frac = Arc::new(AtomicU64::new(1000));
let insert_ivl_min = Arc::new(AtomicU64::new(8800));
let opts = parse_config(opts.config).await?;
@@ -609,7 +535,7 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> {
for g in conn_stats.lock().await.iter() {
agg.push(&g);
}
let m = get_metrics();
let mut m = METRICS.lock().unwrap();
*m = Some(agg.clone());
if false {
let diff = CaConnStatsAggDiff::diff_from(&agg_last, &agg);

View File

@@ -39,6 +39,7 @@ pub enum ChannelConnectedInfo {
pub struct ChannelStateInfo {
pub name: String,
pub addr: SocketAddrV4,
pub series: Option<SeriesId>,
pub channel_connected_info: ChannelConnectedInfo,
pub scalar_type: Option<ScalarType>,
pub shape: Option<Shape>,
@@ -169,10 +170,15 @@ impl ChannelState {
}
_ => None,
};
let series = match self {
ChannelState::Created(s) => s.series.clone(),
_ => None,
};
let interest_score = 1. / item_recv_ivl_ema.unwrap_or(1e10).max(1e-6).min(1e10);
ChannelStateInfo {
name,
addr,
series,
channel_connected_info,
scalar_type,
shape,

View File

@@ -385,7 +385,7 @@ impl CaMsgTy {
error!("bad buffer given for search payload {} vs {}", buf.len(), d.len());
panic!();
}
unsafe { std::ptr::copy(&d[0] as _, &mut buf[0] as _, d.len()) };
buf[0..d.len()].copy_from_slice(&d[0..d.len()]);
}
SearchRes(_) => {
error!("should not attempt to write SearchRes");
@@ -400,7 +400,7 @@ impl CaMsgTy {
error!("bad buffer given for create chan payload {} vs {}", buf.len(), d.len());
panic!();
}
unsafe { std::ptr::copy(&d[0] as _, &mut buf[0] as _, d.len()) };
buf[0..d.len()].copy_from_slice(&d[0..d.len()]);
}
CreateChanRes(_) => {}
CreateChanFail(_) => {}
@@ -438,7 +438,7 @@ macro_rules! convert_wave_value {
const STL: usize = std::mem::size_of::<ST>();
let nn = $n.min($buf.len() / STL);
let mut a = Vec::with_capacity(nn);
// TODO optimize with unsafe?
// TODO should optimize?
let mut bb = &$buf[..];
for _ in 0..nn {
let v = ST::from_be_bytes(bb[..STL].try_into()?);

View File

@@ -17,37 +17,21 @@ use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
pub struct ScyQueryFut<V> {
#[allow(unused)]
scy: Arc<ScySession>,
#[allow(unused)]
query: Box<PreparedStatement>,
#[allow(unused)]
values: Box<V>,
fut: Pin<Box<dyn Future<Output = Result<QueryResult, QueryError>> + Send>>,
pub struct ScyQueryFut<'a> {
fut: Pin<Box<dyn Future<Output = Result<QueryResult, QueryError>> + Send + 'a>>,
}
impl<V> ScyQueryFut<V> {
pub fn new(scy: Arc<ScySession>, query: PreparedStatement, values: V) -> Self
impl<'a> ScyQueryFut<'a> {
pub fn new<V>(scy: &'a ScySession, query: &'a PreparedStatement, values: V) -> Self
where
V: ValueList + Sync + 'static,
V: ValueList + Send + 'static,
{
let query = Box::new(query);
let values = Box::new(values);
let scy2 = unsafe { &*(&scy as &_ as *const _) } as &ScySession;
let query2 = unsafe { &*(&query as &_ as *const _) } as &PreparedStatement;
let v2 = unsafe { &*(&values as &_ as *const _) } as &V;
let fut = scy2.execute(query2, v2);
Self {
scy,
query,
values,
fut: Box::pin(fut),
}
let fut = scy.execute(query, values);
Self { fut: Box::pin(fut) }
}
}
impl<V> Future for ScyQueryFut<V> {
impl<'a> Future for ScyQueryFut<'a> {
type Output = Result<(), Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
@@ -55,45 +39,28 @@ impl<V> Future for ScyQueryFut<V> {
match self.fut.poll_unpin(cx) {
Ready(k) => match k {
Ok(_) => Ready(Ok(())),
Err(e) => {
warn!("ScyQueryFut done Err");
Ready(Err(e).err_conv())
}
Err(e) => Ready(Err(e).err_conv()),
},
Pending => Pending,
}
}
}
pub struct ScyBatchFut<V> {
#[allow(unused)]
scy: Arc<ScySession>,
#[allow(unused)]
batch: Box<Batch>,
#[allow(unused)]
values: Box<V>,
fut: Pin<Box<dyn Future<Output = Result<BatchResult, QueryError>>>>,
pub struct ScyBatchFut<'a> {
fut: Pin<Box<dyn Future<Output = Result<BatchResult, QueryError>> + 'a>>,
polled: usize,
ts_create: Instant,
ts_poll_start: Instant,
}
impl<V> ScyBatchFut<V> {
pub fn new(scy: Arc<ScySession>, batch: Batch, values: V) -> Self
impl<'a> ScyBatchFut<'a> {
pub fn new<V>(scy: &'a ScySession, batch: &'a Batch, values: V) -> Self
where
V: BatchValues + 'static,
V: BatchValues + Send + Sync + 'static,
{
let batch = Box::new(batch);
let values = Box::new(values);
let scy2 = unsafe { &*(&scy as &_ as *const _) } as &ScySession;
let batch2 = unsafe { &*(&batch as &_ as *const _) } as &Batch;
let v2 = unsafe { &*(&values as &_ as *const _) } as &V;
let fut = scy2.batch(batch2, v2);
let fut = scy.batch(batch, values);
let tsnow = Instant::now();
Self {
scy,
batch,
values,
fut: Box::pin(fut),
polled: 0,
ts_create: tsnow,
@@ -102,7 +69,7 @@ impl<V> ScyBatchFut<V> {
}
}
impl<V> Future for ScyBatchFut<V> {
impl<'a> Future for ScyBatchFut<'a> {
type Output = Result<(), Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
@@ -134,30 +101,21 @@ impl<V> Future for ScyBatchFut<V> {
}
}
pub struct ScyBatchFutGen {
#[allow(unused)]
scy: Arc<ScySession>,
#[allow(unused)]
batch: Box<Batch>,
fut: Pin<Box<dyn Future<Output = Result<BatchResult, QueryError>> + Send>>,
pub struct ScyBatchFutGen<'a> {
fut: Pin<Box<dyn Future<Output = Result<BatchResult, QueryError>> + Send + 'a>>,
polled: usize,
ts_create: Instant,
ts_poll_start: Instant,
}
impl ScyBatchFutGen {
pub fn new<V>(scy: Arc<ScySession>, batch: Batch, values: V) -> Self
impl<'a> ScyBatchFutGen<'a> {
pub fn new<V>(scy: &'a ScySession, batch: &'a Batch, values: V) -> Self
where
V: BatchValues + Sync + Send + 'static,
V: BatchValues + Send + Sync + 'static,
{
let batch = Box::new(batch);
let scy_ref = unsafe { &*(&scy as &_ as *const _) } as &ScySession;
let batch_ref = unsafe { &*(&batch as &_ as *const _) } as &Batch;
let fut = scy_ref.batch(batch_ref, values);
let fut = scy.batch(batch, values);
let tsnow = Instant::now();
Self {
scy,
batch,
fut: Box::pin(fut),
polled: 0,
ts_create: tsnow,
@@ -166,7 +124,7 @@ impl ScyBatchFutGen {
}
}
impl Future for ScyBatchFutGen {
impl<'a> Future for ScyBatchFutGen<'a> {
type Output = Result<(), Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
@@ -198,44 +156,35 @@ impl Future for ScyBatchFutGen {
}
}
pub struct InsertLoopFut {
#[allow(unused)]
scy: Arc<ScySession>,
#[allow(unused)]
query: Arc<PreparedStatement>,
futs: Vec<Pin<Box<dyn Future<Output = Result<QueryResult, QueryError>> + Send>>>,
pub struct InsertLoopFut<'a> {
futs: Vec<Pin<Box<dyn Future<Output = Result<QueryResult, QueryError>> + Send + 'a>>>,
fut_ix: usize,
polled: usize,
ts_create: Instant,
ts_poll_start: Instant,
}
impl InsertLoopFut {
pub fn new<V>(scy: Arc<ScySession>, query: PreparedStatement, values: Vec<V>, skip_insert: bool) -> Self
impl<'a> InsertLoopFut<'a> {
pub fn new<V>(scy: &'a ScySession, query: &'a PreparedStatement, values: Vec<V>, skip_insert: bool) -> Self
where
V: ValueList + Send + 'static,
V: ValueList + Send + Sync + 'static,
{
let mut values = values;
if skip_insert {
values.clear();
}
let query = Arc::new(query);
let scy_ref = unsafe { &*(&scy as &_ as *const _) } as &ScySession;
let query_ref = unsafe { &*(&query as &_ as *const _) } as &PreparedStatement;
// TODO
// Can I store the values in some better generic form?
// Or is it acceptable to generate all insert futures right here and poll them later?
let futs: Vec<_> = values
.into_iter()
.map(|vs| {
let fut = scy_ref.execute(query_ref, vs);
let fut = scy.execute(query, vs);
Box::pin(fut) as _
})
.collect();
let tsnow = Instant::now();
Self {
scy,
query,
futs,
fut_ix: 0,
polled: 0,
@@ -245,7 +194,7 @@ impl InsertLoopFut {
}
}
impl Future for InsertLoopFut {
impl<'a> Future for InsertLoopFut<'a> {
type Output = Result<(), Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
@@ -301,15 +250,15 @@ pub struct ChannelWriteRes {
pub dt: Duration,
}
pub struct ChannelWriteFut {
pub struct ChannelWriteFut<'a> {
nn: usize,
fut1: Option<Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>>,
fut2: Option<Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>>,
fut1: Option<Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'a>>>,
fut2: Option<Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'a>>>,
ts1: Option<Instant>,
mask: u8,
}
impl Future for ChannelWriteFut {
impl<'a> Future for ChannelWriteFut<'a> {
type Output = Result<ChannelWriteRes, Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
@@ -386,8 +335,8 @@ trait MsgAcceptor {
fn len(&self) -> usize;
fn accept(&mut self, ts_msp: i64, ts_lsp: i64, pulse: i64, fr: &ZmtpFrame) -> Result<(), Error>;
fn should_flush(&self) -> bool;
fn flush_loop(&mut self, scy: Arc<ScySession>) -> Result<InsertLoopFut, Error>;
fn flush_batch(&mut self, scy: Arc<ScySession>) -> Result<ScyBatchFutGen, Error>;
fn flush_loop<'a>(&'a mut self, scy: &'a ScySession) -> Result<InsertLoopFut<'a>, Error>;
fn flush_batch<'a>(&'a mut self, scy: &'a ScySession) -> Result<ScyBatchFutGen<'a>, Error>;
}
macro_rules! impl_msg_acceptor_scalar {
@@ -397,6 +346,7 @@ macro_rules! impl_msg_acceptor_scalar {
values: Vec<(i64, i64, i64, i64, $st)>,
series: i64,
opts: MsgAcceptorOptions,
batch: Batch,
}
impl $sname {
@@ -406,6 +356,7 @@ macro_rules! impl_msg_acceptor_scalar {
values: vec![],
series,
opts,
batch: Batch::new((BatchType::Unlogged)),
}
}
}
@@ -436,20 +387,21 @@ macro_rules! impl_msg_acceptor_scalar {
self.len() >= 140 + ((self.series as usize) & 0x1f)
}
fn flush_batch(&mut self, scy: Arc<ScySession>) -> Result<ScyBatchFutGen, Error> {
fn flush_batch<'a>(&'a mut self, scy: &'a ScySession) -> Result<ScyBatchFutGen<'a>, Error> {
let vt = mem::replace(&mut self.values, vec![]);
let nn = vt.len();
let mut batch = Batch::new(BatchType::Unlogged);
self.batch = Batch::new(BatchType::Unlogged);
let batch = &mut self.batch;
for _ in 0..nn {
batch.append_statement(self.query.clone());
}
let ret = ScyBatchFutGen::new(scy, batch, vt);
let ret = ScyBatchFutGen::new(&scy, batch, vt);
Ok(ret)
}
fn flush_loop(&mut self, scy: Arc<ScySession>) -> Result<InsertLoopFut, Error> {
fn flush_loop<'a>(&'a mut self, scy: &'a ScySession) -> Result<InsertLoopFut<'a>, Error> {
let vt = mem::replace(&mut self.values, vec![]);
let ret = InsertLoopFut::new(scy, self.query.clone(), vt, self.opts.skip_insert);
let ret = InsertLoopFut::new(scy, &self.query, vt, self.opts.skip_insert);
Ok(ret)
}
}
@@ -465,6 +417,7 @@ macro_rules! impl_msg_acceptor_array {
array_truncate: usize,
truncated: usize,
opts: MsgAcceptorOptions,
batch: Batch,
}
impl $sname {
@@ -476,6 +429,7 @@ macro_rules! impl_msg_acceptor_array {
array_truncate: opts.array_truncate,
truncated: 0,
opts,
batch: Batch::new(BatchType::Unlogged),
}
}
}
@@ -488,41 +442,17 @@ macro_rules! impl_msg_acceptor_array {
fn accept(&mut self, ts_msp: i64, ts_lsp: i64, pulse: i64, fr: &ZmtpFrame) -> Result<(), Error> {
type ST = $st;
const STL: usize = std::mem::size_of::<ST>();
let vc = fr.data().len() / STL;
let mut values = Vec::with_capacity(vc);
if false {
for i in 0..vc {
let h = i * STL;
let value = ST::$from_bytes(fr.data()[h..h + STL].try_into()?);
values.push(value);
}
} else {
let mut ptr: *const u8 = fr.data().as_ptr();
let mut ptr2: *mut ST = values.as_mut_ptr();
for _ in 0..vc {
unsafe {
let a: &[u8; STL] = &*(ptr as *const [u8; STL]);
*ptr2 = ST::$from_bytes(*a);
}
ptr = ptr.wrapping_offset(STL as isize);
ptr2 = ptr2.wrapping_offset(1);
}
unsafe {
values.set_len(vc);
}
}
if values.len() > self.array_truncate {
if self.truncated < 10 {
warn!(
"truncate {} to {} for series {}",
values.len(),
self.array_truncate,
self.series
);
}
values.truncate(self.array_truncate);
let vc2 = (fr.data().len() / STL);
let vc = vc2.min(self.array_truncate);
if vc != vc2 {
self.truncated = self.truncated.saturating_add(1);
}
let mut values = Vec::with_capacity(vc);
for i in 0..vc {
let h = i * STL;
let value = ST::$from_bytes(fr.data()[h..h + STL].try_into()?);
values.push(value);
}
self.values.push((self.series, ts_msp, ts_lsp, pulse, values));
Ok(())
}
@@ -531,20 +461,21 @@ macro_rules! impl_msg_acceptor_array {
self.len() >= 40 + ((self.series as usize) & 0x7)
}
fn flush_batch(&mut self, scy: Arc<ScySession>) -> Result<ScyBatchFutGen, Error> {
fn flush_batch<'a>(&'a mut self, scy: &'a ScySession) -> Result<ScyBatchFutGen<'a>, Error> {
let vt = mem::replace(&mut self.values, vec![]);
let nn = vt.len();
let mut batch = Batch::new(BatchType::Unlogged);
self.batch = Batch::new(BatchType::Unlogged);
let batch = &mut self.batch;
for _ in 0..nn {
batch.append_statement(self.query.clone());
}
let ret = ScyBatchFutGen::new(scy, batch, vt);
let ret = ScyBatchFutGen::new(&scy, batch, vt);
Ok(ret)
}
fn flush_loop(&mut self, scy: Arc<ScySession>) -> Result<InsertLoopFut, Error> {
fn flush_loop<'a>(&'a mut self, scy: &'a ScySession) -> Result<InsertLoopFut<'a>, Error> {
let vt = mem::replace(&mut self.values, vec![]);
let ret = InsertLoopFut::new(scy, self.query.clone(), vt, self.opts.skip_insert);
let ret = InsertLoopFut::new(scy, &self.query, vt, self.opts.skip_insert);
Ok(ret)
}
}
@@ -582,6 +513,7 @@ struct MsgAcceptorArrayBool {
array_truncate: usize,
truncated: usize,
opts: MsgAcceptorOptions,
batch: Batch,
}
impl MsgAcceptorArrayBool {
@@ -593,6 +525,7 @@ impl MsgAcceptorArrayBool {
array_truncate: opts.array_truncate,
truncated: 0,
opts,
batch: Batch::new(BatchType::Unlogged),
}
}
}
@@ -633,20 +566,21 @@ impl MsgAcceptor for MsgAcceptorArrayBool {
self.len() >= 40 + ((self.series as usize) & 0x7)
}
fn flush_batch(&mut self, scy: Arc<ScySession>) -> Result<ScyBatchFutGen, Error> {
fn flush_batch<'a>(&'a mut self, scy: &'a ScySession) -> Result<ScyBatchFutGen, Error> {
let vt = mem::replace(&mut self.values, vec![]);
let nn = vt.len();
let mut batch = Batch::new(BatchType::Unlogged);
self.batch = Batch::new(BatchType::Unlogged);
let batch = &mut self.batch;
for _ in 0..nn {
batch.append_statement(self.query.clone());
}
let ret = ScyBatchFutGen::new(scy, batch, vt);
let ret = ScyBatchFutGen::new(&scy, batch, vt);
Ok(ret)
}
fn flush_loop(&mut self, scy: Arc<ScySession>) -> Result<InsertLoopFut, Error> {
fn flush_loop<'a>(&'a mut self, scy: &'a ScySession) -> Result<InsertLoopFut<'a>, Error> {
let vt = mem::replace(&mut self.values, vec![]);
let ret = InsertLoopFut::new(scy, self.query.clone(), vt, self.opts.skip_insert);
let ret = InsertLoopFut::new(scy, &self.query, vt, self.opts.skip_insert);
Ok(ret)
}
}
@@ -854,11 +788,9 @@ impl ChannelWriterAll {
debug!("ts_msp changed ts {ts} pulse {pulse} ts_msp {ts_msp} ts_lsp {ts_lsp}");
self.ts_msp_last = ts_msp;
if !self.skip_insert {
// TODO make the passing of the query parameters type safe.
// TODO the "dtype" table field is not needed here. Drop from database.
let fut = ScyQueryFut::new(
self.scy.clone(),
self.common_queries.qu_insert_ts_msp.clone(),
&self.scy,
&self.common_queries.qu_insert_ts_msp,
(self.series as i64, ts_msp as i64),
);
Some(Box::pin(fut) as _)
@@ -871,7 +803,7 @@ impl ChannelWriterAll {
self.acceptor.accept(ts_msp as i64, ts_lsp as i64, pulse as i64, fr)?;
if self.acceptor.should_flush() {
let nn = self.acceptor.len();
let fut = self.acceptor.flush_loop(self.scy.clone())?;
let fut = self.acceptor.flush_loop(&self.scy)?;
let fut2 = Some(Box::pin(fut) as _);
let ret = ChannelWriteFut {
ts1: None,
@@ -886,7 +818,7 @@ impl ChannelWriterAll {
ts1: None,
mask: 0,
nn: 0,
fut1: fut1,
fut1,
fut2: None,
};
Ok(ret)

121
netfetch/src/linuxhelper.rs Normal file
View File

@@ -0,0 +1,121 @@
use err::Error;
use log::*;
use std::ffi::CStr;
use std::mem::MaybeUninit;
use std::sync::atomic::Ordering;
use tokio::net::TcpStream;
pub fn local_hostname() -> String {
let mut buf = vec![0u8; 128];
let hostname = unsafe {
let ec = libc::gethostname(buf.as_mut_ptr() as _, buf.len() - 2);
if ec != 0 {
panic!();
}
let hostname = CStr::from_ptr(&buf[0] as *const _ as _);
hostname.to_str().unwrap()
};
hostname.into()
}
#[test]
fn test_get_local_hostname() {
assert_ne!(local_hostname().len(), 0);
}
pub fn set_signal_handler() -> Result<(), Error> {
// Safe because it creates a valid value:
let mask: libc::sigset_t = unsafe { MaybeUninit::zeroed().assume_init() };
let handler: libc::sighandler_t = handler_sigaction as *const libc::c_void as _;
let act = libc::sigaction {
sa_sigaction: handler,
sa_mask: mask,
sa_flags: 0,
sa_restorer: None,
};
let (ec, msg) = unsafe {
let ec = libc::sigaction(libc::SIGINT, &act, std::ptr::null_mut());
let errno = *libc::__errno_location();
(ec, CStr::from_ptr(libc::strerror(errno)))
};
if ec != 0 {
// Not valid to print here, but we will panic anyways after that.
eprintln!("error: {:?}", msg);
panic!();
}
Ok(())
}
fn unset_signal_handler() -> Result<(), Error> {
// Safe because it creates a valid value:
let mask: libc::sigset_t = unsafe { MaybeUninit::zeroed().assume_init() };
let act = libc::sigaction {
sa_sigaction: libc::SIG_DFL,
sa_mask: mask,
sa_flags: 0,
sa_restorer: None,
};
let (ec, msg) = unsafe {
let ec = libc::sigaction(libc::SIGINT, &act, std::ptr::null_mut());
let errno = *libc::__errno_location();
(ec, CStr::from_ptr(libc::strerror(errno)))
};
if ec != 0 {
// Not valid to print here, but we will panic anyways after that.
eprintln!("error: {:?}", msg);
panic!();
}
Ok(())
}
fn handler_sigaction(_a: libc::c_int, _b: *const libc::siginfo_t, _c: *const libc::c_void) {
crate::ca::SIGINT.store(1, Ordering::Release);
let _ = unset_signal_handler();
}
pub fn set_rcv_sock_opts(conn: &mut TcpStream, rcvbuf: u32) -> Result<(), Error> {
use std::mem::size_of;
use std::os::unix::prelude::AsRawFd;
let fd = conn.as_raw_fd();
unsafe {
type N = libc::c_int;
let n: N = rcvbuf as _;
let ec = libc::setsockopt(
fd,
libc::SOL_SOCKET,
libc::SO_RCVBUF,
&n as *const N as _,
size_of::<N>() as _,
);
if ec != 0 {
error!("ec {ec}");
if ec != 0 {
return Err(Error::with_msg_no_trace(format!("can not set socket option")));
}
}
}
unsafe {
type N = libc::c_int;
let mut n: N = -1;
let mut l = size_of::<N>() as libc::socklen_t;
let ec = libc::getsockopt(
fd,
libc::SOL_SOCKET,
libc::SO_RCVBUF,
&mut n as *mut N as _,
&mut l as _,
);
if ec != 0 {
let errno = *libc::__errno_location();
let es = CStr::from_ptr(libc::strerror(errno));
error!("can not query socket option ec {ec} errno {errno} es {es:?}");
} else {
if (n as u32) < rcvbuf * 5 / 6 {
warn!("SO_RCVBUF {n} smaller than requested {rcvbuf}");
} else {
info!("SO_RCVBUF {n}");
}
}
}
Ok(())
}

View File

@@ -226,8 +226,8 @@ pub async fn start_metrics_service(
.route(
"/metrics",
get(|| async {
let stats = crate::ca::get_metrics();
match stats {
let stats = crate::ca::METRICS.lock().unwrap();
match stats.as_ref() {
Some(s) => {
trace!("Metrics");
s.prometheus()

View File

@@ -2,6 +2,7 @@ pub mod bsread;
pub mod ca;
pub mod channelwriter;
pub mod errconv;
pub mod linuxhelper;
pub mod metrics;
pub mod netbuf;
pub mod series;

View File

@@ -13,36 +13,27 @@ use scylla::{QueryResult, Session as ScySession};
use stats::CaConnStats;
use std::net::SocketAddrV4;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::{Instant, SystemTime};
pub struct ScyInsertFut {
#[allow(unused)]
scy: Arc<ScySession>,
#[allow(unused)]
query: Arc<PreparedStatement>,
fut: Pin<Box<dyn Future<Output = Result<QueryResult, QueryError>> + Send>>,
pub struct ScyInsertFut<'a> {
fut: Pin<Box<dyn Future<Output = Result<QueryResult, QueryError>> + Send + 'a>>,
polled: usize,
ts_create: Instant,
ts_poll_first: Instant,
}
impl ScyInsertFut {
impl<'a> ScyInsertFut<'a> {
const NAME: &'static str = "ScyInsertFut";
pub fn new<V>(scy: Arc<ScySession>, query: Arc<PreparedStatement>, values: V) -> Self
pub fn new<V>(scy: &'a ScySession, query: &'a PreparedStatement, values: V) -> Self
where
V: ValueList + Send + 'static,
{
let scy_ref: &ScySession = unsafe { &*(scy.as_ref() as &_ as *const _) };
let query_ref = unsafe { &*(query.as_ref() as &_ as *const _) };
let fut = scy_ref.execute(query_ref, values);
let fut = scy.execute(query, values);
let fut = Box::pin(fut) as _;
let tsnow = Instant::now();
Self {
scy,
query,
fut,
polled: 0,
ts_create: tsnow,
@@ -51,7 +42,7 @@ impl ScyInsertFut {
}
}
impl Future for ScyInsertFut {
impl<'a> Future for ScyInsertFut<'a> {
type Output = Result<(), Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {

View File

@@ -17,7 +17,6 @@ use scylla::{Session as ScySession, SessionBuilder};
use serde_json::Value as JsVal;
use stats::CheckEvery;
use std::collections::BTreeMap;
use std::ffi::CStr;
use std::fmt;
use std::mem;
use std::pin::Pin;
@@ -166,7 +165,7 @@ impl BsreadClient {
pub async fn run(&mut self) -> Result<(), Error> {
let mut conn = tokio::net::TcpStream::connect(&self.source_addr).await?;
if let Some(v) = self.rcvbuf {
set_rcv_sock_opts(&mut conn, v as u32)?;
crate::linuxhelper::set_rcv_sock_opts(&mut conn, v as u32)?;
}
let mut zmtp = Zmtp::new(conn, SocketType::PULL);
let mut i1 = 0u64;
@@ -290,11 +289,15 @@ impl BsreadClient {
series_ids[i1].1 = series;
}
let series = series_ids[i1].1;
if let Some(cw) = self.channel_writers.get_mut(&series) {
let res = cw.write_msg(ts, pulse, fr)?.await?;
if let Some(_cw) = self.channel_writers.get_mut(&series) {
let _ = ts;
let _ = fr;
// TODO hand off item to a writer item queue.
err::todo();
/*let res = cw.write_msg(ts, pulse, fr)?.await?;
rows_inserted += res.nrows;
time_spent_inserting = time_spent_inserting + res.dt;
bytes_payload += fr.data().len() as u64;
bytes_payload += fr.data().len() as u64;*/
} else {
// TODO check for missing writers.
warn!("no writer for {}", chn.name);
@@ -526,53 +529,6 @@ pub async fn zmtp_client(opts: ZmtpClientOpts) -> Result<(), Error> {
Ok(())
}
fn set_rcv_sock_opts(conn: &mut TcpStream, rcvbuf: u32) -> Result<(), Error> {
use std::mem::size_of;
use std::os::unix::prelude::AsRawFd;
let fd = conn.as_raw_fd();
unsafe {
type N = libc::c_int;
let n: N = rcvbuf as _;
let ec = libc::setsockopt(
fd,
libc::SOL_SOCKET,
libc::SO_RCVBUF,
&n as *const N as _,
size_of::<N>() as _,
);
if ec != 0 {
error!("ec {ec}");
if ec != 0 {
return Err(Error::with_msg_no_trace(format!("can not set socket option")));
}
}
}
unsafe {
type N = libc::c_int;
let mut n: N = -1;
let mut l = size_of::<N>() as libc::socklen_t;
let ec = libc::getsockopt(
fd,
libc::SOL_SOCKET,
libc::SO_RCVBUF,
&mut n as *mut N as _,
&mut l as _,
);
if ec != 0 {
let errno = *libc::__errno_location();
let es = CStr::from_ptr(libc::strerror(errno));
error!("can not query socket option ec {ec} errno {errno} es {es:?}");
} else {
if (n as u32) < rcvbuf * 5 / 6 {
warn!("SO_RCVBUF {n} smaller than requested {rcvbuf}");
} else {
info!("SO_RCVBUF {n}");
}
}
}
Ok(())
}
pub struct BsreadDumper {
source_addr: String,
parser: Parser,