Group flush of insert queues
This commit is contained in:
@@ -4,7 +4,6 @@ use super::proto::ReadNotify;
|
||||
use crate::ca::proto::ChannelClose;
|
||||
use crate::ca::proto::EventCancel;
|
||||
use crate::conf::ChannelConfig;
|
||||
use crate::senderpolling::SenderPolling;
|
||||
use crate::throttletrace::ThrottleTrace;
|
||||
use async_channel::Receiver;
|
||||
use async_channel::Sender;
|
||||
@@ -31,11 +30,14 @@ use proto::CaProto;
|
||||
use proto::CreateChan;
|
||||
use proto::EventAdd;
|
||||
use scywr::insertqueues::InsertDeques;
|
||||
use scywr::insertqueues::InsertQueuesTx;
|
||||
use scywr::insertqueues::InsertSenderPolling;
|
||||
use scywr::iteminsertqueue as scywriiq;
|
||||
use scywr::iteminsertqueue::Accounting;
|
||||
use scywr::iteminsertqueue::DataValue;
|
||||
use scywr::iteminsertqueue::QueryItem;
|
||||
use scywr::iteminsertqueue::ShutdownReason;
|
||||
use scywr::senderpolling::SenderPolling;
|
||||
use scywriiq::ChannelStatus;
|
||||
use scywriiq::ChannelStatusClosedReason;
|
||||
use scywriiq::ChannelStatusItem;
|
||||
@@ -770,7 +772,7 @@ pub struct CaConn {
|
||||
ioc_ping_last: Instant,
|
||||
ioc_ping_next: Instant,
|
||||
ioc_ping_start: Option<Instant>,
|
||||
storage_insert_sender: Pin<Box<SenderPolling<VecDeque<QueryItem>>>>,
|
||||
iqsp: Pin<Box<InsertSenderPolling>>,
|
||||
ca_conn_event_out_queue: VecDeque<CaConnEvent>,
|
||||
ca_conn_event_out_queue_max: usize,
|
||||
thr_msg_poll: ThrottleTrace,
|
||||
@@ -799,7 +801,7 @@ impl CaConn {
|
||||
backend: String,
|
||||
remote_addr_dbg: SocketAddrV4,
|
||||
local_epics_hostname: String,
|
||||
storage_insert_tx: Sender<VecDeque<QueryItem>>,
|
||||
iqtx: InsertQueuesTx,
|
||||
channel_info_query_tx: Sender<ChannelInfoQuery>,
|
||||
stats: Arc<CaConnStats>,
|
||||
ca_proto_stats: Arc<CaProtoStats>,
|
||||
@@ -836,7 +838,7 @@ impl CaConn {
|
||||
ioc_ping_last: tsnow,
|
||||
ioc_ping_next: tsnow + Self::ioc_ping_ivl_rng(&mut rng),
|
||||
ioc_ping_start: None,
|
||||
storage_insert_sender: Box::pin(SenderPolling::new(storage_insert_tx)),
|
||||
iqsp: Box::pin(InsertSenderPolling::new(iqtx)),
|
||||
ca_conn_event_out_queue: VecDeque::new(),
|
||||
ca_conn_event_out_queue_max: 2000,
|
||||
thr_msg_poll: ThrottleTrace::new(Duration::from_millis(10000)),
|
||||
@@ -2398,15 +2400,16 @@ impl CaConn {
|
||||
debug!(
|
||||
"async out flushed iiq {} {} caout {}",
|
||||
self.iqdqs.len() == 0,
|
||||
self.storage_insert_sender.is_idle(),
|
||||
self.iqsp.is_idle(),
|
||||
self.ca_conn_event_out_queue.is_empty()
|
||||
);
|
||||
self.iqdqs.len() == 0 && self.storage_insert_sender.is_idle() && self.ca_conn_event_out_queue.is_empty()
|
||||
self.iqdqs.len() == 0 && self.iqsp.is_idle() && self.ca_conn_event_out_queue.is_empty()
|
||||
}
|
||||
|
||||
fn attempt_flush_queue<T, Q, FB, FS>(
|
||||
qu: &mut VecDeque<T>,
|
||||
sp: &mut Pin<Box<SenderPolling<Q>>>,
|
||||
// sp: &mut Pin<Box<SenderPolling<Q>>>,
|
||||
mut sp: Pin<&mut SenderPolling<Q>>,
|
||||
qu_to_si: FB,
|
||||
loop_max: u32,
|
||||
cx: &mut Context,
|
||||
@@ -2448,7 +2451,7 @@ impl CaConn {
|
||||
have_progress = true;
|
||||
}
|
||||
Ready(Err(e)) => {
|
||||
use crate::senderpolling::Error as SpErr;
|
||||
use scywr::senderpolling::Error as SpErr;
|
||||
match e {
|
||||
SpErr::NoSendInProgress => return Err(Error::NotSending),
|
||||
SpErr::Closed(_) => return Err(Error::ClosedSending),
|
||||
@@ -2475,7 +2478,7 @@ macro_rules! flush_queue {
|
||||
($self:expr, $qu:ident, $sp:ident, $batcher:expr, $loop_max:expr, $have:expr, $id:expr, $cx:expr, $stats:expr) => {
|
||||
let obj = $self.as_mut().get_mut();
|
||||
let qu = &mut obj.$qu;
|
||||
let sp = &mut obj.$sp;
|
||||
let sp = obj.$sp.as_mut();
|
||||
match Self::attempt_flush_queue(qu, sp, $batcher, $loop_max, $cx, $id, $stats) {
|
||||
Ok(Ready(Some(()))) => {
|
||||
*$have.0 |= true;
|
||||
@@ -2493,7 +2496,11 @@ macro_rules! flush_queue_dqs {
|
||||
($self:expr, $qu:ident, $sp:ident, $batcher:expr, $loop_max:expr, $have:expr, $id:expr, $cx:expr, $stats:expr) => {
|
||||
let obj = $self.as_mut().get_mut();
|
||||
let qu = &mut obj.iqdqs.$qu;
|
||||
let sp = &mut obj.$sp;
|
||||
// let sp = std::pin::pin!(obj.iqsp.$sp);
|
||||
// let sp = &mut obj.iqsp.$sp;
|
||||
// let sp = std::pin::pin!(sp);
|
||||
// let sp = todo!();
|
||||
let sp = obj.iqsp.as_mut().$sp();
|
||||
match Self::attempt_flush_queue(qu, sp, $batcher, $loop_max, $cx, $id, $stats) {
|
||||
Ok(Ready(Some(()))) => {
|
||||
*$have.0 |= true;
|
||||
@@ -2577,11 +2584,41 @@ impl Stream for CaConn {
|
||||
flush_queue_dqs!(
|
||||
self,
|
||||
st_rf1_rx,
|
||||
storage_insert_sender,
|
||||
st_rf1_sp_pin,
|
||||
send_batched::<256, _>,
|
||||
32,
|
||||
(&mut have_progress, &mut have_pending),
|
||||
"iq_st_rf1",
|
||||
"st_rf1_rx",
|
||||
cx,
|
||||
stats_fn
|
||||
);
|
||||
let stats2 = self.stats.clone();
|
||||
let stats_fn = move |item: &VecDeque<QueryItem>| {
|
||||
stats2.iiq_batch_len().ingest(item.len() as u32);
|
||||
};
|
||||
flush_queue_dqs!(
|
||||
self,
|
||||
st_rf3_rx,
|
||||
st_rf3_sp_pin,
|
||||
send_batched::<256, _>,
|
||||
32,
|
||||
(&mut have_progress, &mut have_pending),
|
||||
"st_rf3_rx",
|
||||
cx,
|
||||
stats_fn
|
||||
);
|
||||
let stats2 = self.stats.clone();
|
||||
let stats_fn = move |item: &VecDeque<QueryItem>| {
|
||||
stats2.iiq_batch_len().ingest(item.len() as u32);
|
||||
};
|
||||
flush_queue_dqs!(
|
||||
self,
|
||||
mt_rf3_rx,
|
||||
mt_rf3_sp_pin,
|
||||
send_batched::<256, _>,
|
||||
32,
|
||||
(&mut have_progress, &mut have_pending),
|
||||
"mt_rf3_rx",
|
||||
cx,
|
||||
stats_fn
|
||||
);
|
||||
|
||||
@@ -10,7 +10,6 @@ use crate::conf::ChannelConfig;
|
||||
use crate::daemon_common::Channel;
|
||||
use crate::errconv::ErrConv;
|
||||
use crate::rt::JoinHandle;
|
||||
use crate::senderpolling::SenderPolling;
|
||||
use crate::throttletrace::ThrottleTrace;
|
||||
use async_channel::Receiver;
|
||||
use async_channel::Sender;
|
||||
@@ -38,6 +37,7 @@ use netpod::SeriesKind;
|
||||
use netpod::Shape;
|
||||
use scywr::iteminsertqueue::ChannelStatusItem;
|
||||
use scywr::iteminsertqueue::QueryItem;
|
||||
use scywr::senderpolling::SenderPolling;
|
||||
use serde::Serialize;
|
||||
use series::ChannelStatusSeriesId;
|
||||
use serieswriter::writer::EstablishWorkerJob;
|
||||
@@ -1047,7 +1047,7 @@ impl CaConnSet {
|
||||
add.backend.clone(),
|
||||
addr_v4,
|
||||
self.local_epics_hostname.clone(),
|
||||
self.iqtx.st_rf3_tx.clone(),
|
||||
self.iqtx.clone2(),
|
||||
self.channel_info_query_tx
|
||||
.clone()
|
||||
.ok_or_else(|| Error::with_msg_no_trace("no more channel_info_query_tx available"))?,
|
||||
|
||||
@@ -7,7 +7,6 @@ pub mod metrics;
|
||||
pub mod netbuf;
|
||||
pub mod polltimer;
|
||||
pub mod rt;
|
||||
pub mod senderpolling;
|
||||
#[cfg(test)]
|
||||
pub mod test;
|
||||
pub mod throttletrace;
|
||||
|
||||
@@ -9,7 +9,7 @@ futures-util = "0.3.28"
|
||||
async-channel = "2.0.0"
|
||||
scylla = "0.11.0"
|
||||
smallvec = "1.11.0"
|
||||
pin-project = "1.1.3"
|
||||
pin-project = "1.1.5"
|
||||
stackfuture = "0.3.0"
|
||||
bytes = "1.5.0"
|
||||
serde = { version = "1", features = ["derive"] }
|
||||
|
||||
@@ -1,7 +1,10 @@
|
||||
use crate::iteminsertqueue::QueryItem;
|
||||
use crate::senderpolling::SenderPolling;
|
||||
use async_channel::Receiver;
|
||||
use async_channel::Sender;
|
||||
use pin_project::pin_project;
|
||||
use std::collections::VecDeque;
|
||||
use std::pin::Pin;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct InsertQueuesTx {
|
||||
@@ -25,6 +28,10 @@ impl InsertQueuesTx {
|
||||
self.lt_rf3_tx.send(item).await.map_err(|_| ())?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn clone2(&self) -> Self {
|
||||
self.clone()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
@@ -65,3 +72,44 @@ impl InsertDeques {
|
||||
self.lt_rf3_rx.clear();
|
||||
}
|
||||
}
|
||||
|
||||
#[pin_project]
|
||||
pub struct InsertSenderPolling {
|
||||
#[pin]
|
||||
pub st_rf1_sp: SenderPolling<VecDeque<QueryItem>>,
|
||||
#[pin]
|
||||
pub st_rf3_sp: SenderPolling<VecDeque<QueryItem>>,
|
||||
#[pin]
|
||||
pub mt_rf3_sp: SenderPolling<VecDeque<QueryItem>>,
|
||||
}
|
||||
|
||||
impl InsertSenderPolling {
|
||||
pub fn new(iqtx: InsertQueuesTx) -> Self {
|
||||
Self {
|
||||
st_rf1_sp: SenderPolling::new(iqtx.st_rf1_tx),
|
||||
st_rf3_sp: SenderPolling::new(iqtx.st_rf3_tx),
|
||||
mt_rf3_sp: SenderPolling::new(iqtx.mt_rf3_tx),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn is_idle(&self) -> bool {
|
||||
self.st_rf1_sp.is_idle() && self.st_rf3_sp.is_idle() && self.mt_rf3_sp.is_idle()
|
||||
}
|
||||
|
||||
pub fn st_rf1_sp_pin(self: Pin<&mut Self>) -> Pin<&mut SenderPolling<VecDeque<QueryItem>>> {
|
||||
// unsafe {
|
||||
// let this = self.get_unchecked_mut();
|
||||
// let pp1 = &mut this.st_rf1_sp;
|
||||
// Pin::new_unchecked(pp1)
|
||||
// }
|
||||
self.project().st_rf1_sp
|
||||
}
|
||||
|
||||
pub fn st_rf3_sp_pin(self: Pin<&mut Self>) -> Pin<&mut SenderPolling<VecDeque<QueryItem>>> {
|
||||
self.project().st_rf3_sp
|
||||
}
|
||||
|
||||
pub fn mt_rf3_sp_pin(self: Pin<&mut Self>) -> Pin<&mut SenderPolling<VecDeque<QueryItem>>> {
|
||||
self.project().mt_rf3_sp
|
||||
}
|
||||
}
|
||||
|
||||
@@ -11,6 +11,7 @@ pub mod insertworker;
|
||||
pub mod iteminsertqueue;
|
||||
pub mod ratelimit;
|
||||
pub mod schema;
|
||||
pub mod senderpolling;
|
||||
pub mod session;
|
||||
pub mod store;
|
||||
pub mod tools;
|
||||
|
||||
Reference in New Issue
Block a user