Channel inserts through common scylla workers

This commit is contained in:
Dominik Werder
2022-05-25 10:46:09 +02:00
parent 277597400e
commit f5f1b23466
9 changed files with 365 additions and 344 deletions

View File

@@ -1,15 +1,15 @@
use super::proto::{self, CaItem, CaMsg, CaMsgTy, CaProto, EventAddRes};
use super::proto::{self, CaItem, CaMsg, CaMsgTy, CaProto};
use super::store::DataStore;
use crate::bsread::ChannelDescDecoded;
use crate::ca::proto::{CreateChan, EventAdd, HeadInfo, ReadNotify};
use crate::ca::proto::{CreateChan, EventAdd, HeadInfo};
use crate::series::{Existence, SeriesId};
use crate::store::ScyInsertFut;
use crate::store::{CommonInsertItemQueueSender, InsertItem};
use err::Error;
use futures_util::stream::{FuturesOrdered, FuturesUnordered};
use futures_util::stream::FuturesOrdered;
use futures_util::{Future, FutureExt, Stream, StreamExt, TryFutureExt};
use libc::c_int;
use log::*;
use netpod::timeunits::SEC;
use netpod::timeunits::*;
use netpod::{ScalarType, Shape};
use stats::{CaConnStats, IntervalEma};
use std::collections::{BTreeMap, VecDeque};
@@ -22,12 +22,9 @@ use std::time::{Duration, Instant, SystemTime};
use tokio::io::unix::AsyncFd;
use tokio::net::TcpStream;
const INSERT_FUTS_MAX: usize = 2;
const INSERT_FUTS_LIM: usize = 16;
const TABLE_SERIES_MOD: u32 = 128;
#[derive(Debug)]
enum ChannelError {
#[allow(unused)]
NoSuccess,
}
@@ -42,11 +39,15 @@ enum MonitoringState {
AddingEvent(SeriesId),
Evented(SeriesId, EventedState),
// TODO we also want to read while staying subscribed:
#[allow(unused)]
Reading,
#[allow(unused)]
Read,
#[allow(unused)]
Muted,
}
#[allow(unused)]
#[derive(Debug)]
struct CreatedState {
cid: u32,
@@ -60,6 +61,7 @@ struct CreatedState {
ivl_ema: IntervalEma,
}
#[allow(unused)]
#[derive(Debug)]
enum ChannelState {
Init,
@@ -91,199 +93,7 @@ impl IdStore {
}
}
// TODO test that errors are properly forwarded.
macro_rules! insert_scalar_impl {
($fname:ident, $valty:ty, $qu_insert:ident) => {
fn $fname(
data_store: Arc<DataStore>,
// TODO maybe use a newtype?
futs_queue: &mut FuturesOrdered<Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>>,
series: SeriesId,
ts_msp: u64,
ts_lsp: u64,
val: $valty,
ts_msp_changed: bool,
st: ScalarType,
sh: Shape,
stats: Arc<CaConnStats>,
) {
if futs_queue.len() >= INSERT_FUTS_LIM {
stats.inserts_discard.fetch_add(1, Ordering::AcqRel);
return;
}
let pulse = 0 as u64;
let params = (
series.id() as i64,
ts_msp as i64,
ts_lsp as i64,
pulse as i64,
val,
);
let fut3 = ScyInsertFut::new(data_store.scy.clone(), data_store.$qu_insert.clone(), params);
stats.inserts_val.fetch_add(1, Ordering::AcqRel);
let fut = if ts_msp_changed {
let fut1 = ScyInsertFut::new(
data_store.scy.clone(),
data_store.qu_insert_series.clone(),
(
(series.id() as u32 % TABLE_SERIES_MOD) as i32,
series.id() as i64,
ts_msp as i64,
st.to_scylla_i32(),
sh.to_scylla_vec(),
),
);
let fut2 = ScyInsertFut::new(
data_store.scy.clone(),
data_store.qu_insert_ts_msp.clone(),
(series.id() as i64, ts_msp as i64),
);
stats.inserts_msp.fetch_add(1, Ordering::AcqRel);
Box::pin(fut1.and_then(move |_| fut2).and_then(move |_| fut3)) as _
} else {
Box::pin(fut3) as _
};
futs_queue.push(fut);
stats.inserts_queue_push_inc();
}
};
}
// TODO test that errors are properly forwarded.
macro_rules! insert_array_impl {
($fname:ident, $valty:ty, $qu_insert:ident) => {
fn $fname(
data_store: Arc<DataStore>,
// TODO maybe use a newtype?
futs_queue: &mut FuturesOrdered<Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>>,
series: SeriesId,
ts_msp: u64,
ts_lsp: u64,
val: Vec<$valty>,
ts_msp_changed: bool,
st: ScalarType,
sh: Shape,
stats: Arc<CaConnStats>,
) {
if futs_queue.len() >= INSERT_FUTS_LIM {
stats.inserts_discard.fetch_add(1, Ordering::AcqRel);
return;
}
let pulse = 0 as u64;
let params = (
series.id() as i64,
ts_msp as i64,
ts_lsp as i64,
pulse as i64,
val,
);
let fut3 = ScyInsertFut::new(data_store.scy.clone(), data_store.$qu_insert.clone(), params);
stats.inserts_val.fetch_add(1, Ordering::AcqRel);
let fut = if ts_msp_changed {
let fut1 = ScyInsertFut::new(
data_store.scy.clone(),
data_store.qu_insert_series.clone(),
(
(series.id() as u32 % TABLE_SERIES_MOD) as i32,
series.id() as i64,
ts_msp as i64,
st.to_scylla_i32(),
sh.to_scylla_vec(),
),
);
let fut2 = ScyInsertFut::new(
data_store.scy.clone(),
data_store.qu_insert_ts_msp.clone(),
(series.id() as i64, ts_msp as i64),
);
stats.inserts_msp.fetch_add(1, Ordering::AcqRel);
Box::pin(fut1.and_then(move |_| fut2).and_then(move |_| fut3)) as _
} else {
Box::pin(fut3) as _
};
futs_queue.push(fut);
stats.inserts_queue_push_inc();
}
};
}
insert_scalar_impl!(insert_scalar_i8, i8, qu_insert_scalar_i8);
insert_scalar_impl!(insert_scalar_i16, i16, qu_insert_scalar_i16);
insert_scalar_impl!(insert_scalar_i32, i32, qu_insert_scalar_i32);
insert_scalar_impl!(insert_scalar_f32, f32, qu_insert_scalar_f32);
insert_scalar_impl!(insert_scalar_f64, f64, qu_insert_scalar_f64);
insert_scalar_impl!(insert_scalar_string, String, qu_insert_scalar_string);
insert_array_impl!(insert_array_i8, i8, qu_insert_array_i8);
insert_array_impl!(insert_array_i16, i16, qu_insert_array_i16);
insert_array_impl!(insert_array_i32, i32, qu_insert_array_i32);
insert_array_impl!(insert_array_f32, f32, qu_insert_array_f32);
insert_array_impl!(insert_array_f64, f64, qu_insert_array_f64);
macro_rules! match_scalar_value_insert {
($stv:ident, $insf:ident, $val:expr, $comm:expr) => {{
let (data_store, futs_queue, series, ts_msp, ts_lsp, ts_msp_changed, scalar_type, shape, stats2) = $comm;
match shape {
Shape::Scalar => match scalar_type {
ScalarType::$stv => $insf(
data_store,
futs_queue,
series,
ts_msp,
ts_lsp,
$val,
ts_msp_changed,
scalar_type,
shape,
stats2,
),
_ => {
error!("unexpected value type insf {:?}", stringify!($insf));
}
},
_ => {
error!(
"unexpected value shape insf {:?} shape {:?}",
stringify!($insf),
shape
);
}
}
}};
}
macro_rules! match_array_value_insert {
($stv:ident, $insf:ident, $val:expr, $comm:expr) => {{
let (data_store, futs_queue, series, ts_msp, ts_lsp, ts_msp_changed, scalar_type, shape, stats2) = $comm;
match shape {
Shape::Wave(_) => match scalar_type {
ScalarType::$stv => $insf(
data_store,
futs_queue,
series,
ts_msp,
ts_lsp,
$val,
ts_msp_changed,
scalar_type,
shape,
stats2,
),
_ => {
error!("unexpected value type insf {:?}", stringify!($insf));
}
},
_ => {
error!(
"unexpected value shape insf {:?} shape {:?}",
stringify!($insf),
shape
);
}
}
}};
}
#[allow(unused)]
pub struct CaConn {
state: CaConnState,
proto: CaProto,
@@ -298,11 +108,14 @@ pub struct CaConn {
name_by_cid: BTreeMap<u32, String>,
poll_count: usize,
data_store: Arc<DataStore>,
insert_item_queue: VecDeque<InsertItem>,
insert_item_sender: CommonInsertItemQueueSender,
insert_item_send_fut: Option<async_channel::Send<'static, InsertItem>>,
fut_get_series:
FuturesOrdered<Pin<Box<dyn Future<Output = Result<(u32, u32, u16, u16, Existence<SeriesId>), Error>> + Send>>>,
value_insert_futs: FuturesOrdered<Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>>,
remote_addr_dbg: SocketAddrV4,
stats: Arc<CaConnStats>,
insert_queue_max: usize,
}
impl CaConn {
@@ -310,7 +123,9 @@ impl CaConn {
tcp: TcpStream,
remote_addr_dbg: SocketAddrV4,
data_store: Arc<DataStore>,
insert_item_sender: CommonInsertItemQueueSender,
array_truncate: usize,
insert_queue_max: usize,
) -> Self {
Self {
state: CaConnState::Init,
@@ -326,10 +141,13 @@ impl CaConn {
name_by_cid: BTreeMap::new(),
poll_count: 0,
data_store,
insert_item_queue: VecDeque::new(),
insert_item_sender,
insert_item_send_fut: None,
fut_get_series: FuturesOrdered::new(),
value_insert_futs: FuturesOrdered::new(),
remote_addr_dbg,
stats: Arc::new(CaConnStats::new()),
insert_queue_max,
}
}
@@ -363,20 +181,29 @@ impl CaConn {
}
#[inline(never)]
fn handle_insert_futs(&mut self, cx: &mut Context) -> Result<(), Error> {
fn handle_insert_futs(&mut self, cx: &mut Context) -> Poll<Result<(), Error>> {
use Poll::*;
while self.value_insert_futs.len() > 0 {
match self.value_insert_futs.poll_next_unpin(cx) {
Pending => break,
_ => {
self.stats.inserts_queue_pop_inc();
}
loop {
match self.insert_item_send_fut.as_mut() {
Some(fut) => match fut.poll_unpin(cx) {
Ready(Ok(())) => {
self.stats.inserts_queue_push_inc();
self.insert_item_send_fut = None;
}
Ready(Err(_)) => break Ready(Err(Error::with_msg_no_trace(format!("can not send the item")))),
Pending => break Pending,
},
None => {}
}
if let Some(item) = self.insert_item_queue.pop_front() {
let sender = unsafe { &*(&self.insert_item_sender as *const CommonInsertItemQueueSender) };
self.insert_item_send_fut = Some(sender.send(item));
} else {
break Ready(Ok(()));
}
}
Ok(())
}
#[inline(never)]
fn handle_get_series_futs(&mut self, cx: &mut Context) -> Result<(), Error> {
use Poll::*;
while self.fut_get_series.len() > 0 {
@@ -487,48 +314,19 @@ impl CaConn {
self.ts_msp_last_by_series.insert(series.clone(), ts_msp);
true
};
// TODO make sure that I only accept types I expect.
use crate::ca::proto::CaDataScalarValue::*;
use crate::ca::proto::CaDataValue::*;
let data_store = self.data_store.clone();
let futs_queue = &mut self.value_insert_futs;
let comm = (
data_store,
futs_queue,
series,
let item_queue = &mut self.insert_item_queue;
let item = InsertItem {
series: series.id(),
ts_msp,
ts_lsp,
ts_msp_changed,
msp_bump: ts_msp_changed,
pulse: 0,
scalar_type,
shape,
self.stats.clone(),
);
match ev.value {
Scalar(v) => match v {
I8(val) => match_scalar_value_insert!(I8, insert_scalar_i8, val, comm),
I16(val) => match_scalar_value_insert!(I16, insert_scalar_i16, val, comm),
I32(val) => match_scalar_value_insert!(I32, insert_scalar_i32, val, comm),
F32(val) => match_scalar_value_insert!(F32, insert_scalar_f32, val, comm),
F64(val) => match_scalar_value_insert!(F64, insert_scalar_f64, val, comm),
String(val) => match_scalar_value_insert!(STRING, insert_scalar_string, val, comm),
_ => {
warn!("can not handle Scalar {:?}", v);
}
},
Array(v) => {
use crate::ca::proto::CaDataArrayValue::*;
match v {
I8(val) => match_array_value_insert!(I8, insert_array_i8, val, comm),
I16(val) => match_array_value_insert!(I16, insert_array_i16, val, comm),
I32(val) => match_array_value_insert!(I32, insert_array_i32, val, comm),
F32(val) => match_array_value_insert!(F32, insert_array_f32, val, comm),
F64(val) => match_array_value_insert!(F64, insert_array_f64, val, comm),
_ => {
warn!("can not handle Array ty {} n {}", ev.data_type, ev.data_count);
}
}
}
}
val: ev.value,
};
item_queue.push_back(item);
self.stats.insert_item_create_inc();
Ok(())
}
@@ -585,46 +383,51 @@ impl CaConn {
Ok(())
}
/*
Acts more like a stream? Can be:
Pending
Ready(no-more-work, something-was-done, error)
*/
#[inline(never)]
fn handle_conn_listen(&mut self, cx: &mut Context) -> Option<Poll<Option<Result<(), Error>>>> {
fn handle_conn_listen(&mut self, cx: &mut Context) -> Poll<Option<Result<(), Error>>> {
use Poll::*;
match self.proto.poll_next_unpin(cx) {
Ready(Some(k)) => match k {
Ok(k) => match k {
CaItem::Empty => {
info!("CaItem::Empty");
Some(Ready(Some(Ok(()))))
Ready(Some(Ok(())))
}
CaItem::Msg(msg) => match msg.ty {
CaMsgTy::VersionRes(n) => {
if n < 12 || n > 13 {
error!("See some unexpected version {n} channel search may not work.");
Some(Ready(Some(Ok(()))))
Ready(Some(Ok(())))
} else {
if n != 13 {
warn!("Received peer version {n}");
}
self.state = CaConnState::PeerReady;
None
Ready(Some(Ok(())))
}
}
k => {
warn!("Got some other unhandled message: {k:?}");
Some(Ready(Some(Ok(()))))
Ready(Some(Ok(())))
}
},
},
Err(e) => {
error!("got error item from CaProto {e:?}");
Some(Ready(Some(Ok(()))))
Ready(Some(Ok(())))
}
},
Ready(None) => {
warn!("CaProto is done {:?}", self.remote_addr_dbg);
self.state = CaConnState::Done;
None
Ready(None)
}
Pending => Some(Pending),
Pending => Pending,
}
}
@@ -667,6 +470,8 @@ impl CaConn {
Ok(())
}
// Can return:
// Pending, error, work-done (pending state unknown), no-more-work-ever-again.
#[inline(never)]
fn handle_peer_ready(&mut self, cx: &mut Context) -> Poll<Option<Result<(), Error>>> {
use Poll::*;
@@ -677,7 +482,7 @@ impl CaConn {
let ts2 = Instant::now();
self.stats
.time_check_channels_state_init
.fetch_add((ts2.duration_since(ts1) * 1000000).as_secs(), Ordering::Release);
.fetch_add((ts2.duration_since(ts1) * MS as u32).as_secs(), Ordering::Release);
ts1 = ts2;
let mut do_wake_again = false;
if msgs_tmp.len() > 0 {
@@ -745,8 +550,9 @@ impl CaConn {
let ts2 = Instant::now();
self.stats
.time_handle_event_add_res
.fetch_add((ts2.duration_since(ts1) * 1000000).as_secs(), Ordering::Release);
.fetch_add((ts2.duration_since(ts1) * MS as u32).as_secs(), Ordering::AcqRel);
ts1 = ts2;
let _ = ts1;
res?
}
_ => {}
@@ -763,7 +569,7 @@ impl CaConn {
Ready(None) => {
warn!("CaProto is done");
self.state = CaConnState::Done;
Ready(Some(Ok(())))
Ready(None)
}
Pending => Pending,
};
@@ -784,65 +590,89 @@ impl Stream for CaConn {
let ts_outer_1 = Instant::now();
let mut ts1 = ts_outer_1;
self.poll_count += 1;
let ret = loop {
self.handle_insert_futs(cx)?;
// TODO factor out the inner loop:
let ret = 'outer: loop {
let q = self.handle_insert_futs(cx);
let ts2 = Instant::now();
self.stats
.poll_time_handle_insert_futs
.fetch_add((ts2.duration_since(ts1) * 1000000).as_secs(), Ordering::AcqRel);
.fetch_add((ts2.duration_since(ts1) * MS as u32).as_secs(), Ordering::AcqRel);
ts1 = ts2;
match q {
Ready(_) => {}
Pending => break Pending,
}
self.handle_get_series_futs(cx)?;
let ts2 = Instant::now();
self.stats
.poll_time_get_series_futs
.fetch_add((ts2.duration_since(ts1) * 1000000).as_secs(), Ordering::AcqRel);
.fetch_add((ts2.duration_since(ts1) * MS as u32).as_secs(), Ordering::AcqRel);
ts1 = ts2;
if self.value_insert_futs.len() >= INSERT_FUTS_MAX {
// TODO do not do more.
// But: can I assume that in this case we triggered a Pending?
if self.insert_item_queue.len() >= self.insert_queue_max {
break Pending;
}
break match &self.state {
CaConnState::Init => {
let msg = CaMsg { ty: CaMsgTy::Version };
self.proto.push_out(msg);
let msg = CaMsg {
ty: CaMsgTy::ClientName,
};
self.proto.push_out(msg);
let msg = CaMsg { ty: CaMsgTy::HostName };
self.proto.push_out(msg);
self.state = CaConnState::Listen;
continue;
}
CaConnState::Listen => match {
let res = self.handle_conn_listen(cx);
let ts2 = Instant::now();
self.stats
.time_handle_conn_listen
.fetch_add((ts2.duration_since(ts1) * 1000000).as_secs(), Ordering::AcqRel);
ts1 = ts2;
res
} {
Some(k) => k,
None => continue,
},
CaConnState::PeerReady => {
let res = self.handle_peer_ready(cx);
let ts2 = Instant::now();
self.stats.time_handle_peer_ready_dur(ts2.duration_since(ts1));
ts1 = ts2;
res
}
CaConnState::Done => Ready(None),
break loop {
break match &self.state {
CaConnState::Init => {
let msg = CaMsg { ty: CaMsgTy::Version };
self.proto.push_out(msg);
let msg = CaMsg {
ty: CaMsgTy::ClientName,
};
self.proto.push_out(msg);
let msg = CaMsg { ty: CaMsgTy::HostName };
self.proto.push_out(msg);
self.state = CaConnState::Listen;
continue 'outer;
}
CaConnState::Listen => match {
let res = self.handle_conn_listen(cx);
let ts2 = Instant::now();
self.stats
.time_handle_conn_listen
.fetch_add((ts2.duration_since(ts1) * MS as u32).as_secs(), Ordering::AcqRel);
ts1 = ts2;
res
} {
Ready(Some(Ok(()))) => Ready(Some(Ok(()))),
Ready(Some(Err(e))) => Ready(Some(Err(e))),
Ready(None) => continue 'outer,
Pending => Pending,
},
CaConnState::PeerReady => {
let res = self.handle_peer_ready(cx);
let ts2 = Instant::now();
self.stats.time_handle_peer_ready_dur(ts2.duration_since(ts1));
ts1 = ts2;
match res {
Ready(Some(Ok(()))) => {
if self.insert_item_queue.len() >= self.insert_queue_max {
continue 'outer;
} else {
continue;
}
}
Ready(Some(Err(e))) => Ready(Some(Err(e))),
Ready(None) => {
// TODO even though protocol is done, we might still have e.g. insert items to flush!
Ready(None)
}
Pending => Pending,
}
}
CaConnState::Done => {
// TODO handle better
Pending
}
};
};
};
let nn = self.value_insert_futs.len();
if nn > INSERT_FUTS_LIM {
warn!("value_insert_futs len {nn}");
}
let ts_outer_2 = Instant::now();
self.stats.poll_time_all_dur(ts_outer_2.duration_since(ts_outer_1));
// TODO currently, this will never stop by itself
ret
}
}

View File

@@ -1,6 +1,6 @@
use crate::bsread::ChannelDescDecoded;
use crate::series::{Existence, SeriesId};
use crate::store::{CommonInsertQueue, CommonInsertQueueSender};
use crate::store::CommonInsertQueueSender;
use async_channel::{Receiver, Sender};
use err::Error;
use scylla::prepared_statement::PreparedStatement;
@@ -8,6 +8,7 @@ use scylla::Session as ScySession;
use std::sync::Arc;
use tokio_postgres::Client as PgClient;
#[allow(unused)]
pub struct RegisterJob {
desc: ChannelDescDecoded,
}
@@ -18,11 +19,13 @@ impl RegisterJob {
}
}
#[allow(unused)]
pub struct RegisterChannel {
tx: Sender<RegisterJob>,
rx: Receiver<RegisterJob>,
}
#[allow(unused)]
pub struct ChannelRegistry {
scy: Arc<ScySession>,
pg_client: Arc<PgClient>,