diff --git a/daqingest/Cargo.toml b/daqingest/Cargo.toml index a370f72..4ff1bb2 100644 --- a/daqingest/Cargo.toml +++ b/daqingest/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "daqingest" -version = "0.2.6-aa.0" +version = "0.2.7-aa.0" authors = ["Dominik Werder "] edition = "2021" @@ -24,6 +24,7 @@ err = { path = "../../daqbuf-err", package = "daqbuf-err" } netpod = { path = "../../daqbuf-netpod", package = "daqbuf-netpod" } taskrun = { path = "../../daqbuffer/crates/taskrun" } series = { path = "../../daqbuf-series", package = "daqbuf-series" } +channeltools = { path = "../../daqbuf-channeltools", package = "daqbuf-channeltools" } log = { path = "../log" } stats = { path = "../stats" } scywr = { path = "../scywr" } diff --git a/daqingest/src/daemon.rs b/daqingest/src/daemon.rs index 4403425..b324bd4 100644 --- a/daqingest/src/daemon.rs +++ b/daqingest/src/daemon.rs @@ -2,6 +2,7 @@ pub mod inserthook; use async_channel::Receiver; use async_channel::Sender; +use channeltools::channel_combine_ab::ChannelCombineAB; use dbpg::seriesbychannel::ChannelInfoQuery; use err::Error; use log::*; @@ -135,17 +136,20 @@ impl Daemon { let (st_rf1_tx, st_rf1_rx) = async_channel::bounded(ingest_opts.insert_item_queue_cap()); let (mt_rf3_tx, mt_rf3_rx) = async_channel::bounded(ingest_opts.insert_item_queue_cap()); let (lt_rf3_tx, lt_rf3_rx) = async_channel::bounded(ingest_opts.insert_item_queue_cap()); + let (lt_rf3_lat5_tx, lt_rf3_lat5_rx) = async_channel::bounded(ingest_opts.insert_item_queue_cap()); let iqtx = InsertQueuesTx { st_rf3_tx, st_rf1_tx, mt_rf3_tx, lt_rf3_tx, + lt_rf3_lat5_tx, }; let iqrx = InsertQueuesRx { st_rf3_rx, st_rf1_rx, mt_rf3_rx, lt_rf3_rx, + lt_rf3_lat5_rx, }; (iqtx, iqrx) }; @@ -251,13 +255,15 @@ impl Daemon { .map_err(Error::from_string)?; insert_worker_jhs.extend(jh); + let lt_rx_combined = ChannelCombineAB::new(iqrx.lt_rf3_rx, iqrx.lt_rf3_lat5_rx); + let jh = scywr::insertworker::spawn_scylla_insert_workers( RetentionTime::Long, opts.scyconf_lt.clone(), 
ingest_opts.insert_scylla_sessions(), ingest_opts.insert_worker_count().min(2), ingest_opts.insert_worker_concurrency().min(8), - iqrx.lt_rf3_rx, + lt_rx_combined, insert_worker_opts.clone(), insert_worker_stats.clone(), ingest_opts.use_rate_limit_queue(), diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index ced0bc4..cbf0d7a 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -83,7 +83,7 @@ use tokio::net::TcpStream; const CONNECTING_TIMEOUT: Duration = Duration::from_millis(1000 * 6); const CHANNEL_STATUS_EMIT_IVL: Duration = Duration::from_millis(1000 * 8); -const IOC_PING_IVL: Duration = Duration::from_millis(1000 * 120); +const IOC_PING_IVL: Duration = Duration::from_millis(1000 * 180); const MONITOR_POLL_TIMEOUT: Duration = Duration::from_millis(1000 * 6); const TIMEOUT_CHANNEL_CLOSING: Duration = Duration::from_millis(1000 * 8); const TIMEOUT_PONG_WAIT: Duration = Duration::from_millis(1000 * 10); @@ -2084,16 +2084,27 @@ impl CaConn { } else { self.stats.recv_read_notify_state_read_pending.inc(); } - self.read_ioids.remove(&ioid); + let read_expected = if let Some(cid) = self.read_ioids.remove(&ioid) { + true + } else { + false + }; st2.mon2state = Monitoring2State::Passive(Monitoring2PassiveState { tsbeg: tsnow, ts_silence_read_next: tsnow + Self::silence_read_next_ivl_rng(&mut self.rng), }); - { + if read_expected { let item = ChannelStatusItem { ts: self.tmp_ts_poll, cssid: st.channel.cssid.clone(), - status: ChannelStatus::MonitoringSilenceReadUnchanged, + status: ChannelStatus::MonitoringReadResultExpected, + }; + ch_wrst.emit_channel_status_item(item, &mut self.iqdqs.st_rf3_qu)?; + } else { + let item = ChannelStatusItem { + ts: self.tmp_ts_poll, + cssid: st.channel.cssid.clone(), + status: ChannelStatus::MonitoringReadResultUnexpected, }; ch_wrst.emit_channel_status_item(item, &mut self.iqdqs.st_rf3_qu)?; } @@ -2103,6 +2114,7 @@ impl CaConn { // But there is still a small chance that the monitor will just received 
slightly later. // More involved check would be to raise a flag, wait for the expected monitor for some // timeout, and if we get nothing error out. + // TODO read-result-after-monitor-silence if false { Self::read_notify_res_for_write( ev, @@ -2204,14 +2216,17 @@ impl CaConn { // TODO should attach these counters already to Writable state. if crst.ts_recv_value_status_emit_next <= tsnow { crst.ts_recv_value_status_emit_next = tsnow + Self::recv_value_status_emit_ivl_rng(rng); - let item = ChannelStatusItem { - ts: stnow, - cssid: crst.cssid, - status: ChannelStatus::MonitoringSilenceReadUnchanged, - }; - let deque = &mut iqdqs.st_rf3_qu; - if wrst.emit_channel_status_item(item, deque).is_err() { - stats.logic_error().inc(); + // TODO was only for debugging + if false { + let item = ChannelStatusItem { + ts: stnow, + cssid: crst.cssid, + status: ChannelStatus::MonitoringSilenceReadUnchanged, + }; + let deque = &mut iqdqs.st_rf3_qu; + if wrst.emit_channel_status_item(item, deque).is_err() { + stats.logic_error().inc(); + } } } let tsev_local = TsNano::from_system_time(stnow); @@ -3302,6 +3317,9 @@ macro_rules! 
flush_queue { ($self:expr, $qu:ident, $sp:ident, $batcher:expr, $loop_max:expr, $have:expr, $id:expr, $cx:expr, $stats:expr) => { let obj = $self.as_mut().get_mut(); let qu = &mut obj.$qu; + if qu.len() < qu.capacity() * 4 / 10 { + qu.shrink_to(qu.capacity() * 7 / 10); + } let sp = obj.$sp.as_mut(); match Self::attempt_flush_queue(qu, sp, $batcher, $loop_max, $cx, $id, $stats) { Ok(Ready(Some(()))) => { @@ -3465,6 +3483,22 @@ impl Stream for CaConn { cx, stats_fn ); + + let stats2 = self.stats.clone(); + let stats_fn = move |item: &VecDeque| { + stats2.iiq_batch_len().ingest(item.len() as u32); + }; + flush_queue_dqs!( + self, + lt_rf3_lat5_qu, + lt_rf3_lat5_sp_pin, + send_batched::<256, _>, + 32, + (&mut have_progress, &mut have_pending), + "lt_rf3_lat5_rx", + cx, + stats_fn + ); } let lts3 = Instant::now(); diff --git a/netfetch/src/ca/conn2.rs b/netfetch/src/ca/conn2.rs index 5a9bd9b..16e2bff 100644 --- a/netfetch/src/ca/conn2.rs +++ b/netfetch/src/ca/conn2.rs @@ -3,3 +3,4 @@ mod channelstateinfo; mod conn; mod conncmd; mod connevent; +mod progpend; diff --git a/netfetch/src/ca/conn2/conn.rs b/netfetch/src/ca/conn2/conn.rs index 9938b77..f75c96e 100644 --- a/netfetch/src/ca/conn2/conn.rs +++ b/netfetch/src/ca/conn2/conn.rs @@ -1,9 +1,13 @@ +mod connecting; + use super::conncmd::ConnCommand; use super::connevent::CaConnEvent; use super::connevent::EndOfStreamReason; use crate::ca::conn::CaConnOpts; +use crate::ca::conn2::progpend::HaveProgressPending; use async_channel::Sender; use ca_proto::ca::proto; +use connecting::Connecting; use dbpg::seriesbychannel::ChannelInfoQuery; use futures_util::Future; use futures_util::FutureExt; @@ -19,6 +23,7 @@ use stats::rand_xoshiro::Xoshiro128PlusPlus; use stats::CaConnStats; use stats::CaProtoStats; use std::collections::VecDeque; +use std::fmt; use std::net::SocketAddrV4; use std::pin::Pin; use std::sync::Arc; @@ -29,6 +34,13 @@ use std::time::Instant; use taskrun::tokio; use tokio::net::TcpStream; 
+autoerr::create_error_v1!( + name(Error, "Conn"), + enum variants { + TickerPoll, + }, +); + struct DurationMeasureSteps { ts: Instant, durs: smallvec::SmallVec<[Duration; 8]>, @@ -50,22 +62,15 @@ impl DurationMeasureSteps { } } -#[derive(Debug)] -pub enum Error { - TickerPoll, -} - -type ConnectingFut = - Pin, tokio::time::error::Elapsed>> + Send>>; - enum ConnectedState { Init(CaProto), Handshake(CaProto), PeerReady(CaProto), } +#[derive(Debug)] enum CaConnState { - Connecting(Instant, SocketAddrV4, ConnectingFut), + Connecting(Connecting), Connected(CaProto), Shutdown(EndOfStreamReason), Done, @@ -95,11 +100,11 @@ impl CaConn { ) -> Self { let tsnow = Instant::now(); let (cq_tx, cq_rx) = async_channel::bounded::(32); - let mut rng = stats::xoshiro_from_time(); + let rng = stats::xoshiro_from_time(); Self { opts, backend, - state: CaConnState::Connecting(tsnow, remote_addr, err::todoval()), + state: CaConnState::Connecting(Connecting::dummy_new(remote_addr, tsnow)), iqdqs: InsertDeques::new(), ca_conn_event_out_queue: VecDeque::new(), ca_conn_event_out_queue_max: 2000, @@ -120,6 +125,29 @@ impl CaConn { } } +macro_rules! handle_poll_res { + ($res:expr, $hpp:expr) => { + match $res { + Ready(x) => match x { + Ok(x) => match x { + Some(x) => { + $hpp.have_progress(); + } + None => {} + }, + Err(e) => { + // TODO how to handle error: + // Transition state, emit item. 
+ error!("{}", e); + } + }, + Pending => { + $hpp.have_pending(); + } + } + }; +} + impl Stream for CaConn { type Item = CaConnEvent; @@ -129,17 +157,14 @@ impl Stream for CaConn { self.stats.poll_fn_begin().inc(); let ret = loop { self.stats.poll_loop_begin().inc(); - let qlen = self.iqdqs.len(); if qlen >= self.opts.insert_queue_max * 2 / 3 { self.stats.insert_item_queue_pressure().inc(); } else if qlen >= self.opts.insert_queue_max { self.stats.insert_item_queue_full().inc(); } - - let mut have_pending = false; - let mut have_progress = false; - + let mut hppv = HaveProgressPending::new(); + let hpp = &mut hppv; if let CaConnState::Done = self.state { break Ready(None); } else if let Some(item) = self.ca_conn_event_out_queue.pop_front() { @@ -149,10 +174,10 @@ impl Stream for CaConn { // TODO add up duration of this scope match self.as_mut().poll_own_ticker(cx) { Ok(Ready(())) => { - have_progress = true; + hpp.have_progress(); } Ok(Pending) => { - have_pending = true; + hpp.have_pending(); } Err(e) => { self.shutdown_on_error(e); @@ -282,8 +307,8 @@ impl Stream for CaConn { // } // } - break match self.state { - CaConnState::Connecting(_, _, _) => todo!(), + match &mut self.state { + CaConnState::Connecting(st2) => handle_poll_res!(st2.poll_unpin(cx), hpp), CaConnState::Connected(_) => todo!(), CaConnState::Shutdown(_) => { // TODO still attempt to flush queues. 
@@ -291,7 +316,7 @@ impl Stream for CaConn { todo!() } CaConnState::Done => todo!(), - }; + } // break if self.is_shutdown() { // if self.queues_out_flushed() { @@ -340,6 +365,14 @@ impl Stream for CaConn { // Ready(Some(CaConnEvent::err_now(e))) // } // }; + + break if hpp.is_progress() { + continue; + } else if hpp.is_pending() { + Pending + } else { + Ready(None) + }; }; durs.step(); diff --git a/netfetch/src/ca/conn2/conn/connecting.rs b/netfetch/src/ca/conn2/conn/connecting.rs new file mode 100644 index 0000000..1ebe446 --- /dev/null +++ b/netfetch/src/ca/conn2/conn/connecting.rs @@ -0,0 +1,53 @@ +use std::fmt; +use std::future::Future; +use std::net::SocketAddrV4; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; +use std::time::Instant; +use taskrun::tokio; +use tokio::net::TcpStream; + +autoerr::create_error_v1!( + name(Error, "Connecting"), + enum variants { + Logic, + }, +); + +type ConnectingFut = + Pin, tokio::time::error::Elapsed>> + Send>>; + +pub struct Connecting { + tsbeg: Instant, + addr: SocketAddrV4, + fut: ConnectingFut, +} + +impl Connecting { + pub fn dummy_new(remote_addr: SocketAddrV4, tsnow: Instant) -> Self { + Self { + tsbeg: tsnow, + addr: remote_addr, + fut: err::todoval(), + } + } + + pub fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll, Error>> { + let x = Err(Error::Logic); + x? 
+ } + + pub fn poll_unpin(&mut self, cx: &mut Context) -> Poll, Error>> { + Pin::new(self).poll(cx) + } +} + +impl fmt::Debug for Connecting { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("Connecting") + .field("tsbeg", &self.tsbeg) + .field("addr", &self.addr) + .finish() + } +} diff --git a/netfetch/src/ca/conn2/progpend.rs b/netfetch/src/ca/conn2/progpend.rs new file mode 100644 index 0000000..20ee846 --- /dev/null +++ b/netfetch/src/ca/conn2/progpend.rs @@ -0,0 +1,29 @@ +pub struct HaveProgressPending { + have_progress: bool, + have_pending: bool, +} + +impl HaveProgressPending { + pub fn new() -> Self { + Self { + have_progress: false, + have_pending: false, + } + } + + pub fn have_progress(&mut self) { + self.have_progress = true; + } + + pub fn have_pending(&mut self) { + self.have_pending = true; + } + + pub fn is_progress(&self) -> bool { + self.have_progress + } + + pub fn is_pending(&self) -> bool { + self.have_pending + } +} diff --git a/netfetch/src/conf.rs b/netfetch/src/conf.rs index 1de7146..69a906f 100644 --- a/netfetch/src/conf.rs +++ b/netfetch/src/conf.rs @@ -107,7 +107,7 @@ impl CaIngestOpts { } pub fn insert_item_queue_cap(&self) -> usize { - self.insert_item_queue_cap.unwrap_or(1000 * 1000) + self.insert_item_queue_cap.unwrap_or(1000 * 1000) * 2 } pub fn store_workers_rate(&self) -> u64 { diff --git a/scywr/src/insertqueues.rs b/scywr/src/insertqueues.rs index 50d89d3..59d0be1 100644 --- a/scywr/src/insertqueues.rs +++ b/scywr/src/insertqueues.rs @@ -24,13 +24,13 @@ pub struct InsertQueuesTx { pub st_rf3_tx: Sender>, pub mt_rf3_tx: Sender>, pub lt_rf3_tx: Sender>, + pub lt_rf3_lat5_tx: Sender>, } impl InsertQueuesTx { /// Send all accumulated batches pub async fn send_all(&mut self, iqdqs: &mut InsertDeques) -> Result<(), Error> { - // Send each buffer down the corresponding channel - if false { + { let item = core::mem::replace(&mut iqdqs.st_rf1_qu, VecDeque::new()); self.st_rf1_tx .send(item) @@ -58,6 +58,13 
@@ impl InsertQueuesTx { .await .map_err(|_| Error::ChannelSend(RetentionTime::Long, 3))?; } + { + let item = core::mem::replace(&mut iqdqs.lt_rf3_lat5_qu, VecDeque::new()); + self.lt_rf3_lat5_tx + .send(item) + .await + .map_err(|_| Error::ChannelSend(RetentionTime::Long, 3))?; + } Ok(()) } @@ -79,7 +86,7 @@ impl<'a> fmt::Display for InsertQueuesTxSummary<'a> { let obj = self.obj; write!( fmt, - "InsertQueuesTx {{ st_rf1_tx: {} {} {}, st_rf3_tx: {} {} {}, mt_rf3_tx: {} {} {}, lt_rf3_tx: {} {} {} }}", + "InsertQueuesTx {{ st_rf1_tx: {} {} {}, st_rf3_tx: {} {} {}, mt_rf3_tx: {} {} {}, lt_rf3_tx: {} {} {}, lt_rf3_lat5_tx: {} {} {} }}", obj.st_rf1_tx.is_closed(), obj.st_rf1_tx.is_full(), obj.st_rf1_tx.len(), @@ -92,6 +99,9 @@ impl<'a> fmt::Display for InsertQueuesTxSummary<'a> { obj.lt_rf3_tx.is_closed(), obj.lt_rf3_tx.is_full(), obj.lt_rf3_tx.len(), + obj.lt_rf3_lat5_tx.is_closed(), + obj.lt_rf3_lat5_tx.is_full(), + obj.lt_rf3_lat5_tx.len(), ) } } @@ -102,6 +112,7 @@ pub struct InsertQueuesRx { pub st_rf3_rx: Receiver>, pub mt_rf3_rx: Receiver>, pub lt_rf3_rx: Receiver>, + pub lt_rf3_lat5_rx: Receiver>, } pub struct InsertDeques { @@ -109,6 +120,7 @@ pub struct InsertDeques { pub st_rf3_qu: VecDeque, pub mt_rf3_qu: VecDeque, pub lt_rf3_qu: VecDeque, + pub lt_rf3_lat5_qu: VecDeque, } impl InsertDeques { @@ -118,12 +130,17 @@ impl InsertDeques { st_rf3_qu: VecDeque::new(), mt_rf3_qu: VecDeque::new(), lt_rf3_qu: VecDeque::new(), + lt_rf3_lat5_qu: VecDeque::new(), } } /// Total number of items cumulated over all queues. 
pub fn len(&self) -> usize { - self.st_rf1_qu.len() + self.st_rf3_qu.len() + self.mt_rf3_qu.len() + self.lt_rf3_qu.len() + self.st_rf1_qu.len() + + self.st_rf3_qu.len() + + self.mt_rf3_qu.len() + + self.lt_rf3_qu.len() + + self.lt_rf3_lat5_qu.len() } pub fn clear(&mut self) { @@ -131,6 +148,7 @@ impl InsertDeques { self.st_rf3_qu.clear(); self.mt_rf3_qu.clear(); self.lt_rf3_qu.clear(); + self.lt_rf3_lat5_qu.clear(); } pub fn summary(&self) -> InsertDequesSummary { @@ -166,6 +184,7 @@ impl InsertDeques { &mut self.st_rf3_qu, &mut self.mt_rf3_qu, &mut self.lt_rf3_qu, + &mut self.lt_rf3_lat5_qu, ]; for qu in qus { if qu.len() * 2 < qu.capacity() { @@ -203,6 +222,8 @@ pub struct InsertSenderPolling { pub mt_rf3_sp: SenderPolling>, #[pin] pub lt_rf3_sp: SenderPolling>, + #[pin] + pub lt_rf3_lat5_sp: SenderPolling>, } impl InsertSenderPolling { @@ -212,6 +233,7 @@ impl InsertSenderPolling { st_rf3_sp: SenderPolling::new(iqtx.st_rf3_tx), mt_rf3_sp: SenderPolling::new(iqtx.mt_rf3_tx), lt_rf3_sp: SenderPolling::new(iqtx.lt_rf3_tx), + lt_rf3_lat5_sp: SenderPolling::new(iqtx.lt_rf3_lat5_tx), } } @@ -235,6 +257,10 @@ impl InsertSenderPolling { self.project().lt_rf3_sp } + pub fn lt_rf3_lat5_sp_pin(self: Pin<&mut Self>) -> Pin<&mut SenderPolling>> { + self.project().lt_rf3_lat5_sp + } + pub fn __st_rf1_sp_pin(self: Pin<&mut Self>) -> Pin<&mut SenderPolling>> { if true { panic!("encapsulated by pin_project"); @@ -256,7 +282,7 @@ impl<'a> fmt::Display for InsertSenderPollingSummary<'a> { let obj = self.obj; write!( fmt, - "InsertSenderPolling {{ st_rf1_idle_len: {:?} {:?}, st_rf3_idle_len: {:?} {:?}, mt_rf3_idle_len: {:?} {:?}, lt_rf3_idle_len: {:?} {:?} }}", + "InsertSenderPolling {{ st_rf1_idle_len: {:?} {:?}, st_rf3_idle_len: {:?} {:?}, mt_rf3_idle_len: {:?} {:?}, lt_rf3_idle_len: {:?} {:?}, lt_rf3_lat5_idle_len: {:?} {:?} }}", obj.st_rf1_sp.is_idle(), obj.st_rf1_sp.len(), obj.st_rf3_sp.is_idle(), @@ -265,6 +291,8 @@ impl<'a> fmt::Display for InsertSenderPollingSummary<'a> { 
obj.mt_rf3_sp.len(), obj.lt_rf3_sp.is_idle(), obj.lt_rf3_sp.len(), + obj.lt_rf3_lat5_sp.is_idle(), + obj.lt_rf3_lat5_sp.len(), ) } } diff --git a/serieswriter/src/binwriter.rs b/serieswriter/src/binwriter.rs index 36f9a32..a3f9630 100644 --- a/serieswriter/src/binwriter.rs +++ b/serieswriter/src/binwriter.rs @@ -26,7 +26,8 @@ macro_rules! trace_ingest { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) macro_rules! trace_tick { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) } macro_rules! trace_tick_verbose { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) } -macro_rules! debug_bin2 { ($t:expr, $($arg:tt)*) => ( if $t { debug!($($arg)*); } ) } +macro_rules! debug_bin2 { ($t:expr, $($arg:tt)*) => ( if true { if $t { debug!($($arg)*); } } ) } +macro_rules! trace_bin2 { ($t:expr, $($arg:tt)*) => ( if false { if $t { trace!($($arg)*); } } ) } autoerr::create_error_v1!( name(Error, "SerieswriterBinwriter"), @@ -101,7 +102,11 @@ impl BinWriter { if let Some(last) = combs.last_mut() { if last.1 >= DtMs::from_ms_u64(1000 * 60 * 60 * 24) { last.0 = RetentionTime::Long; + } else if last.1 >= DtMs::from_ms_u64(1000 * 60 * 60 * 1) { + last.0 = RetentionTime::Long; + combs.push((RetentionTime::Long, DtMs::from_ms_u64(1000 * 60 * 60 * 24))); } else { + combs.push((RetentionTime::Long, DtMs::from_ms_u64(1000 * 60 * 60 * 1))); combs.push((RetentionTime::Long, DtMs::from_ms_u64(1000 * 60 * 60 * 24))); } }