WIP parse and send 2M batched

Dominik Werder
2023-11-16 17:07:13 +01:00
parent 298e9b4faa
commit e2d8f389b4
18 changed files with 576 additions and 343 deletions
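The central change: the storage path no longer sends individual QueryItem values over the channels but whole batches, so a sender drains its local queue into a VecDeque<QueryItem> and pushes that as one channel message. A minimal sketch of the pattern (QueryItem here is a placeholder for scywriiq::QueryItem; async-channel as used in the diffs below):

use std::collections::VecDeque;

struct QueryItem; // placeholder for scywriiq::QueryItem

// Before: tx: async_channel::Sender<QueryItem>, one message per item.
// After: the channel carries whole batches in one message.
async fn flush_batch(
    queue: &mut VecDeque<QueryItem>,
    tx: &async_channel::Sender<VecDeque<QueryItem>>,
) -> Result<(), async_channel::SendError<VecDeque<QueryItem>>> {
    if !queue.is_empty() {
        // Drain everything queued so far and send it as a single item,
        // amortizing the per-message channel overhead.
        let batch: VecDeque<QueryItem> = queue.drain(..).collect();
        tx.send(batch).await?;
    }
    Ok(())
}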

View File

@@ -1,8 +1,8 @@
[build]
rustflags = [
#"-C", "target-cpu=native",
"-C", "target-cpu=sandybridge",
"-C", "force-frame-pointers=yes",
"-C", "target-cpu=skylake",
#"-C", "force-frame-pointers=yes",
#"-C", "force-unwind-tables=yes",
#"-C", "relocation-model=static",
#"-C", "embed-bitcode=no",

View File

@@ -4,7 +4,7 @@ resolver = "2"
[profile.release]
opt-level = 2
debug = 0
debug = 1
overflow-checks = false
debug-assertions = false
lto = "thin"

View File

@@ -4,8 +4,11 @@ version = "0.0.1"
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
edition = "2021"
[lib]
doctest = false
[dependencies]
log = { path = "../log" }
err = { path = "../../daqbuffer/crates/err" }
taskrun = { path = "../../daqbuffer/crates/taskrun" }
async-channel = "2.0.0"
async-channel = "2.1.0"

View File

@@ -0,0 +1,45 @@
use async_channel::Receiver;
use async_channel::Sender;
use std::time::Instant;
#[test]
fn prod_cons() {
let rt = taskrun::get_runtime();
rt.block_on(run());
}
async fn run() {
let n = 10_000_000;
let ts1 = Instant::now();
let (tx, rx) = async_channel::bounded(1000);
let mut jhs = Vec::new();
let jh = taskrun::spawn(consumer(rx.clone()));
jhs.push(jh);
let jh = taskrun::spawn(consumer(rx));
jhs.push(jh);
let jh = taskrun::spawn(producer(tx, n));
jhs.push(jh);
for jh in jhs {
jh.await.unwrap();
}
let ts2 = Instant::now();
let dt = ts2.duration_since(ts1);
eprintln!("dt {:.3} MHz", n as f32 / dt.as_secs_f32() * 1e-6);
panic!();
}
async fn producer(tx: Sender<Item>, n: u64) {
for i in 0..n {
let item = Item { x: i, y: i };
tx.send(item).await.unwrap();
}
}
async fn consumer(rx: Receiver<Item>) {
while let Ok(_x) = rx.recv().await {}
}
struct Item {
x: u64,
y: u64,
}
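For reference: the figure printed above is throughput in millions of channel messages per second (n / dt.as_secs_f32() * 1e-6), measured with one producer and two consumers on a bounded(1000) async-channel. The trailing panic!() looks like a WIP way to make the test fail so the eprintln output shows up in the test run.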

View File

@@ -1 +1,3 @@
pub mod batcher;
#[cfg(test)]
pub mod channeltest;

View File

@@ -22,6 +22,7 @@ use scywriiq::QueryItem;
use stats::DaemonStats;
use stats::InsertWorkerStats;
use stats::SeriesByChannelStats;
use std::collections::VecDeque;
use std::sync::atomic;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::AtomicUsize;
@@ -42,29 +43,18 @@ enum CheckPeriodic {
}
pub struct DaemonOpts {
backend: String,
local_epics_hostname: String,
array_truncate: u64,
insert_item_queue_cap: usize,
pgconf: Database,
scyconf: ScyllaConfig,
ttls: Ttls,
#[allow(unused)]
test_bsread_addr: Option<String>,
insert_worker_count: usize,
insert_scylla_sessions: usize,
insert_frac: Arc<AtomicU64>,
store_workers_rate: Arc<AtomicU64>,
}
impl DaemonOpts {
pub fn backend(&self) -> &str {
&self.backend
}
}
pub struct Daemon {
opts: DaemonOpts,
ingest_opts: CaIngestOpts,
tx: Sender<DaemonEvent>,
rx: Receiver<DaemonEvent>,
insert_queue_counter: Arc<AtomicUsize>,
@@ -84,7 +74,7 @@ pub struct Daemon {
connset_status_last: CheckPeriodic,
// TODO should be a stats object?
insert_workers_running: AtomicU64,
query_item_tx_weak: WeakSender<QueryItem>,
query_item_tx_weak: WeakSender<VecDeque<QueryItem>>,
connset_health_lat_ema: f32,
}
@@ -101,20 +91,20 @@ impl Daemon {
.await
.map_err(|e| Error::with_msg_no_trace(e.to_string()))?;
let (query_item_tx, query_item_rx) = async_channel::bounded(opts.insert_item_queue_cap);
let (query_item_tx, query_item_rx) = async_channel::bounded(ingest_opts.insert_item_queue_cap());
let query_item_tx_weak = query_item_tx.downgrade();
let insert_queue_counter = Arc::new(AtomicUsize::new(0));
// Insert queue hook
let query_item_rx = inserthook::active_channel_insert_hook(query_item_rx);
// let query_item_rx = inserthook::active_channel_insert_hook(query_item_rx);
let conn_set_ctrl = CaConnSet::start(
opts.backend.clone(),
opts.local_epics_hostname.clone(),
ingest_opts.backend().into(),
ingest_opts.local_epics_hostname(),
query_item_tx,
channel_info_query_tx,
ingest_opts,
ingest_opts.clone(),
);
// TODO remove
@@ -140,24 +130,44 @@ impl Daemon {
}
});
let use_rate_limit_queue = true;
// #[cfg(DISABLED)]
let query_item_rx = {
// TODO only testing, remove
tokio::spawn({
let rx = query_item_rx;
async move {
while let Ok(item) = rx.recv().await {
drop(item);
}
}
});
let (tx, rx) = async_channel::bounded(128);
tokio::spawn(async move {
loop {
tokio::time::sleep(Duration::from_millis(2000)).await;
tx.len();
}
});
rx
};
let ttls = opts.ttls.clone();
let insert_worker_opts = InsertWorkerOpts {
store_workers_rate: opts.store_workers_rate.clone(),
insert_workers_running: Arc::new(AtomicU64::new(0)),
insert_frac: opts.insert_frac.clone(),
array_truncate: Arc::new(AtomicU64::new(opts.array_truncate)),
array_truncate: Arc::new(AtomicU64::new(ingest_opts.array_truncate())),
};
let insert_worker_opts = Arc::new(insert_worker_opts);
let insert_workers_jh = scywr::insertworker::spawn_scylla_insert_workers(
opts.scyconf.clone(),
opts.insert_scylla_sessions,
opts.insert_worker_count,
ingest_opts.insert_scylla_sessions(),
ingest_opts.insert_worker_count(),
ingest_opts.insert_worker_concurrency(),
query_item_rx,
insert_worker_opts,
insert_worker_stats.clone(),
use_rate_limit_queue,
ingest_opts.use_rate_limit_queue(),
ttls,
)
.await?;
@@ -199,6 +209,7 @@ impl Daemon {
let ret = Self {
opts,
ingest_opts,
tx: daemon_ev_tx,
rx: daemon_ev_rx,
insert_queue_counter,
@@ -230,7 +241,7 @@ impl Daemon {
async fn check_caconn_chans(&mut self, ts1: Instant) -> Result<(), Error> {
match &self.connset_status_last {
CheckPeriodic::Waiting(since) => {
if *since + Duration::from_millis(500) < ts1 {
if *since + Duration::from_millis(2000) < ts1 {
self.connset_ctrl.check_health().await?;
self.connset_status_last = CheckPeriodic::Ongoing(ts1);
}
@@ -297,9 +308,9 @@ impl Daemon {
// debug!("handle_channel_add {ch:?}");
self.connset_ctrl
.add_channel(
self.opts.backend.clone(),
self.ingest_opts.backend().into(),
ch.id().into(),
self.opts.local_epics_hostname.clone(),
self.ingest_opts.local_epics_hostname(),
)
.await?;
Ok(())
@@ -365,23 +376,23 @@ impl Daemon {
use CaConnSetItem::*;
match item {
Healthy(ts1, ts2) => {
let ts3 = Instant::now();
let dt1 = ts2.duration_since(ts1).as_secs_f32() * 1e3;
let dt2 = ts3.duration_since(ts2).as_secs_f32() * 1e3;
let tsnow = Instant::now();
let dt1 = tsnow.duration_since(ts1).as_secs_f32() * 1e3;
let dt2 = tsnow.duration_since(ts2).as_secs_f32() * 1e3;
match &self.connset_status_last {
CheckPeriodic::Waiting(_since) => {
error!("received CaConnSet health report without having asked {dt1:.0} ms {dt2:.0} ms");
}
CheckPeriodic::Ongoing(since) => {
// TODO insert response time as series to scylla.
let dtsince = ts3.duration_since(*since).as_secs_f32() * 1e6;
let dtsince = tsnow.duration_since(*since).as_secs_f32() * 1e3;
{
let v = &mut self.connset_health_lat_ema;
*v += (dtsince - *v) * 0.2;
self.stats.connset_health_lat_ema().set(*v as _);
}
// debug!("======================================== received CaConnSet healthy dtsince {dtsince:.0} ms {dt1:.0} ms {dt2:.0} ms");
self.connset_status_last = CheckPeriodic::Waiting(ts3);
debug!("received CaConnSet Healthy dtsince {dtsince:.0} ms {dt1:.0} ms {dt2:.0} ms");
self.connset_status_last = CheckPeriodic::Waiting(tsnow);
self.stats.caconnset_health_response().inc();
}
}
@@ -583,10 +594,6 @@ pub async fn run(opts: CaIngestOpts, channels: Vec<String>) -> Result<(), Error>
let store_workers_rate = Arc::new(AtomicU64::new(opts.store_workers_rate()));
let opts2 = DaemonOpts {
backend: opts.backend().into(),
local_epics_hostname: opts.local_epics_hostname().into(),
array_truncate: opts.array_truncate(),
insert_item_queue_cap: opts.insert_item_queue_cap(),
pgconf: opts.postgresql_config().clone(),
scyconf: opts.scylla_config().clone(),
ttls: Ttls {
@@ -596,8 +603,6 @@ pub async fn run(opts: CaIngestOpts, channels: Vec<String>) -> Result<(), Error>
binned: opts.ttl_binned(),
},
test_bsread_addr: opts.test_bsread_addr.clone(),
insert_worker_count: opts.insert_worker_count(),
insert_scylla_sessions: opts.insert_scylla_sessions(),
insert_frac: insert_frac.clone(),
store_workers_rate,
};
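Two things worth noting in the daemon changes above, going only by the diff: the insert hook and the real query-item pipeline are temporarily bypassed, a spawned task drains query_item_rx and drops every batch while the Scylla insert workers get a fresh, essentially idle channel instead; and the connection-set health latency is now converted to milliseconds (the old factor of 1e6 produced microseconds under an "ms" label) before feeding the exponential moving average v += (dtsince - v) * 0.2, so with a smoothing factor of 0.2 roughly the last five health responses dominate the reported value.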

View File

@@ -1,14 +1,12 @@
use clap::ArgAction::Count;
use clap::Parser;
#[cfg(feature = "bsread")]
use ingest_bsread::zmtp::ZmtpClientOpts;
use std::net::SocketAddr;
#[derive(Debug, Parser)]
#[derive(Debug, clap::Parser)]
#[command(author, version, about)]
pub struct DaqIngestOpts {
#[arg(long, action(Count))]
pub verbose: u32,
#[arg(long, action(clap::ArgAction::Count))]
pub verbose: u8,
#[clap(long)]
pub tag: Option<String>,
#[command(subcommand)]
@@ -17,7 +15,7 @@ pub struct DaqIngestOpts {
pub nworkers: Option<usize>,
}
#[derive(Debug, Parser)]
#[derive(Debug, clap::Parser)]
pub enum SubCmd {
ListPkey,
ListPulses,
@@ -31,7 +29,7 @@ pub enum SubCmd {
Version,
}
#[derive(Debug, Parser)]
#[derive(Debug, clap::Parser)]
pub struct Bsread {
#[arg(long)]
pub backend: String,
@@ -61,7 +59,7 @@ impl From<Bsread> for ZmtpClientOpts {
}
}
#[derive(Debug, Parser)]
#[derive(Debug, clap::Parser)]
pub struct FetchEvents {
#[arg(long, num_args(1..))]
pub scylla: Vec<String>,
@@ -71,24 +69,24 @@ pub struct FetchEvents {
pub backend: String,
}
#[derive(Debug, Parser)]
#[derive(Debug, clap::Parser)]
pub struct BsreadDump {
pub source: String,
}
#[derive(Debug, Parser)]
#[derive(Debug, clap::Parser)]
pub enum ChannelAccess {
CaIngest(CaConfig),
#[cfg(DISABLED)]
CaSearch(CaSearch),
}
#[derive(Debug, Parser)]
#[derive(Debug, clap::Parser)]
pub struct CaSearch {
pub config: String,
}
#[derive(Debug, Parser)]
#[derive(Debug, clap::Parser)]
pub struct CaConfig {
pub config: String,
}
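Side note on the args changes: clap's ArgAction::Count stores its result in a u8, which is why verbose switches from u32 to u8 alongside the move to the fully qualified clap::Parser derive.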

View File

@@ -20,6 +20,7 @@ pub fn local_hostname() -> String {
let hostname = CStr::from_ptr(&buf[0] as *const _ as _);
hostname.to_str().unwrap()
};
log::info!("---------------------- found hostname {hostname:?}");
hostname.into()
}

View File

@@ -41,11 +41,15 @@ use scywriiq::QueryItem;
use serde::Serialize;
use series::ChannelStatusSeriesId;
use series::SeriesId;
use stats::rand_xoshiro::rand_core::RngCore;
use stats::rand_xoshiro::rand_core::SeedableRng;
use stats::rand_xoshiro::Xoshiro128StarStar;
use stats::CaConnStats;
use stats::CaProtoStats;
use stats::IntervalEma;
use stats::XorShift32;
use std::collections::BTreeMap;
use std::collections::HashMap;
use std::collections::VecDeque;
use std::net::SocketAddrV4;
use std::ops::ControlFlow;
@@ -149,10 +153,10 @@ fn ser_instant<S: serde::Serializer>(val: &Option<Instant>, ser: S) -> Result<S:
}
}
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
struct Cid(pub u32);
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
struct Subid(pub u32);
#[derive(Clone, Debug)]
@@ -521,11 +525,11 @@ pub struct CaConn {
proto: Option<CaProto>,
cid_store: CidStore,
subid_store: SubidStore,
channels: BTreeMap<Cid, ChannelState>,
cid_by_name: BTreeMap<String, Cid>,
cid_by_subid: BTreeMap<Subid, Cid>,
name_by_cid: BTreeMap<Cid, String>,
time_binners: BTreeMap<Cid, ConnTimeBin>,
channels: HashMap<Cid, ChannelState>,
cid_by_name: HashMap<String, Cid>,
cid_by_subid: HashMap<Subid, Cid>,
name_by_cid: HashMap<Cid, String>,
time_binners: HashMap<Cid, ConnTimeBin>,
init_state_count: u64,
insert_item_queue: VecDeque<QueryItem>,
remote_addr_dbg: SocketAddrV4,
@@ -541,14 +545,14 @@ pub struct CaConn {
ioc_ping_last: Instant,
ioc_ping_next: Instant,
ioc_ping_start: Option<Instant>,
storage_insert_sender: Pin<Box<SenderPolling<QueryItem>>>,
storage_insert_sender: Pin<Box<SenderPolling<VecDeque<QueryItem>>>>,
ca_conn_event_out_queue: VecDeque<CaConnEvent>,
channel_info_query_queue: VecDeque<ChannelInfoQuery>,
channel_info_query_sending: Pin<Box<SenderPolling<ChannelInfoQuery>>>,
thr_msg_poll: ThrottleTrace,
ca_proto_stats: Arc<CaProtoStats>,
weird_count: usize,
rng: XorShift32,
rng: Xoshiro128StarStar,
}
#[cfg(DISABLED)]
@@ -564,13 +568,13 @@ impl CaConn {
backend: String,
remote_addr_dbg: SocketAddrV4,
local_epics_hostname: String,
storage_insert_tx: Sender<QueryItem>,
storage_insert_tx: Sender<VecDeque<QueryItem>>,
channel_info_query_tx: Sender<ChannelInfoQuery>,
stats: Arc<CaConnStats>,
ca_proto_stats: Arc<CaProtoStats>,
) -> Self {
let (cq_tx, cq_rx) = async_channel::bounded(32);
let mut rng = XorShift32::new_from_time();
let mut rng = stats::xoshiro_from_time();
Self {
opts,
backend,
@@ -580,16 +584,16 @@ impl CaConn {
cid_store: CidStore::new_from_time(),
subid_store: SubidStore::new_from_time(),
init_state_count: 0,
channels: BTreeMap::new(),
cid_by_name: BTreeMap::new(),
cid_by_subid: BTreeMap::new(),
name_by_cid: BTreeMap::new(),
time_binners: BTreeMap::new(),
channels: HashMap::new(),
cid_by_name: HashMap::new(),
cid_by_subid: HashMap::new(),
name_by_cid: HashMap::new(),
time_binners: HashMap::new(),
insert_item_queue: VecDeque::new(),
remote_addr_dbg,
local_epics_hostname,
stats,
insert_ivl_min_mus: 1000 * 6,
insert_ivl_min_mus: 1000 * 4,
conn_command_tx: Box::pin(cq_tx),
conn_command_rx: Box::pin(cq_rx),
conn_backoff: 0.02,
@@ -610,8 +614,8 @@ impl CaConn {
}
}
fn ioc_ping_ivl_rng(rng: &mut XorShift32) -> Duration {
IOC_PING_IVL * 100 / (70 + (rng.next() % 60))
fn ioc_ping_ivl_rng(rng: &mut Xoshiro128StarStar) -> Duration {
IOC_PING_IVL * 100 / (70 + (rng.next_u32() % 60))
}
fn new_self_ticker() -> Pin<Box<tokio::time::Sleep>> {
@@ -813,7 +817,7 @@ impl CaConn {
fn handle_conn_command(&mut self, cx: &mut Context) -> Result<Poll<Option<()>>, Error> {
// TODO if this loops for too long time, yield and make sure we get wake up again.
use Poll::*;
self.stats.caconn_loop3_count.inc();
self.stats.loop3_count.inc();
if self.is_shutdown() {
Ok(Ready(None))
} else {
@@ -896,11 +900,11 @@ impl CaConn {
fn channel_remove_expl(
name: String,
channels: &mut BTreeMap<Cid, ChannelState>,
cid_by_name: &mut BTreeMap<String, Cid>,
name_by_cid: &mut BTreeMap<Cid, String>,
channels: &mut HashMap<Cid, ChannelState>,
cid_by_name: &mut HashMap<String, Cid>,
name_by_cid: &mut HashMap<Cid, String>,
cid_store: &mut CidStore,
time_binners: &mut BTreeMap<Cid, ConnTimeBin>,
time_binners: &mut HashMap<Cid, ConnTimeBin>,
) {
let cid = Self::cid_by_name_expl(&name, cid_by_name, name_by_cid, cid_store);
if channels.contains_key(&cid) {
@@ -924,8 +928,8 @@ impl CaConn {
fn cid_by_name_expl(
name: &str,
cid_by_name: &mut BTreeMap<String, Cid>,
name_by_cid: &mut BTreeMap<Cid, String>,
cid_by_name: &mut HashMap<String, Cid>,
name_by_cid: &mut HashMap<Cid, String>,
cid_store: &mut CidStore,
) -> Cid {
if let Some(cid) = cid_by_name.get(name) {
@@ -1214,9 +1218,7 @@ impl CaConn {
let ema = em.ema();
let ivl_min = (insert_ivl_min_mus as f32) * 1e-6;
let dt = (ivl_min - ema).max(0.) / em.k();
st.insert_next_earliest = tsnow
.checked_add(Duration::from_micros((dt * 1e6) as u64))
.ok_or_else(|| Error::with_msg_no_trace("time overflow in next insert"))?;
st.insert_next_earliest = tsnow + Duration::from_micros((dt * 1e6) as u64);
let ts_msp_last = st.ts_msp_last;
// TODO get event timestamp from channel access field
let ts_msp_grid = (ts / TS_MSP_GRID_UNIT / TS_MSP_GRID_SPACING * TS_MSP_GRID_SPACING) as u32;
@@ -1555,7 +1557,7 @@ impl CaConn {
}
CaMsgTy::EventAddRes(k) => {
trace4!("got EventAddRes: {k:?}");
self.stats.caconn_recv_data.inc();
self.stats.event_add_res_recv.inc();
let res = Self::handle_event_add_res(self, k, tsnow);
let ts2 = Instant::now();
self.stats
@@ -1866,7 +1868,7 @@ impl CaConn {
let tsnow = Instant::now();
let mut have_progress = false;
for _ in 0..64 {
self.stats.caconn_loop2_count.inc();
self.stats.loop2_count.inc();
if self.is_shutdown() {
break;
} else if self.insert_item_queue.len() >= self.opts.insert_queue_max {
@@ -1963,21 +1965,50 @@ impl CaConn {
fn attempt_flush_storage_queue(mut self: Pin<&mut Self>, cx: &mut Context) -> Result<Poll<Option<()>>, Error> {
use Poll::*;
let (qu, sd, stats) = Self::storage_queue_vars(&mut self);
{
let n = qu.len();
if n >= 128 {
stats.storage_queue_above_128().inc();
} else if n >= 32 {
stats.storage_queue_above_32().inc();
} else if n >= 8 {
stats.storage_queue_above_8().inc();
}
}
let mut have_progress = false;
for _ in 0..128 {
let sd = &mut self.storage_insert_sender;
let mut i = 0;
loop {
i += 1;
if i > 120 {
break;
}
if !sd.has_sender() {
return Err(Error::with_msg_no_trace("attempt_flush_storage_queue no more sender"));
}
if sd.is_idle() {
if let Some(item) = self.insert_item_queue.pop_front() {
self.storage_insert_sender.as_mut().send_pin(item);
if qu.len() != 0 {
let item: VecDeque<_> = qu.drain(..).collect();
stats.storage_queue_send().add(item.len() as _);
sd.as_mut().send_pin(item);
} else {
break;
}
}
if self.storage_insert_sender.is_sending() {
match self.storage_insert_sender.poll_unpin(cx) {
if sd.is_sending() {
match sd.poll_unpin(cx) {
Ready(Ok(())) => {
have_progress = true;
}
Ready(Err(_)) => return Err(Error::with_msg_no_trace("can not send into channel")),
Pending => return Ok(Pending),
Ready(Err(_)) => {
return Err(Error::with_msg_no_trace(
"attempt_flush_storage_queue can not send into channel",
));
}
Pending => {
stats.storage_queue_pending().inc();
return Ok(Pending);
}
}
}
}
@@ -1988,12 +2019,32 @@ impl CaConn {
}
}
// TODO refactor, put together in separate type:
fn storage_queue_vars(
this: &mut CaConn,
) -> (
&mut VecDeque<QueryItem>,
&mut Pin<Box<SenderPolling<VecDeque<QueryItem>>>>,
&CaConnStats,
) {
(
&mut this.insert_item_queue,
&mut this.storage_insert_sender,
&this.stats,
)
}
fn attempt_flush_channel_info_query(mut self: Pin<&mut Self>, cx: &mut Context) -> Result<Poll<Option<()>>, Error> {
use Poll::*;
if self.is_shutdown() {
Ok(Ready(None))
} else {
let sd = self.channel_info_query_sending.as_mut();
if !sd.has_sender() {
return Err(Error::with_msg_no_trace(
"attempt_flush_channel_info_query no more sender",
));
}
if sd.is_idle() {
if let Some(item) = self.channel_info_query_queue.pop_front() {
trace3!("send series query {item:?}");
@@ -2005,7 +2056,9 @@ impl CaConn {
if sd.is_sending() {
match sd.poll_unpin(cx) {
Ready(Ok(())) => Ok(Ready(Some(()))),
Ready(Err(_)) => Err(Error::with_msg_no_trace("can not send into channel")),
Ready(Err(_)) => Err(Error::with_msg_no_trace(
"attempt_flush_channel_info_query can not send into channel",
)),
Pending => Ok(Pending),
}
} else {
@@ -2020,11 +2073,11 @@ impl Stream for CaConn {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
self.stats.caconn_poll_count.inc();
self.stats.poll_count().inc();
let poll_ts1 = Instant::now();
self.stats.ca_conn_poll_fn_begin().inc();
self.stats.poll_fn_begin().inc();
let ret = loop {
self.stats.ca_conn_poll_loop_begin().inc();
self.stats.poll_loop_begin().inc();
let qlen = self.insert_item_queue.len();
if qlen >= self.opts.insert_queue_max * 2 / 3 {
self.stats.insert_item_queue_pressure().inc();
@@ -2108,10 +2161,10 @@ impl Stream for CaConn {
} else {
// debug!("queues_out_flushed false");
if have_progress {
self.stats.ca_conn_poll_reloop().inc();
self.stats.poll_reloop().inc();
continue;
} else if have_pending {
self.stats.ca_conn_poll_pending().inc();
self.stats.poll_pending().inc();
Pending
} else {
// TODO error
@@ -2123,13 +2176,13 @@ impl Stream for CaConn {
}
} else {
if have_progress {
self.stats.ca_conn_poll_reloop().inc();
self.stats.poll_reloop().inc();
continue;
} else if have_pending {
self.stats.ca_conn_poll_pending().inc();
self.stats.poll_pending().inc();
Pending
} else {
self.stats.ca_conn_poll_no_progress_no_pending().inc();
self.stats.poll_no_progress_no_pending().inc();
let e = Error::with_msg_no_trace("no progress no pending");
Ready(Some(Err(e)))
}
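A few notes on the CaConn changes above: the per-channel lookup tables move from BTreeMap to HashMap (Cid and Subid derive Hash for that), XorShift32 is replaced by Xoshiro128StarStar seeded via stats::xoshiro_from_time, and attempt_flush_storage_queue now drains the whole insert_item_queue into one VecDeque and sends it as a single message, with the new storage_queue_* counters tracking batch sizes and backpressure. The ping jitter works out as follows (a worked reading of the expression above, not new behavior):

// divisor = 70 + (rng.next_u32() % 60)      -> 70..=129
// interval = IOC_PING_IVL * 100 / divisor   -> roughly 0.78x .. 1.43x of IOC_PING_IVL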

View File

@@ -261,7 +261,13 @@ impl CaConnSetCtrl {
pub async fn check_health(&self) -> Result<(), Error> {
let cmd = ConnSetCmd::CheckHealth(Instant::now());
let n = self.tx.len();
if n > 0 {
debug!("check_health self.tx.len() {:?}", n);
}
let s = format!("{:?}", cmd);
self.tx.send(CaConnSetEvent::ConnSetCmd(cmd)).await?;
debug!("check_health enqueued {s}");
Ok(())
}
@@ -345,9 +351,9 @@ pub struct CaConnSet {
find_ioc_query_queue: VecDeque<IocAddrQuery>,
find_ioc_query_sender: Pin<Box<SenderPolling<IocAddrQuery>>>,
find_ioc_res_rx: Pin<Box<Receiver<VecDeque<FindIocRes>>>>,
storage_insert_tx: Pin<Box<Sender<QueryItem>>>,
storage_insert_queue: VecDeque<QueryItem>,
storage_insert_sender: Pin<Box<SenderPolling<QueryItem>>>,
storage_insert_tx: Pin<Box<Sender<VecDeque<QueryItem>>>>,
storage_insert_queue: VecDeque<VecDeque<QueryItem>>,
storage_insert_sender: Pin<Box<SenderPolling<VecDeque<QueryItem>>>>,
ca_conn_res_tx: Pin<Box<Sender<(SocketAddr, CaConnEvent)>>>,
ca_conn_res_rx: Pin<Box<Receiver<(SocketAddr, CaConnEvent)>>>,
connset_out_queue: VecDeque<CaConnSetItem>,
@@ -361,7 +367,6 @@ pub struct CaConnSet {
await_ca_conn_jhs: VecDeque<(SocketAddr, JoinHandle<Result<(), Error>>)>,
thr_msg_poll_1: ThrottleTrace,
thr_msg_storage_len: ThrottleTrace,
did_connset_out_queue: bool,
ca_proto_stats: Arc<CaProtoStats>,
rogue_channel_count: u64,
connect_fail_count: usize,
@@ -371,7 +376,7 @@ impl CaConnSet {
pub fn start(
backend: String,
local_epics_hostname: String,
storage_insert_tx: Sender<QueryItem>,
storage_insert_tx: Sender<VecDeque<QueryItem>>,
channel_info_query_tx: Sender<ChannelInfoQuery>,
ingest_opts: CaIngestOpts,
) -> CaConnSetCtrl {
@@ -422,7 +427,6 @@ impl CaConnSet {
await_ca_conn_jhs: VecDeque::new(),
thr_msg_poll_1: ThrottleTrace::new(Duration::from_millis(2000)),
thr_msg_storage_len: ThrottleTrace::new(Duration::from_millis(1000)),
did_connset_out_queue: false,
ca_proto_stats: ca_proto_stats.clone(),
rogue_channel_count: 0,
connect_fail_count: 0,
@@ -491,7 +495,8 @@ impl CaConnSet {
CaConnEventValue::EchoTimeout => Ok(()),
CaConnEventValue::ConnCommandResult(x) => self.handle_conn_command_result(addr, x),
CaConnEventValue::QueryItem(item) => {
self.storage_insert_queue.push_back(item);
todo!("remove this insert case");
// self.storage_insert_queue.push_back(item);
Ok(())
}
CaConnEventValue::ChannelCreateFail(x) => self.handle_channel_create_fail(addr, x),
@@ -743,7 +748,7 @@ impl CaConnSet {
}
fn handle_check_health(&mut self, ts1: Instant) -> Result<(), Error> {
debug!("handle_check_health");
trace2!("handle_check_health");
if self.shutdown_stopping {
return Ok(());
}
@@ -754,15 +759,11 @@ impl CaConnSet {
self.check_channel_states()?;
// Already trigger the next health check, but use the current data that we have.
// TODO try to deliver a command to CaConn
// Add some queue for commands to CaConn to the ress.
// Fail here if that queue gets too long.
// Try to push the commands periodically.
// TODO do the full check before sending the reply to daemon.
for (_, res) in self.ca_conn_ress.iter_mut() {
let item = ConnCommand::check_health();
res.cmd_queue.push_back(item);
debug!(
trace2!(
"handle_check_health pushed check command {:?} {:?}",
res.cmd_queue.len(),
res.sender.len()
@@ -822,7 +823,7 @@ impl CaConnSet {
}
fn apply_ca_conn_health_update(&mut self, addr: SocketAddr, res: CheckHealthResult) -> Result<(), Error> {
debug!("apply_ca_conn_health_update {addr}");
trace2!("apply_ca_conn_health_update {addr}");
let tsnow = SystemTime::now();
self.rogue_channel_count = 0;
for (k, v) in res.channel_statuses {
@@ -993,7 +994,7 @@ impl CaConnSet {
async fn ca_conn_item_merge(
conn: CaConn,
tx1: Sender<(SocketAddr, CaConnEvent)>,
tx2: Sender<QueryItem>,
tx2: Sender<VecDeque<QueryItem>>,
addr: SocketAddr,
stats: Arc<CaConnSetStats>,
) -> Result<(), Error> {
@@ -1005,10 +1006,13 @@ impl CaConnSet {
while let Some(item) = conn.next().await {
match item {
Ok(item) => {
connstats.conn_item_count.inc();
connstats.item_count.inc();
match item.value {
CaConnEventValue::QueryItem(x) => {
if let Err(_) = tx2.send(x).await {
warn!("ca_conn_item_merge should not go here often");
let mut v = VecDeque::new();
v.push_back(x);
if let Err(_) = tx2.send(v).await {
break;
}
}
@@ -1076,7 +1080,9 @@ impl CaConnSet {
};
}
let item = QueryItem::ChannelStatus(item);
self.storage_insert_queue.push_back(item);
let mut v = VecDeque::new();
v.push_back(item);
self.storage_insert_queue.push_back(v);
Ok(())
}
@@ -1423,35 +1429,37 @@ impl CaConnSet {
(search_pending, assigned_without_health_update)
}
fn try_push_ca_conn_cmds(&mut self, cx: &mut Context) {
fn try_push_ca_conn_cmds(&mut self, cx: &mut Context) -> Result<(), Error> {
use Poll::*;
for (_, v) in self.ca_conn_ress.iter_mut() {
'level2: loop {
let tx = &mut v.sender;
if v.cmd_queue.len() != 0 || tx.is_sending() {
debug!("try_push_ca_conn_cmds {:?} {:?}", v.cmd_queue.len(), tx.len());
let tx = &mut v.sender;
loop {
if false {
if v.cmd_queue.len() != 0 || tx.is_sending() {
debug!("try_push_ca_conn_cmds {:?} {:?}", v.cmd_queue.len(), tx.len());
}
}
loop {
break if tx.is_sending() {
match tx.poll_unpin(cx) {
Ready(Ok(())) => {
self.stats.try_push_ca_conn_cmds_sent.inc();
continue;
}
Ready(Err(e)) => {
error!("try_push_ca_conn_cmds {e}");
}
Pending => {
break 'level2;
}
break if tx.is_sending() {
match tx.poll_unpin(cx) {
Ready(Ok(())) => {
self.stats.try_push_ca_conn_cmds_sent.inc();
continue;
}
} else if let Some(item) = v.cmd_queue.pop_front() {
tx.as_mut().send_pin(item);
continue;
};
}
Ready(Err(e)) => {
error!("try_push_ca_conn_cmds {e}");
return Err(Error::with_msg_no_trace(format!("{e}")));
}
Pending => (),
}
} else if let Some(item) = v.cmd_queue.pop_front() {
tx.as_mut().send_pin(item);
continue;
} else {
()
};
}
}
Ok(())
}
}
@@ -1459,9 +1467,11 @@ impl Stream for CaConnSet {
type Item = CaConnSetItem;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
trace4!("CaConnSet poll begin");
use Poll::*;
self.stats.poll_fn_begin().inc();
loop {
let res = loop {
trace4!("CaConnSet poll loop");
self.stats.poll_loop_begin().inc();
self.stats.storage_insert_tx_len.set(self.storage_insert_tx.len() as _);
@@ -1485,15 +1495,12 @@ impl Stream for CaConnSet {
let mut have_pending = false;
let mut have_progress = false;
self.try_push_ca_conn_cmds(cx);
if let Err(e) = self.try_push_ca_conn_cmds(cx) {
break Ready(Some(CaConnSetItem::Error(e)));
}
if self.did_connset_out_queue {
self.did_connset_out_queue = false;
} else {
if let Some(item) = self.connset_out_queue.pop_front() {
self.did_connset_out_queue = true;
break Ready(Some(item));
}
if let Some(item) = self.connset_out_queue.pop_front() {
break Ready(Some(item));
}
if let Some((addr, jh)) = self.await_ca_conn_jhs.front_mut() {
@@ -1634,7 +1641,9 @@ impl Stream for CaConnSet {
}
Err(e) => break Ready(Some(CaConnSetItem::Error(e))),
},
Ready(None) => {}
Ready(None) => {
warn!("connset_inp_rx broken?")
}
Pending => {
have_pending = true;
}
@@ -1663,6 +1672,8 @@ impl Stream for CaConnSet {
}
}
};
}
};
trace4!("CaConnSet poll done");
res
}
}
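In the connection-set changes above the storage channel likewise carries VecDeque<QueryItem> batches now; the remaining single-item call sites (the per-item forwarding in ca_conn_item_merge and the channel-status insert) wrap the item in a one-element VecDeque. try_push_ca_conn_cmds now returns Result so a send failure surfaces as CaConnSetItem::Error instead of only being logged, and the did_connset_out_queue flag is dropped so a queued connection-set item is emitted as soon as one is available.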

View File

@@ -1,13 +1,12 @@
use crate::netbuf;
use err::thiserror;
use err::ThisError;
use futures_util::pin_mut;
use futures_util::Stream;
use log::*;
use netpod::timeunits::*;
use slidebuf::SlideBuf;
use stats::CaProtoStats;
use std::collections::BTreeMap;
use std::collections::HashMap;
use std::collections::VecDeque;
use std::io;
use std::net::SocketAddrV4;
@@ -46,6 +45,8 @@ pub enum Error {
ParseAttemptInDoneState,
UnexpectedHeader,
ExtendedHeaderBadCount,
NoReadBufferSpace,
NeitherPendingNorProgress,
}
const CA_PROTO_VERSION: u16 = 13;
@@ -1016,8 +1017,9 @@ pub struct CaProto {
outbuf: SlideBuf,
out: VecDeque<CaMsg>,
array_truncate: usize,
logged_proto_error_for_cid: BTreeMap<u32, bool>,
logged_proto_error_for_cid: HashMap<u32, bool>,
stats: Arc<CaProtoStats>,
resqu: VecDeque<CaItem>,
}
impl CaProto {
@@ -1026,12 +1028,13 @@ impl CaProto {
tcp,
remote_addr_dbg,
state: CaState::StdHead,
buf: SlideBuf::new(1024 * 1024 * 4),
buf: SlideBuf::new(1024 * 1024 * 8),
outbuf: SlideBuf::new(1024 * 128),
out: VecDeque::new(),
array_truncate,
logged_proto_error_for_cid: BTreeMap::new(),
logged_proto_error_for_cid: HashMap::new(),
stats,
resqu: VecDeque::with_capacity(256),
}
}
@@ -1063,14 +1066,14 @@ impl CaProto {
}
}
fn attempt_output(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Error>> {
fn attempt_output(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<usize, Error>> {
use Poll::*;
let (w, b) = self.outbuf_conn();
pin_mut!(w);
let w = Pin::new(w);
match w.poll_write(cx, b) {
Ready(k) => match k {
Ok(k) => match self.outbuf.adv(k) {
Ok(()) => Ready(Ok(())),
Ok(()) => Ready(Ok(k)),
Err(e) => {
error!("advance error {:?}", e);
Ready(Err(e.into()))
@@ -1085,13 +1088,12 @@ impl CaProto {
}
}
fn loop_body(mut self: Pin<&mut Self>, cx: &mut Context) -> Result<Option<Poll<CaItem>>, Error> {
fn loop_body(mut self: Pin<&mut Self>, cx: &mut Context) -> Result<Poll<()>, Error> {
use Poll::*;
let mut have_pending = false;
let mut have_progress = false;
let tsnow = Instant::now();
let output_res_1: Option<Poll<()>> = 'll1: loop {
if self.out.len() == 0 {
break None;
}
'l1: while self.out.len() != 0 {
while let Some((msg, buf)) = self.out_msg_buf() {
let msglen = msg.len();
if msglen > buf.len() {
@@ -1103,168 +1105,184 @@ impl CaProto {
self.out.pop_front();
}
}
while self.outbuf.len() > 0 {
while self.outbuf.len() != 0 {
match Self::attempt_output(self.as_mut(), cx)? {
Ready(()) => {}
Ready(n) => {
if n != 0 {
have_progress = true;
} else {
// Should not occur to begin with. TODO restructure.
break 'l1;
}
}
Pending => {
break 'll1 Some(Pending);
have_pending = true;
break 'l1;
}
}
}
};
let output_res_2: Option<Poll<()>> = if let Some(Pending) = output_res_1 {
Some(Pending)
} else {
loop {
if self.outbuf.len() == 0 {
break None;
}
'l1: while self.outbuf.len() != 0 {
match Self::attempt_output(self.as_mut(), cx)? {
Ready(n) => {
if n != 0 {
have_progress = true;
} else {
// Should not occur to begin with. TODO restructure.
break 'l1;
}
}
match Self::attempt_output(self.as_mut(), cx)? {
Ready(()) => {}
Pending => break Some(Pending),
Pending => {
have_pending = true;
break 'l1;
}
}
};
}
let need_min = self.state.need_min();
let read_res = {
if self.buf.cap() < need_min {
self.state = CaState::Done;
let e = Error::BufferTooSmallForNeedMin(self.buf.cap(), self.state.need_min());
Err(e)
} else if self.buf.len() < need_min {
let (w, mut rbuf) = self.inpbuf_conn(need_min)?;
pin_mut!(w);
match w.poll_read(cx, &mut rbuf) {
Ready(k) => match k {
Ok(()) => {
let nf = rbuf.filled().len();
if nf == 0 {
info!(
"EOF peer {:?} {:?} {:?}",
self.tcp.peer_addr(),
self.remote_addr_dbg,
self.state
);
// TODO may need another state, if not yet done when input is EOF.
self.state = CaState::Done;
Ok(Some(Ready(CaItem::empty())))
} else {
if false {
info!("received {} bytes", rbuf.filled().len());
let t = rbuf.filled().len().min(32);
info!("received data {:?}", &rbuf.filled()[0..t]);
if self.buf.cap() < need_min {
self.state = CaState::Done;
let e = Error::BufferTooSmallForNeedMin(self.buf.cap(), self.state.need_min());
return Err(e);
}
if self.buf.len() < need_min {
let (w, mut rbuf) = self.inpbuf_conn(need_min)?;
if rbuf.remaining() == 0 {
return Err(Error::NoReadBufferSpace);
}
let w = Pin::new(w);
match w.poll_read(cx, &mut rbuf) {
Ready(k) => match k {
Ok(()) => {
let nf = rbuf.filled().len();
if nf == 0 {
info!(
"EOF peer {:?} {:?} {:?}",
self.tcp.peer_addr(),
self.remote_addr_dbg,
self.state
);
// TODO may need another state, if not yet done when input is EOF.
self.state = CaState::Done;
} else {
if false {
info!("received {} bytes", rbuf.filled().len());
let t = rbuf.filled().len().min(32);
info!("received data {:?}", &rbuf.filled()[0..t]);
}
match self.buf.wadv(nf) {
Ok(()) => {
have_progress = true;
self.stats.tcp_recv_bytes().add(nf as _);
self.stats.tcp_recv_count().inc();
}
match self.buf.wadv(nf) {
Ok(()) => {
self.stats.tcp_recv_bytes().add(nf as _);
self.stats.tcp_recv_count().inc();
Ok(Some(Ready(CaItem::empty())))
}
Err(e) => {
error!("netbuf wadv fail nf {nf}");
Err(e.into())
}
Err(e) => {
error!("netbuf wadv fail nf {nf}");
return Err(e.into());
}
}
}
Err(e) => Err(e.into()),
},
Pending => Ok(Some(Pending)),
}
Err(e) => {
return Err(e.into());
}
},
Pending => {
have_pending = true;
}
}
}
while self.resqu.len() < self.resqu.capacity() {
if let Some(item) = self.parse_item(tsnow)? {
have_progress = true;
self.resqu.push_back(item);
} else {
Ok(None)
break;
}
}?;
let parse_res: Option<CaItem> = self.parse_item(tsnow)?;
match (output_res_2, read_res, parse_res) {
(_, _, Some(item)) => Ok(Some(Ready(item))),
(Some(Pending), _, _) => Ok(Some(Pending)),
(_, Some(Pending), _) => Ok(Some(Pending)),
(_, None, None) => {
// TODO constrain how often we can go to this case consecutively.
Ok(None)
}
(_, Some(_), None) => Ok(None),
}
if have_progress {
Ok(Ready(()))
} else if have_pending {
Ok(Pending)
} else {
Err(Error::NeitherPendingNorProgress)
}
}
fn parse_item(&mut self, tsnow: Instant) -> Result<Option<CaItem>, Error> {
loop {
if self.buf.len() < self.state.need_min() {
break Ok(None);
}
break match &self.state {
CaState::StdHead => {
let hi = HeadInfo::from_netbuf(&mut self.buf)?;
if hi.cmdid == 1 || hi.cmdid == 15 {
let sid = hi.param1;
if hi.payload_size == 0xffff {
if hi.data_count != 0 {
warn!("protocol error: {hi:?}");
return Err(Error::ExtendedHeaderBadCount);
}
if self.buf.len() < self.state.need_min() {
return Ok(None);
}
match &self.state {
CaState::StdHead => {
let hi = HeadInfo::from_netbuf(&mut self.buf)?;
if hi.cmdid == 1 || hi.cmdid == 15 {
let sid = hi.param1;
if hi.payload_size == 0xffff {
if hi.data_count != 0 {
warn!("protocol error: {hi:?}");
return Err(Error::ExtendedHeaderBadCount);
}
if hi.payload_size == 0xffff {
} else if hi.payload_size > 16368 {
self.stats.payload_std_too_large().inc();
}
}
if hi.cmdid > 26 {
// TODO count as logic error
self.stats.protocol_issue().inc();
}
if hi.payload_size == 0xffff {
self.state = CaState::ExtHead(hi);
Ok(None)
} else {
// For extended messages, ingest on receive of extended header
self.stats.payload_size().ingest(hi.payload_len() as u32);
if hi.payload_size == 0 {
self.state = CaState::StdHead;
let msg = CaMsg::from_proto_infos(&hi, &[], tsnow, self.array_truncate)?;
Ok(Some(CaItem::Msg(msg)))
} else {
self.state = CaState::Payload(hi);
Ok(None)
}
} else if hi.payload_size > 16368 {
self.stats.payload_std_too_large().inc();
}
}
CaState::ExtHead(hi) => {
let payload_size = self.buf.read_u32_be()?;
let data_count = self.buf.read_u32_be()?;
if hi.cmdid > 26 {
// TODO count as logic error
self.stats.protocol_issue().inc();
}
if hi.payload_size == 0xffff {
self.state = CaState::ExtHead(hi);
Ok(None)
} else {
// For extended messages, ingest on receive of extended header
self.stats.payload_size().ingest(hi.payload_len() as u32);
if payload_size > 1024 * 1024 * 32 {
self.stats.payload_ext_very_large().inc();
if false {
warn!(
"ExtHead data_type {} payload_size {payload_size} data_count {data_count}",
hi.data_type
);
}
if hi.payload_size == 0 {
self.state = CaState::StdHead;
let msg = CaMsg::from_proto_infos(&hi, &[], tsnow, self.array_truncate)?;
Ok(Some(CaItem::Msg(msg)))
} else {
self.state = CaState::Payload(hi);
Ok(None)
}
if payload_size <= 16368 {
self.stats.payload_ext_but_small().inc();
}
}
CaState::ExtHead(hi) => {
let payload_size = self.buf.read_u32_be()?;
let data_count = self.buf.read_u32_be()?;
self.stats.payload_size().ingest(hi.payload_len() as u32);
if payload_size > 1024 * 1024 * 32 {
self.stats.payload_ext_very_large().inc();
if false {
warn!(
"ExtHead data_type {} payload_size {payload_size} data_count {data_count}",
hi.data_type
);
}
let hi = hi.clone().with_ext(payload_size, data_count);
self.state = CaState::Payload(hi);
Ok(None)
}
CaState::Payload(hi) => {
let g = self.buf.read_bytes(hi.payload_len())?;
let msg = CaMsg::from_proto_infos(hi, g, tsnow, self.array_truncate)?;
// data-count is only reasonable for event messages
if let CaMsgTy::EventAddRes(e) = &msg.ty {
self.stats.data_count().ingest(hi.data_count() as u32);
}
self.state = CaState::StdHead;
Ok(Some(CaItem::Msg(msg)))
if payload_size <= 16368 {
self.stats.payload_ext_but_small().inc();
warn!(
"ExtHead data_type {} payload_size {payload_size} data_count {data_count}",
hi.data_type
);
}
CaState::Done => Err(Error::ParseAttemptInDoneState),
};
let hi = hi.clone().with_ext(payload_size, data_count);
self.state = CaState::Payload(hi);
Ok(None)
}
CaState::Payload(hi) => {
let g = self.buf.read_bytes(hi.payload_len())?;
let msg = CaMsg::from_proto_infos(hi, g, tsnow, self.array_truncate)?;
// data-count is only reasonable for event messages
if let CaMsgTy::EventAddRes(e) = &msg.ty {
self.stats.data_count().ingest(hi.data_count() as u32);
}
self.state = CaState::StdHead;
Ok(Some(CaItem::Msg(msg)))
}
CaState::Done => Err(Error::ParseAttemptInDoneState),
}
}
}
@@ -1274,16 +1292,16 @@ impl Stream for CaProto {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
loop {
break if let CaState::Done = self.state {
break if let Some(item) = self.resqu.pop_front() {
Ready(Some(Ok(item)))
} else if let CaState::Done = self.state {
Ready(None)
} else {
let k = Self::loop_body(self.as_mut(), cx);
match k {
Ok(Some(Ready(k))) => Ready(Some(Ok(k))),
Ok(Some(Pending)) => Pending,
Ok(None) => continue,
Ok(Ready(())) => continue,
Ok(Pending) => Pending,
Err(e) => Ready(Some(Err(e))),
}
};
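The protocol reader is restructured above: loop_body no longer hands parsed items back directly but only reports progress or pending, parsed messages are accumulated in the new resqu queue (capacity 256) and poll_next drains that queue first; the read buffer grows from 4 MiB to 8 MiB, and the new NoReadBufferSpace / NeitherPendingNorProgress errors turn silent stalls into hard failures. The extended-header handling follows the usual Channel Access rule visible in the code: a standard header whose payload_size field is 0xffff (with data_count 0) announces an extended header, and the real payload size and data count then follow as two additional big-endian u32 words read via read_u32_be.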

View File

@@ -28,6 +28,7 @@ pub struct CaIngestOpts {
scylla: ScyllaConfig,
array_truncate: Option<u64>,
insert_worker_count: Option<usize>,
insert_worker_concurrency: Option<usize>,
insert_scylla_sessions: Option<usize>,
insert_queue_max: Option<usize>,
insert_item_queue_cap: Option<usize>,
@@ -76,7 +77,11 @@ impl CaIngestOpts {
}
pub fn insert_worker_count(&self) -> usize {
self.insert_worker_count.unwrap_or(800)
self.insert_worker_count.unwrap_or(4)
}
pub fn insert_worker_concurrency(&self) -> usize {
self.insert_worker_concurrency.unwrap_or(32)
}
pub fn insert_scylla_sessions(&self) -> usize {

View File

@@ -223,6 +223,15 @@ fn make_routes(dcom: Arc<DaemonComm>, connset_cmd_tx: Sender<CaConnSetEvent>, st
|| async move {
axum::Json(serde_json::json!({
"v1": 42_u32,
"o1": {
"v2": 56,
"o2": {
"v3": "test",
},
},
"o5": {
"v6": 89,
},
}))
}
}),

View File

@@ -43,6 +43,10 @@ impl<T> SenderPolling<T> {
ret
}
pub fn has_sender(&self) -> bool {
self.sender.is_some()
}
pub fn is_idle(&self) -> bool {
self.sender.is_some() && self.fut.is_none()
}
@@ -97,6 +101,18 @@ impl<T> SenderPolling<T> {
}
self.sender.as_ref().unwrap().send(item).await
}
unsafe fn reset_fut(futopt: Pin<&mut Option<Send<'_, T>>>) {
let y = futopt.get_unchecked_mut();
let z = y.as_mut().unwrap_unchecked();
std::ptr::drop_in_place(z);
std::ptr::write(y, None);
}
#[allow(unused)]
unsafe fn reset_fut_old(futopt: Pin<&mut Option<Send<'_, T>>>) {
*futopt.get_unchecked_mut() = None;
}
}
impl<T> Future for SenderPolling<T>
@@ -109,16 +125,16 @@ where
use Poll::*;
let mut this = self.project();
match this.fut.as_mut().as_pin_mut() {
Some(fut) => match fut.poll(cx) {
Some(mut fut) => match fut.as_mut().poll(cx) {
Ready(Ok(())) => {
unsafe {
*this.fut.get_unchecked_mut() = None;
Self::reset_fut(this.fut);
}
Ready(Ok(()))
}
Ready(Err(e)) => {
unsafe {
*this.fut.get_unchecked_mut() = None;
Self::reset_fut(this.fut);
}
Ready(Err(Error::Closed(e.0)))
}
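On the SenderPolling additions above: has_sender lets callers detect a dropped sender before queueing more work, and reset_fut clears the stored Send future by explicitly dropping it in place and writing None back, with the previous plain assignment kept as reset_fut_old under #[allow(unused)]. Both forms drop the in-flight future; the difference is only in how the Option slot is overwritten.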

View File

@@ -98,6 +98,7 @@ pub async fn spawn_scylla_insert_workers(
scyconf: ScyllaConfig,
insert_scylla_sessions: usize,
insert_worker_count: usize,
insert_worker_concurrency: usize,
item_inp: Receiver<QueryItem>,
insert_worker_opts: Arc<InsertWorkerOpts>,
store_stats: Arc<stats::InsertWorkerStats>,
@@ -128,7 +129,7 @@ pub async fn spawn_scylla_insert_workers(
));
let jh = tokio::spawn(worker_streamed(
worker_ix,
insert_worker_count * 3,
insert_worker_concurrency,
item_inp.clone(),
ttls.clone(),
insert_worker_opts.clone(),
@@ -363,6 +364,10 @@ async fn worker_streamed(
})
.map(|x| futures_util::stream::iter(x))
.flatten_unordered(Some(1))
// .map(|x| async move {
// drop(x);
// Ok(())
// })
.buffer_unordered(concurrency);
while let Some(item) = stream.next().await {
match item {
@@ -408,23 +413,27 @@ fn prepare_query_insert_futs(
let series = item.series.clone();
let ts_msp = item.ts_msp;
let do_insert = true;
let fut = insert_item_fut(item, &ttls, &data_store, do_insert, stats);
let mut futs = smallvec![fut];
if msp_bump {
stats.inserts_msp().inc();
let fut = insert_msp_fut(
series,
ts_msp,
item_ts_local,
ttls,
data_store.scy.clone(),
data_store.qu_insert_ts_msp.clone(),
stats.clone(),
);
if item_ts_local % 100000 == 7461 {
let mut futs = smallvec![];
// TODO
if true || item_ts_local & 0x3f00000 < 0x0a00000 {
let fut = insert_item_fut(item, &ttls, &data_store, do_insert, stats);
futs.push(fut);
if msp_bump {
stats.inserts_msp().inc();
let fut = insert_msp_fut(
series,
ts_msp,
item_ts_local,
ttls,
data_store.scy.clone(),
data_store.qu_insert_ts_msp.clone(),
stats.clone(),
);
futs.push(fut);
}
}
#[cfg(DISABLED)]
if let Some(ts_msp_grid) = item.ts_msp_grid {
let params = (
@@ -441,5 +450,6 @@ fn prepare_query_insert_futs(
.await?;
stats.inserts_msp_grid().inc();
}
futs
}
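On the insert-worker side: per-worker concurrency is now an explicit setting instead of insert_worker_count * 3, so with the new defaults (4 workers, concurrency 32) the pool allows at most 128 in-flight inserts, where the old defaults gave 800 workers with buffer_unordered(2400) each. The sampling guard parses as (item_ts_local & 0x3f00000) < 0x0a00000 since & binds tighter than < in Rust: it keeps items whose bits 20..=25 of the local timestamp fall in 0..=9, i.e. roughly 10 of every 64 windows of 2^20 ticks, but the leading true || short-circuits it, so for now every item is still inserted.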

View File

@@ -713,3 +713,37 @@ pub async fn insert_channel_status(
.await?;
Ok(())
}
pub struct InsertFut2 {
data_store: Arc<DataStore>,
stats: Arc<InsertWorkerStats>,
kind: InsertFutKind,
}
impl Future for InsertFut2 {
type Output = Result<(), Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
use Poll::*;
todo!()
}
}
pub enum InsertFutKind {
Value,
}
pub struct InsertItemFut {
data_store: Arc<DataStore>,
stats: Arc<InsertWorkerStats>,
item: InsertItem,
}
impl Future for InsertItemFut {
type Output = Result<(), Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
use Poll::*;
todo!()
}
}

View File

@@ -8,6 +8,7 @@ edition = "2021"
path = "src/stats.rs"
[dependencies]
rand_xoshiro = "0.6.0"
stats_types = { path = "../stats_types" }
stats_proc = { path = "../stats_proc" }
log = { path = "../log" }

View File

@@ -1,3 +1,5 @@
pub use rand_xoshiro;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::time::Duration;
@@ -367,6 +369,7 @@ stats_proc::stats_struct!((
stats_proc::stats_struct!((
stats_struct(
name(CaConnStats),
prefix(caconn),
counters(
insert_item_create,
inserts_val,
@@ -383,35 +386,34 @@ stats_proc::stats_struct!((
// TODO maybe rename: this is now only the recv of the intermediate queue:
store_worker_item_recv,
// TODO rename to make clear that this drop is voluntary because of user config choice:
store_worker_fraction_drop,
store_worker_ratelimit_drop,
store_worker_insert_done,
store_worker_insert_binned_done,
store_worker_insert_overload,
store_worker_insert_timeout,
store_worker_insert_unavailable,
store_worker_insert_error,
// store_worker_fraction_drop,
// store_worker_ratelimit_drop,
// store_worker_insert_done,
// store_worker_insert_binned_done,
// store_worker_insert_overload,
// store_worker_insert_timeout,
// store_worker_insert_unavailable,
// store_worker_insert_error,
connection_status_insert_done,
channel_status_insert_done,
channel_info_insert_done,
ivl_insert_done,
mute_insert_done,
caconn_poll_count,
caconn_loop1_count,
caconn_loop2_count,
caconn_loop3_count,
caconn_loop4_count,
caconn_command_can_not_reply,
caconn_recv_data,
poll_count,
loop1_count,
loop2_count,
loop3_count,
loop4_count,
command_can_not_reply,
time_handle_conn_listen,
time_handle_peer_ready,
time_check_channels_state_init,
time_handle_event_add_res,
tcp_connected,
get_series_id_ok,
conn_item_count,
conn_stream_ready,
conn_stream_pending,
item_count,
stream_ready,
stream_pending,
channel_all_count,
channel_alive_count,
channel_not_alive_count,
@@ -419,11 +421,17 @@ stats_proc::stats_struct!((
ping_start,
ping_no_proto,
pong_timeout,
ca_conn_poll_fn_begin,
ca_conn_poll_loop_begin,
ca_conn_poll_reloop,
ca_conn_poll_pending,
ca_conn_poll_no_progress_no_pending,
poll_fn_begin,
poll_loop_begin,
poll_reloop,
poll_pending,
poll_no_progress_no_pending,
storage_queue_send,
storage_queue_pending,
storage_queue_above_8,
storage_queue_above_32,
storage_queue_above_128,
event_add_res_recv,
),
values(inter_ivl_ema),
histolog2s(pong_recv_lat, ca_ts_off,),
@@ -491,3 +499,17 @@ fn test0_diff() {
let diff = TestStats0Diff::diff_from(&stats_a, &stats_b);
assert_eq!(diff.count0.load(), 3);
}
pub fn xoshiro_from_time() -> rand_xoshiro::Xoshiro128StarStar {
use rand_xoshiro::rand_core::SeedableRng;
use std::time::SystemTime;
let a = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.subsec_nanos() as u64;
let b = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.subsec_nanos() as u64;
rand_xoshiro::Xoshiro128StarStar::seed_from_u64(a << 32 ^ b)
}
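A minimal usage sketch of the new seeding helper, relying only on what the diff shows (the rand_xoshiro re-export and xoshiro_from_time):

use stats::rand_xoshiro::rand_core::RngCore;

fn jittered_divisor() -> u32 {
    // Seeded from the wall clock's sub-second nanoseconds; cheap and
    // non-cryptographic, which is fine for spreading out ping intervals.
    let mut rng = stats::xoshiro_from_time();
    70 + (rng.next_u32() % 60)
}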