diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index 85ed6b1..755d1bf 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -4,7 +4,6 @@ use super::proto::ReadNotify; use crate::ca::proto::ChannelClose; use crate::ca::proto::EventCancel; use crate::conf::ChannelConfig; -use crate::senderpolling::SenderPolling; use crate::throttletrace::ThrottleTrace; use async_channel::Receiver; use async_channel::Sender; @@ -31,11 +30,14 @@ use proto::CaProto; use proto::CreateChan; use proto::EventAdd; use scywr::insertqueues::InsertDeques; +use scywr::insertqueues::InsertQueuesTx; +use scywr::insertqueues::InsertSenderPolling; use scywr::iteminsertqueue as scywriiq; use scywr::iteminsertqueue::Accounting; use scywr::iteminsertqueue::DataValue; use scywr::iteminsertqueue::QueryItem; use scywr::iteminsertqueue::ShutdownReason; +use scywr::senderpolling::SenderPolling; use scywriiq::ChannelStatus; use scywriiq::ChannelStatusClosedReason; use scywriiq::ChannelStatusItem; @@ -770,7 +772,7 @@ pub struct CaConn { ioc_ping_last: Instant, ioc_ping_next: Instant, ioc_ping_start: Option, - storage_insert_sender: Pin>>>, + iqsp: Pin>, ca_conn_event_out_queue: VecDeque, ca_conn_event_out_queue_max: usize, thr_msg_poll: ThrottleTrace, @@ -799,7 +801,7 @@ impl CaConn { backend: String, remote_addr_dbg: SocketAddrV4, local_epics_hostname: String, - storage_insert_tx: Sender>, + iqtx: InsertQueuesTx, channel_info_query_tx: Sender, stats: Arc, ca_proto_stats: Arc, @@ -836,7 +838,7 @@ impl CaConn { ioc_ping_last: tsnow, ioc_ping_next: tsnow + Self::ioc_ping_ivl_rng(&mut rng), ioc_ping_start: None, - storage_insert_sender: Box::pin(SenderPolling::new(storage_insert_tx)), + iqsp: Box::pin(InsertSenderPolling::new(iqtx)), ca_conn_event_out_queue: VecDeque::new(), ca_conn_event_out_queue_max: 2000, thr_msg_poll: ThrottleTrace::new(Duration::from_millis(10000)), @@ -2398,15 +2400,16 @@ impl CaConn { debug!( "async out flushed iiq {} {} caout {}", self.iqdqs.len() == 0, - self.storage_insert_sender.is_idle(), + self.iqsp.is_idle(), self.ca_conn_event_out_queue.is_empty() ); - self.iqdqs.len() == 0 && self.storage_insert_sender.is_idle() && self.ca_conn_event_out_queue.is_empty() + self.iqdqs.len() == 0 && self.iqsp.is_idle() && self.ca_conn_event_out_queue.is_empty() } fn attempt_flush_queue( qu: &mut VecDeque, - sp: &mut Pin>>, + // sp: &mut Pin>>, + mut sp: Pin<&mut SenderPolling>, qu_to_si: FB, loop_max: u32, cx: &mut Context, @@ -2448,7 +2451,7 @@ impl CaConn { have_progress = true; } Ready(Err(e)) => { - use crate::senderpolling::Error as SpErr; + use scywr::senderpolling::Error as SpErr; match e { SpErr::NoSendInProgress => return Err(Error::NotSending), SpErr::Closed(_) => return Err(Error::ClosedSending), @@ -2475,7 +2478,7 @@ macro_rules! flush_queue { ($self:expr, $qu:ident, $sp:ident, $batcher:expr, $loop_max:expr, $have:expr, $id:expr, $cx:expr, $stats:expr) => { let obj = $self.as_mut().get_mut(); let qu = &mut obj.$qu; - let sp = &mut obj.$sp; + let sp = obj.$sp.as_mut(); match Self::attempt_flush_queue(qu, sp, $batcher, $loop_max, $cx, $id, $stats) { Ok(Ready(Some(()))) => { *$have.0 |= true; @@ -2493,7 +2496,11 @@ macro_rules! flush_queue_dqs { ($self:expr, $qu:ident, $sp:ident, $batcher:expr, $loop_max:expr, $have:expr, $id:expr, $cx:expr, $stats:expr) => { let obj = $self.as_mut().get_mut(); let qu = &mut obj.iqdqs.$qu; - let sp = &mut obj.$sp; + // let sp = std::pin::pin!(obj.iqsp.$sp); + // let sp = &mut obj.iqsp.$sp; + // let sp = std::pin::pin!(sp); + // let sp = todo!(); + let sp = obj.iqsp.as_mut().$sp(); match Self::attempt_flush_queue(qu, sp, $batcher, $loop_max, $cx, $id, $stats) { Ok(Ready(Some(()))) => { *$have.0 |= true; @@ -2577,11 +2584,41 @@ impl Stream for CaConn { flush_queue_dqs!( self, st_rf1_rx, - storage_insert_sender, + st_rf1_sp_pin, send_batched::<256, _>, 32, (&mut have_progress, &mut have_pending), - "iq_st_rf1", + "st_rf1_rx", + cx, + stats_fn + ); + let stats2 = self.stats.clone(); + let stats_fn = move |item: &VecDeque| { + stats2.iiq_batch_len().ingest(item.len() as u32); + }; + flush_queue_dqs!( + self, + st_rf3_rx, + st_rf3_sp_pin, + send_batched::<256, _>, + 32, + (&mut have_progress, &mut have_pending), + "st_rf3_rx", + cx, + stats_fn + ); + let stats2 = self.stats.clone(); + let stats_fn = move |item: &VecDeque| { + stats2.iiq_batch_len().ingest(item.len() as u32); + }; + flush_queue_dqs!( + self, + mt_rf3_rx, + mt_rf3_sp_pin, + send_batched::<256, _>, + 32, + (&mut have_progress, &mut have_pending), + "mt_rf3_rx", cx, stats_fn ); diff --git a/netfetch/src/ca/connset.rs b/netfetch/src/ca/connset.rs index c470587..67054ac 100644 --- a/netfetch/src/ca/connset.rs +++ b/netfetch/src/ca/connset.rs @@ -10,7 +10,6 @@ use crate::conf::ChannelConfig; use crate::daemon_common::Channel; use crate::errconv::ErrConv; use crate::rt::JoinHandle; -use crate::senderpolling::SenderPolling; use crate::throttletrace::ThrottleTrace; use async_channel::Receiver; use async_channel::Sender; @@ -38,6 +37,7 @@ use netpod::SeriesKind; use netpod::Shape; use scywr::iteminsertqueue::ChannelStatusItem; use scywr::iteminsertqueue::QueryItem; +use scywr::senderpolling::SenderPolling; use serde::Serialize; use series::ChannelStatusSeriesId; use serieswriter::writer::EstablishWorkerJob; @@ -1047,7 +1047,7 @@ impl CaConnSet { add.backend.clone(), addr_v4, self.local_epics_hostname.clone(), - self.iqtx.st_rf3_tx.clone(), + self.iqtx.clone2(), self.channel_info_query_tx .clone() .ok_or_else(|| Error::with_msg_no_trace("no more channel_info_query_tx available"))?, diff --git a/netfetch/src/lib.rs b/netfetch/src/lib.rs index e948d4d..01a6d9f 100644 --- a/netfetch/src/lib.rs +++ b/netfetch/src/lib.rs @@ -7,7 +7,6 @@ pub mod metrics; pub mod netbuf; pub mod polltimer; pub mod rt; -pub mod senderpolling; #[cfg(test)] pub mod test; pub mod throttletrace; diff --git a/scywr/Cargo.toml b/scywr/Cargo.toml index 748eed1..b4a551b 100644 --- a/scywr/Cargo.toml +++ b/scywr/Cargo.toml @@ -9,7 +9,7 @@ futures-util = "0.3.28" async-channel = "2.0.0" scylla = "0.11.0" smallvec = "1.11.0" -pin-project = "1.1.3" +pin-project = "1.1.5" stackfuture = "0.3.0" bytes = "1.5.0" serde = { version = "1", features = ["derive"] } diff --git a/scywr/src/insertqueues.rs b/scywr/src/insertqueues.rs index 07c336e..0aab5de 100644 --- a/scywr/src/insertqueues.rs +++ b/scywr/src/insertqueues.rs @@ -1,7 +1,10 @@ use crate::iteminsertqueue::QueryItem; +use crate::senderpolling::SenderPolling; use async_channel::Receiver; use async_channel::Sender; +use pin_project::pin_project; use std::collections::VecDeque; +use std::pin::Pin; #[derive(Clone)] pub struct InsertQueuesTx { @@ -25,6 +28,10 @@ impl InsertQueuesTx { self.lt_rf3_tx.send(item).await.map_err(|_| ())?; Ok(()) } + + pub fn clone2(&self) -> Self { + self.clone() + } } #[derive(Clone)] @@ -65,3 +72,44 @@ impl InsertDeques { self.lt_rf3_rx.clear(); } } + +#[pin_project] +pub struct InsertSenderPolling { + #[pin] + pub st_rf1_sp: SenderPolling>, + #[pin] + pub st_rf3_sp: SenderPolling>, + #[pin] + pub mt_rf3_sp: SenderPolling>, +} + +impl InsertSenderPolling { + pub fn new(iqtx: InsertQueuesTx) -> Self { + Self { + st_rf1_sp: SenderPolling::new(iqtx.st_rf1_tx), + st_rf3_sp: SenderPolling::new(iqtx.st_rf3_tx), + mt_rf3_sp: SenderPolling::new(iqtx.mt_rf3_tx), + } + } + + pub fn is_idle(&self) -> bool { + self.st_rf1_sp.is_idle() && self.st_rf3_sp.is_idle() && self.mt_rf3_sp.is_idle() + } + + pub fn st_rf1_sp_pin(self: Pin<&mut Self>) -> Pin<&mut SenderPolling>> { + // unsafe { + // let this = self.get_unchecked_mut(); + // let pp1 = &mut this.st_rf1_sp; + // Pin::new_unchecked(pp1) + // } + self.project().st_rf1_sp + } + + pub fn st_rf3_sp_pin(self: Pin<&mut Self>) -> Pin<&mut SenderPolling>> { + self.project().st_rf3_sp + } + + pub fn mt_rf3_sp_pin(self: Pin<&mut Self>) -> Pin<&mut SenderPolling>> { + self.project().mt_rf3_sp + } +} diff --git a/scywr/src/lib.rs b/scywr/src/lib.rs index 25bc7d1..9071e95 100644 --- a/scywr/src/lib.rs +++ b/scywr/src/lib.rs @@ -11,6 +11,7 @@ pub mod insertworker; pub mod iteminsertqueue; pub mod ratelimit; pub mod schema; +pub mod senderpolling; pub mod session; pub mod store; pub mod tools; diff --git a/netfetch/src/senderpolling.rs b/scywr/src/senderpolling.rs similarity index 100% rename from netfetch/src/senderpolling.rs rename to scywr/src/senderpolling.rs