WIP factor conn

This commit is contained in:
Dominik Werder
2025-02-06 14:57:18 +01:00
parent cca3d77af9
commit 67a0d1f3ea
10 changed files with 232 additions and 42 deletions

View File

@@ -1,6 +1,6 @@
[package]
name = "daqingest"
version = "0.2.6-aa.0"
version = "0.2.7-aa.0"
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
edition = "2021"
@@ -24,6 +24,7 @@ err = { path = "../../daqbuf-err", package = "daqbuf-err" }
netpod = { path = "../../daqbuf-netpod", package = "daqbuf-netpod" }
taskrun = { path = "../../daqbuffer/crates/taskrun" }
series = { path = "../../daqbuf-series", package = "daqbuf-series" }
channeltools = { path = "../../daqbuf-channeltools", package = "daqbuf-channeltools" }
log = { path = "../log" }
stats = { path = "../stats" }
scywr = { path = "../scywr" }

View File

@@ -2,6 +2,7 @@ pub mod inserthook;
use async_channel::Receiver;
use async_channel::Sender;
use channeltools::channel_combine_ab::ChannelCombineAB;
use dbpg::seriesbychannel::ChannelInfoQuery;
use err::Error;
use log::*;
@@ -135,17 +136,20 @@ impl Daemon {
let (st_rf1_tx, st_rf1_rx) = async_channel::bounded(ingest_opts.insert_item_queue_cap());
let (mt_rf3_tx, mt_rf3_rx) = async_channel::bounded(ingest_opts.insert_item_queue_cap());
let (lt_rf3_tx, lt_rf3_rx) = async_channel::bounded(ingest_opts.insert_item_queue_cap());
let (lt_rf3_lat5_tx, lt_rf3_lat5_rx) = async_channel::bounded(ingest_opts.insert_item_queue_cap());
let iqtx = InsertQueuesTx {
st_rf3_tx,
st_rf1_tx,
mt_rf3_tx,
lt_rf3_tx,
lt_rf3_lat5_tx,
};
let iqrx = InsertQueuesRx {
st_rf3_rx,
st_rf1_rx,
mt_rf3_rx,
lt_rf3_rx,
lt_rf3_lat5_rx,
};
(iqtx, iqrx)
};
@@ -251,13 +255,15 @@ impl Daemon {
.map_err(Error::from_string)?;
insert_worker_jhs.extend(jh);
let lt_rx_combined = ChannelCombineAB::new(iqrx.lt_rf3_rx, iqrx.lt_rf3_lat5_rx);
let jh = scywr::insertworker::spawn_scylla_insert_workers(
RetentionTime::Long,
opts.scyconf_lt.clone(),
ingest_opts.insert_scylla_sessions(),
ingest_opts.insert_worker_count().min(2),
ingest_opts.insert_worker_concurrency().min(8),
iqrx.lt_rf3_rx,
lt_rx_combined,
insert_worker_opts.clone(),
insert_worker_stats.clone(),
ingest_opts.use_rate_limit_queue(),

View File

@@ -83,7 +83,7 @@ use tokio::net::TcpStream;
const CONNECTING_TIMEOUT: Duration = Duration::from_millis(1000 * 6);
const CHANNEL_STATUS_EMIT_IVL: Duration = Duration::from_millis(1000 * 8);
const IOC_PING_IVL: Duration = Duration::from_millis(1000 * 120);
const IOC_PING_IVL: Duration = Duration::from_millis(1000 * 180);
const MONITOR_POLL_TIMEOUT: Duration = Duration::from_millis(1000 * 6);
const TIMEOUT_CHANNEL_CLOSING: Duration = Duration::from_millis(1000 * 8);
const TIMEOUT_PONG_WAIT: Duration = Duration::from_millis(1000 * 10);
@@ -2084,16 +2084,27 @@ impl CaConn {
} else {
self.stats.recv_read_notify_state_read_pending.inc();
}
self.read_ioids.remove(&ioid);
// Whether this read-notify corresponds to a read we actually issued:
// true iff the ioid was tracked in `read_ioids` (the entry is removed
// in either case). Replaces the non-idiomatic
// `if let Some(cid) … { true } else { false }` with an unused `cid` binding.
let read_expected = self.read_ioids.remove(&ioid).is_some();
st2.mon2state = Monitoring2State::Passive(Monitoring2PassiveState {
tsbeg: tsnow,
ts_silence_read_next: tsnow + Self::silence_read_next_ivl_rng(&mut self.rng),
});
{
if read_expected {
let item = ChannelStatusItem {
ts: self.tmp_ts_poll,
cssid: st.channel.cssid.clone(),
status: ChannelStatus::MonitoringSilenceReadUnchanged,
status: ChannelStatus::MonitoringReadResultExpected,
};
ch_wrst.emit_channel_status_item(item, &mut self.iqdqs.st_rf3_qu)?;
} else {
let item = ChannelStatusItem {
ts: self.tmp_ts_poll,
cssid: st.channel.cssid.clone(),
status: ChannelStatus::MonitoringReadResultUnexpected,
};
ch_wrst.emit_channel_status_item(item, &mut self.iqdqs.st_rf3_qu)?;
}
@@ -2103,6 +2114,7 @@ impl CaConn {
// But there is still a small chance that the monitor update will just be received slightly later.
// More involved check would be to raise a flag, wait for the expected monitor for some
// timeout, and if we get nothing error out.
// TODO read-result-after-monitor-silence
if false {
Self::read_notify_res_for_write(
ev,
@@ -2204,14 +2216,17 @@ impl CaConn {
// TODO should attach these counters already to Writable state.
if crst.ts_recv_value_status_emit_next <= tsnow {
crst.ts_recv_value_status_emit_next = tsnow + Self::recv_value_status_emit_ivl_rng(rng);
let item = ChannelStatusItem {
ts: stnow,
cssid: crst.cssid,
status: ChannelStatus::MonitoringSilenceReadUnchanged,
};
let deque = &mut iqdqs.st_rf3_qu;
if wrst.emit_channel_status_item(item, deque).is_err() {
stats.logic_error().inc();
// TODO was only for debugging
if false {
let item = ChannelStatusItem {
ts: stnow,
cssid: crst.cssid,
status: ChannelStatus::MonitoringSilenceReadUnchanged,
};
let deque = &mut iqdqs.st_rf3_qu;
if wrst.emit_channel_status_item(item, deque).is_err() {
stats.logic_error().inc();
}
}
}
let tsev_local = TsNano::from_system_time(stnow);
@@ -3302,6 +3317,9 @@ macro_rules! flush_queue {
($self:expr, $qu:ident, $sp:ident, $batcher:expr, $loop_max:expr, $have:expr, $id:expr, $cx:expr, $stats:expr) => {
let obj = $self.as_mut().get_mut();
let qu = &mut obj.$qu;
if qu.len() < qu.capacity() * 4 / 10 {
qu.shrink_to(qu.capacity() * 7 / 10);
}
let sp = obj.$sp.as_mut();
match Self::attempt_flush_queue(qu, sp, $batcher, $loop_max, $cx, $id, $stats) {
Ok(Ready(Some(()))) => {
@@ -3465,6 +3483,22 @@ impl Stream for CaConn {
cx,
stats_fn
);
let stats2 = self.stats.clone();
let stats_fn = move |item: &VecDeque<QueryItem>| {
stats2.iiq_batch_len().ingest(item.len() as u32);
};
flush_queue_dqs!(
self,
lt_rf3_lat5_qu,
lt_rf3_lat5_sp_pin,
send_batched::<256, _>,
32,
(&mut have_progress, &mut have_pending),
"lt_rf3_lat5_rx",
cx,
stats_fn
);
}
let lts3 = Instant::now();

View File

@@ -3,3 +3,4 @@ mod channelstateinfo;
mod conn;
mod conncmd;
mod connevent;
mod progpend;

View File

@@ -1,9 +1,13 @@
mod connecting;
use super::conncmd::ConnCommand;
use super::connevent::CaConnEvent;
use super::connevent::EndOfStreamReason;
use crate::ca::conn::CaConnOpts;
use crate::ca::conn2::progpend::HaveProgressPending;
use async_channel::Sender;
use ca_proto::ca::proto;
use connecting::Connecting;
use dbpg::seriesbychannel::ChannelInfoQuery;
use futures_util::Future;
use futures_util::FutureExt;
@@ -19,6 +23,7 @@ use stats::rand_xoshiro::Xoshiro128PlusPlus;
use stats::CaConnStats;
use stats::CaProtoStats;
use std::collections::VecDeque;
use std::fmt;
use std::net::SocketAddrV4;
use std::pin::Pin;
use std::sync::Arc;
@@ -29,6 +34,13 @@ use std::time::Instant;
use taskrun::tokio;
use tokio::net::TcpStream;
autoerr::create_error_v1!(
name(Error, "Conn"),
enum variants {
TickerPoll,
},
);
struct DurationMeasureSteps {
ts: Instant,
durs: smallvec::SmallVec<[Duration; 8]>,
@@ -50,22 +62,15 @@ impl DurationMeasureSteps {
}
}
#[derive(Debug)]
pub enum Error {
TickerPoll,
}
type ConnectingFut =
Pin<Box<dyn Future<Output = Result<Result<TcpStream, std::io::Error>, tokio::time::error::Elapsed>> + Send>>;
enum ConnectedState {
Init(CaProto),
Handshake(CaProto),
PeerReady(CaProto),
}
#[derive(Debug)]
enum CaConnState {
Connecting(Instant, SocketAddrV4, ConnectingFut),
Connecting(Connecting),
Connected(CaProto),
Shutdown(EndOfStreamReason),
Done,
@@ -95,11 +100,11 @@ impl CaConn {
) -> Self {
let tsnow = Instant::now();
let (cq_tx, cq_rx) = async_channel::bounded::<ConnCommand>(32);
let mut rng = stats::xoshiro_from_time();
let rng = stats::xoshiro_from_time();
Self {
opts,
backend,
state: CaConnState::Connecting(tsnow, remote_addr, err::todoval()),
state: CaConnState::Connecting(Connecting::dummy_new(remote_addr, tsnow)),
iqdqs: InsertDeques::new(),
ca_conn_event_out_queue: VecDeque::new(),
ca_conn_event_out_queue_max: 2000,
@@ -120,6 +125,29 @@ impl CaConn {
}
}
// Fold the result of polling a sub-state future ($res, a
// `Poll<Result<Option<_>, _>>`) into the loop-level tracker ($hpp,
// a `HaveProgressPending`):
// - Ready(Ok(Some(_))) -> records progress
// - Ready(Ok(None))    -> recorded as neither progress nor pending
// - Ready(Err(e))      -> only logged for now; state transition is TODO
// - Pending            -> records pending
macro_rules! handle_poll_res {
    ($res:expr, $hpp:expr) => {
        match $res {
            Ready(x) => match x {
                Ok(x) => match x {
                    // NOTE(review): inner binding `x` is unused here (compiler warning).
                    Some(x) => {
                        $hpp.have_progress();
                    }
                    None => {}
                },
                Err(e) => {
                    // TODO how to handle error:
                    // Transition state, emit item.
                    error!("{}", e);
                }
            },
            Pending => {
                $hpp.have_pending();
            }
        }
    };
}
impl Stream for CaConn {
type Item = CaConnEvent;
@@ -129,17 +157,14 @@ impl Stream for CaConn {
self.stats.poll_fn_begin().inc();
let ret = loop {
self.stats.poll_loop_begin().inc();
let qlen = self.iqdqs.len();
if qlen >= self.opts.insert_queue_max * 2 / 3 {
self.stats.insert_item_queue_pressure().inc();
} else if qlen >= self.opts.insert_queue_max {
self.stats.insert_item_queue_full().inc();
}
let mut have_pending = false;
let mut have_progress = false;
let mut hppv = HaveProgressPending::new();
let hpp = &mut hppv;
if let CaConnState::Done = self.state {
break Ready(None);
} else if let Some(item) = self.ca_conn_event_out_queue.pop_front() {
@@ -149,10 +174,10 @@ impl Stream for CaConn {
// TODO add up duration of this scope
match self.as_mut().poll_own_ticker(cx) {
Ok(Ready(())) => {
have_progress = true;
hpp.have_progress();
}
Ok(Pending) => {
have_pending = true;
hpp.have_pending();
}
Err(e) => {
self.shutdown_on_error(e);
@@ -282,8 +307,8 @@ impl Stream for CaConn {
// }
// }
break match self.state {
CaConnState::Connecting(_, _, _) => todo!(),
match &mut self.state {
CaConnState::Connecting(st2) => handle_poll_res!(st2.poll_unpin(cx), hpp),
CaConnState::Connected(_) => todo!(),
CaConnState::Shutdown(_) => {
// TODO still attempt to flush queues.
@@ -291,7 +316,7 @@ impl Stream for CaConn {
todo!()
}
CaConnState::Done => todo!(),
};
}
// break if self.is_shutdown() {
// if self.queues_out_flushed() {
@@ -340,6 +365,14 @@ impl Stream for CaConn {
// Ready(Some(CaConnEvent::err_now(e)))
// }
// };
break if hpp.is_progress() {
continue;
} else if hpp.is_pending() {
Pending
} else {
Ready(None)
};
};
durs.step();

View File

@@ -0,0 +1,53 @@
use std::fmt;
use std::future::Future;
use std::net::SocketAddrV4;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
use std::time::Instant;
use taskrun::tokio;
use tokio::net::TcpStream;
autoerr::create_error_v1!(
name(Error, "Connecting"),
enum variants {
Logic,
},
);
type ConnectingFut =
Pin<Box<dyn Future<Output = Result<Result<TcpStream, std::io::Error>, tokio::time::error::Elapsed>> + Send>>;
/// State of an in-progress TCP connect attempt to a peer, with timing metadata.
pub struct Connecting {
    // Instant when this connect attempt began.
    tsbeg: Instant,
    // Peer address this attempt targets.
    addr: SocketAddrV4,
    // Boxed, timeout-wrapped connect future (see `ConnectingFut` alias).
    fut: ConnectingFut,
}
impl Connecting {
    /// Construct a placeholder `Connecting` carrying only address and start time.
    ///
    /// NOTE(review): `fut` is initialized with `err::todoval()`, a todo
    /// placeholder — presumably it panics when evaluated. Confirm this state
    /// is never actually polled before a real connect future is installed.
    pub fn dummy_new(remote_addr: SocketAddrV4, tsnow: Instant) -> Self {
        Self {
            tsbeg: tsnow,
            addr: remote_addr,
            fut: err::todoval(),
        }
    }

    /// Poll the connect attempt.
    ///
    /// Currently a stub: always yields `Ready(Err(Error::Logic))` via the `?`
    /// operator on `Poll<Result<..>>`; `self` and `cx` are not yet used.
    pub fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<Option<()>, Error>> {
        let x = Err(Error::Logic);
        x?
    }

    /// Convenience wrapper: pin `self` and delegate to [`Self::poll`].
    pub fn poll_unpin(&mut self, cx: &mut Context) -> Poll<Result<Option<()>, Error>> {
        Pin::new(self).poll(cx)
    }
}
// Manual Debug impl: the boxed connect future `fut` is not Debug, so only
// the timing and address fields are rendered.
impl fmt::Debug for Connecting {
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        fmt.debug_struct("Connecting")
            .field("tsbeg", &self.tsbeg)
            .field("addr", &self.addr)
            .finish()
    }
}

View File

@@ -0,0 +1,29 @@
/// Records two observations made while driving a poll loop over several
/// sub-futures: whether any of them made progress, and whether any
/// returned `Pending`. Both flags are sticky for the lifetime of the value.
pub struct HaveProgressPending {
    // Set once any sub-future reports progress; never cleared.
    progress: bool,
    // Set once any sub-future reports Pending; never cleared.
    pending: bool,
}

impl HaveProgressPending {
    /// Fresh tracker with neither observation recorded yet.
    pub fn new() -> Self {
        Self {
            progress: false,
            pending: false,
        }
    }

    /// Note that some sub-future made progress.
    pub fn have_progress(&mut self) {
        self.progress = true;
    }

    /// Note that some sub-future returned `Pending`.
    pub fn have_pending(&mut self) {
        self.pending = true;
    }

    /// Whether progress was recorded at least once.
    pub fn is_progress(&self) -> bool {
        self.progress
    }

    /// Whether a pending sub-future was recorded at least once.
    pub fn is_pending(&self) -> bool {
        self.pending
    }
}

View File

@@ -107,7 +107,7 @@ impl CaIngestOpts {
}
pub fn insert_item_queue_cap(&self) -> usize {
self.insert_item_queue_cap.unwrap_or(1000 * 1000)
self.insert_item_queue_cap.unwrap_or(1000 * 1000) * 2
}
pub fn store_workers_rate(&self) -> u64 {

View File

@@ -24,13 +24,13 @@ pub struct InsertQueuesTx {
pub st_rf3_tx: Sender<VecDeque<QueryItem>>,
pub mt_rf3_tx: Sender<VecDeque<QueryItem>>,
pub lt_rf3_tx: Sender<VecDeque<QueryItem>>,
pub lt_rf3_lat5_tx: Sender<VecDeque<QueryItem>>,
}
impl InsertQueuesTx {
/// Send all accumulated batches
pub async fn send_all(&mut self, iqdqs: &mut InsertDeques) -> Result<(), Error> {
// Send each buffer down the corresponding channel
if false {
{
let item = core::mem::replace(&mut iqdqs.st_rf1_qu, VecDeque::new());
self.st_rf1_tx
.send(item)
@@ -58,6 +58,13 @@ impl InsertQueuesTx {
.await
.map_err(|_| Error::ChannelSend(RetentionTime::Long, 3))?;
}
{
    // Drain the long-term lat5 queue and send it down its OWN channel.
    // BUG FIX: this block previously sent on `self.lt_rf3_tx` (copy-paste
    // from the lt_rf3 block above), so the `lt_rf3_lat5_rx` side — which
    // the daemon combines with `lt_rf3_rx` via ChannelCombineAB — would
    // never receive anything, while lt_rf3 got doubled traffic.
    let item = core::mem::replace(&mut iqdqs.lt_rf3_lat5_qu, VecDeque::new());
    self.lt_rf3_lat5_tx
        .send(item)
        .await
        .map_err(|_| Error::ChannelSend(RetentionTime::Long, 3))?;
}
Ok(())
}
@@ -79,7 +86,7 @@ impl<'a> fmt::Display for InsertQueuesTxSummary<'a> {
let obj = self.obj;
write!(
fmt,
"InsertQueuesTx {{ st_rf1_tx: {} {} {}, st_rf3_tx: {} {} {}, mt_rf3_tx: {} {} {}, lt_rf3_tx: {} {} {} }}",
"InsertQueuesTx {{ st_rf1_tx: {} {} {}, st_rf3_tx: {} {} {}, mt_rf3_tx: {} {} {}, lt_rf3_tx: {} {} {}, lt_rf3_lat5_tx: {} {} {} }}",
obj.st_rf1_tx.is_closed(),
obj.st_rf1_tx.is_full(),
obj.st_rf1_tx.len(),
@@ -92,6 +99,9 @@ impl<'a> fmt::Display for InsertQueuesTxSummary<'a> {
obj.lt_rf3_tx.is_closed(),
obj.lt_rf3_tx.is_full(),
obj.lt_rf3_tx.len(),
obj.lt_rf3_lat5_tx.is_closed(),
obj.lt_rf3_lat5_tx.is_full(),
obj.lt_rf3_lat5_tx.len(),
)
}
}
@@ -102,6 +112,7 @@ pub struct InsertQueuesRx {
pub st_rf3_rx: Receiver<VecDeque<QueryItem>>,
pub mt_rf3_rx: Receiver<VecDeque<QueryItem>>,
pub lt_rf3_rx: Receiver<VecDeque<QueryItem>>,
pub lt_rf3_lat5_rx: Receiver<VecDeque<QueryItem>>,
}
pub struct InsertDeques {
@@ -109,6 +120,7 @@ pub struct InsertDeques {
pub st_rf3_qu: VecDeque<QueryItem>,
pub mt_rf3_qu: VecDeque<QueryItem>,
pub lt_rf3_qu: VecDeque<QueryItem>,
pub lt_rf3_lat5_qu: VecDeque<QueryItem>,
}
impl InsertDeques {
@@ -118,12 +130,17 @@ impl InsertDeques {
st_rf3_qu: VecDeque::new(),
mt_rf3_qu: VecDeque::new(),
lt_rf3_qu: VecDeque::new(),
lt_rf3_lat5_qu: VecDeque::new(),
}
}
/// Total number of items cumulated over all queues.
pub fn len(&self) -> usize {
self.st_rf1_qu.len() + self.st_rf3_qu.len() + self.mt_rf3_qu.len() + self.lt_rf3_qu.len()
self.st_rf1_qu.len()
+ self.st_rf3_qu.len()
+ self.mt_rf3_qu.len()
+ self.lt_rf3_qu.len()
+ self.lt_rf3_lat5_qu.len()
}
pub fn clear(&mut self) {
@@ -131,6 +148,7 @@ impl InsertDeques {
self.st_rf3_qu.clear();
self.mt_rf3_qu.clear();
self.lt_rf3_qu.clear();
self.lt_rf3_lat5_qu.clear();
}
pub fn summary(&self) -> InsertDequesSummary {
@@ -166,6 +184,7 @@ impl InsertDeques {
&mut self.st_rf3_qu,
&mut self.mt_rf3_qu,
&mut self.lt_rf3_qu,
&mut self.lt_rf3_lat5_qu,
];
for qu in qus {
if qu.len() * 2 < qu.capacity() {
@@ -203,6 +222,8 @@ pub struct InsertSenderPolling {
pub mt_rf3_sp: SenderPolling<VecDeque<QueryItem>>,
#[pin]
pub lt_rf3_sp: SenderPolling<VecDeque<QueryItem>>,
#[pin]
pub lt_rf3_lat5_sp: SenderPolling<VecDeque<QueryItem>>,
}
impl InsertSenderPolling {
@@ -212,6 +233,7 @@ impl InsertSenderPolling {
st_rf3_sp: SenderPolling::new(iqtx.st_rf3_tx),
mt_rf3_sp: SenderPolling::new(iqtx.mt_rf3_tx),
lt_rf3_sp: SenderPolling::new(iqtx.lt_rf3_tx),
lt_rf3_lat5_sp: SenderPolling::new(iqtx.lt_rf3_lat5_tx),
}
}
@@ -235,6 +257,10 @@ impl InsertSenderPolling {
self.project().lt_rf3_sp
}
pub fn lt_rf3_lat5_sp_pin(self: Pin<&mut Self>) -> Pin<&mut SenderPolling<VecDeque<QueryItem>>> {
self.project().lt_rf3_lat5_sp
}
pub fn __st_rf1_sp_pin(self: Pin<&mut Self>) -> Pin<&mut SenderPolling<VecDeque<QueryItem>>> {
if true {
panic!("encapsulated by pin_project");
@@ -256,7 +282,7 @@ impl<'a> fmt::Display for InsertSenderPollingSummary<'a> {
let obj = self.obj;
write!(
fmt,
"InsertSenderPolling {{ st_rf1_idle_len: {:?} {:?}, st_rf3_idle_len: {:?} {:?}, mt_rf3_idle_len: {:?} {:?}, lt_rf3_idle_len: {:?} {:?} }}",
"InsertSenderPolling {{ st_rf1_idle_len: {:?} {:?}, st_rf3_idle_len: {:?} {:?}, mt_rf3_idle_len: {:?} {:?}, lt_rf3_idle_len: {:?} {:?}, lt_rf3_lat5_idle_len: {:?} {:?} }}",
obj.st_rf1_sp.is_idle(),
obj.st_rf1_sp.len(),
obj.st_rf3_sp.is_idle(),
@@ -265,6 +291,8 @@ impl<'a> fmt::Display for InsertSenderPollingSummary<'a> {
obj.mt_rf3_sp.len(),
obj.lt_rf3_sp.is_idle(),
obj.lt_rf3_sp.len(),
obj.lt_rf3_lat5_sp.is_idle(),
obj.lt_rf3_lat5_sp.len(),
)
}
}

View File

@@ -26,7 +26,8 @@ macro_rules! trace_ingest { ($($arg:tt)*) => ( if false { trace!($($arg)*); } )
macro_rules! trace_tick { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) }
macro_rules! trace_tick_verbose { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) }
macro_rules! debug_bin2 { ($t:expr, $($arg:tt)*) => ( if $t { debug!($($arg)*); } ) }
macro_rules! debug_bin2 { ($t:expr, $($arg:tt)*) => ( if true { if $t { debug!($($arg)*); } } ) }
macro_rules! trace_bin2 { ($t:expr, $($arg:tt)*) => ( if false { if $t { trace!($($arg)*); } } ) }
autoerr::create_error_v1!(
name(Error, "SerieswriterBinwriter"),
@@ -101,7 +102,11 @@ impl BinWriter {
if let Some(last) = combs.last_mut() {
if last.1 >= DtMs::from_ms_u64(1000 * 60 * 60 * 24) {
last.0 = RetentionTime::Long;
} else if last.1 >= DtMs::from_ms_u64(1000 * 60 * 60 * 1) {
last.0 = RetentionTime::Long;
combs.push((RetentionTime::Long, DtMs::from_ms_u64(1000 * 60 * 60 * 24)));
} else {
combs.push((RetentionTime::Long, DtMs::from_ms_u64(1000 * 60 * 60 * 1)));
combs.push((RetentionTime::Long, DtMs::from_ms_u64(1000 * 60 * 60 * 24)));
}
}