From e2008a3a5a1e9f5e3b7ee9580d1e73e477e6280a Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Mon, 17 Feb 2025 13:38:16 +0100 Subject: [PATCH] Optional reduce write for debug --- daqingest/Cargo.toml | 6 +-- daqingest/src/daemon.rs | 5 ++ netfetch/src/ca/conn2/conn.rs | 28 ++++++++++- netfetch/src/ca/conn2/conn/connected.rs | 62 ++++++++++++++++++++++++ netfetch/src/ca/conn2/conn/connecting.rs | 59 +++++++++++++--------- netfetch/src/conf.rs | 5 ++ scywr/src/insertworker.rs | 45 ++++++++++++++--- 7 files changed, 176 insertions(+), 34 deletions(-) create mode 100644 netfetch/src/ca/conn2/conn/connected.rs diff --git a/daqingest/Cargo.toml b/daqingest/Cargo.toml index 4ff1bb2..98651d9 100644 --- a/daqingest/Cargo.toml +++ b/daqingest/Cargo.toml @@ -10,14 +10,14 @@ default = [] bsread = [] [dependencies] -clap = { version = "4.5.20", features = ["derive", "cargo"] } +clap = { version = "4.5.28", features = ["derive", "cargo"] } tracing = "0.1" serde = { version = "1.0", features = ["derive"] } -tokio-postgres = "0.7.12" +tokio-postgres = "0.7.13" async-channel = "2.3.1" futures-util = "0.3" chrono = "0.4.38" -bytes = "1.8.0" +bytes = "1.10.0" libc = "0.2" autoerr = "0.0.3" err = { path = "../../daqbuf-err", package = "daqbuf-err" } diff --git a/daqingest/src/daemon.rs b/daqingest/src/daemon.rs index b324bd4..5cae419 100644 --- a/daqingest/src/daemon.rs +++ b/daqingest/src/daemon.rs @@ -191,6 +191,8 @@ impl Daemon { // Insert queue hook // let query_item_rx = inserthook::active_channel_insert_hook(query_item_rx); + let ignore_writes = ingest_opts.scylla_ignore_writes(); + let mut insert_worker_jhs = Vec::new(); if ingest_opts.scylla_disable() { @@ -235,6 +237,7 @@ impl Daemon { insert_worker_opts.clone(), insert_worker_stats.clone(), ingest_opts.use_rate_limit_queue(), + ignore_writes, ) .await .map_err(Error::from_string)?; @@ -250,6 +253,7 @@ impl Daemon { insert_worker_opts.clone(), insert_worker_stats.clone(), ingest_opts.use_rate_limit_queue(), + ignore_writes, ) .await .map_err(Error::from_string)?; @@ -267,6 +271,7 @@ impl Daemon { insert_worker_opts.clone(), insert_worker_stats.clone(), ingest_opts.use_rate_limit_queue(), + ignore_writes, ) .await .map_err(Error::from_string)?; diff --git a/netfetch/src/ca/conn2/conn.rs b/netfetch/src/ca/conn2/conn.rs index f75c96e..92a64c1 100644 --- a/netfetch/src/ca/conn2/conn.rs +++ b/netfetch/src/ca/conn2/conn.rs @@ -1,3 +1,4 @@ +mod connected; mod connecting; use super::conncmd::ConnCommand; @@ -7,6 +8,7 @@ use crate::ca::conn::CaConnOpts; use crate::ca::conn2::progpend::HaveProgressPending; use async_channel::Sender; use ca_proto::ca::proto; +use connected::Connected; use connecting::Connecting; use dbpg::seriesbychannel::ChannelInfoQuery; use futures_util::Future; @@ -34,6 +36,10 @@ use std::time::Instant; use taskrun::tokio; use tokio::net::TcpStream; +macro_rules! conn_err { + ($($arg:expr),*) => { if true { info!($($arg),*); } }; +} + autoerr::create_error_v1!( name(Error, "Conn"), enum variants { @@ -104,7 +110,7 @@ impl CaConn { Self { opts, backend, - state: CaConnState::Connecting(Connecting::dummy_new(remote_addr, tsnow)), + state: CaConnState::Connecting(Connecting::new(remote_addr, tsnow)), iqdqs: InsertDeques::new(), ca_conn_event_out_queue: VecDeque::new(), ca_conn_event_out_queue_max: 2000, @@ -308,7 +314,25 @@ impl Stream for CaConn { // } match &mut self.state { - CaConnState::Connecting(st2) => handle_poll_res!(st2.poll_unpin(cx), hpp), + CaConnState::Connecting(st2) => match st2.poll_unpin(cx) { + Ready(x) => match x { + Ok(Some(x)) => { + hpp.have_progress(); + self.state = CaConnState::Connected(Connected::new(x)); + } + Ok(None) => { + // TODO + // In this case, should probably be treated like error. + } + Err(e) => { + // TODO handle or propagate the error, change state. + conn_err!("{}", e); + } + }, + Pending => { + hpp.have_pending(); + } + }, CaConnState::Connected(_) => todo!(), CaConnState::Shutdown(_) => { // TODO still attempt to flush queues. diff --git a/netfetch/src/ca/conn2/conn/connected.rs b/netfetch/src/ca/conn2/conn/connected.rs new file mode 100644 index 0000000..846fe8d --- /dev/null +++ b/netfetch/src/ca/conn2/conn/connected.rs @@ -0,0 +1,62 @@ +use futures_util::FutureExt; +use std::fmt; +use std::future::Future; +use std::net::SocketAddrV4; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; +use std::time::Duration; +use std::time::Instant; +use taskrun::tokio; +use tokio::net::TcpStream; +use tokio::time::error::Elapsed; + +autoerr::create_error_v1!( + name(Error, "Connected"), + enum variants { + Timeout, + IO(#[from] std::io::Error), + }, +); + +type PollType = (); + +type ReturnType = Result, Elapsed>; + +// type ConnectingFut = Pin + Send>>; + +pub struct Connected { + tsbeg: Instant, + addr: SocketAddrV4, + tcp: TcpStream, + // fut: ConnectingFut, +} + +impl fmt::Debug for Connected { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("Connected") + .field("tsbeg", &self.tsbeg) + .field("addr", &self.addr) + .finish() + } +} + +impl Connected { + pub fn new(tcp: TcpStream) -> Self { + Self { + tsbeg: tsnow, + addr: remote_addr, + tcp, + // fut: Box::pin(fut), + } + } + + pub fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll, Error>> { + use Poll::*; + Pending + } + + pub fn poll_unpin(&mut self, cx: &mut Context) -> Poll, Error>> { + Pin::new(self).poll(cx) + } +} diff --git a/netfetch/src/ca/conn2/conn/connecting.rs b/netfetch/src/ca/conn2/conn/connecting.rs index 1ebe446..81fa51c 100644 --- a/netfetch/src/ca/conn2/conn/connecting.rs +++ b/netfetch/src/ca/conn2/conn/connecting.rs @@ -1,22 +1,29 @@ +use futures_util::FutureExt; use std::fmt; use std::future::Future; use std::net::SocketAddrV4; use std::pin::Pin; use std::task::Context; use std::task::Poll; +use std::time::Duration; use std::time::Instant; use taskrun::tokio; use tokio::net::TcpStream; +use tokio::time::error::Elapsed; autoerr::create_error_v1!( name(Error, "Connecting"), enum variants { - Logic, + Timeout, + IO(#[from] std::io::Error), }, ); -type ConnectingFut = - Pin, tokio::time::error::Elapsed>> + Send>>; +type PollType = TcpStream; + +type ReturnType = Result, Elapsed>; + +type ConnectingFut = Pin + Send>>; pub struct Connecting { tsbeg: Instant, @@ -24,25 +31,6 @@ pub struct Connecting { fut: ConnectingFut, } -impl Connecting { - pub fn dummy_new(remote_addr: SocketAddrV4, tsnow: Instant) -> Self { - Self { - tsbeg: tsnow, - addr: remote_addr, - fut: err::todoval(), - } - } - - pub fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll, Error>> { - let x = Err(Error::Logic); - x? - } - - pub fn poll_unpin(&mut self, cx: &mut Context) -> Poll, Error>> { - Pin::new(self).poll(cx) - } -} - impl fmt::Debug for Connecting { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { fmt.debug_struct("Connecting") @@ -51,3 +39,30 @@ impl fmt::Debug for Connecting { .finish() } } + +impl Connecting { + pub fn new(remote_addr: SocketAddrV4, tsnow: Instant) -> Self { + let fut = tokio::time::timeout(Duration::from_millis(1800), tokio::net::TcpStream::connect(remote_addr)); + Self { + tsbeg: tsnow, + addr: remote_addr, + fut: Box::pin(fut), + } + } + + pub fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll, Error>> { + use Poll::*; + match self.fut.poll_unpin(cx) { + Ready(x) => match x { + Ok(Ok(x)) => Ready(Ok(Some(x))), + Ok(Err(e)) => Ready(Err(e.into())), + Err(_) => Ready(Err(Error::Timeout)), + }, + Pending => Pending, + } + } + + pub fn poll_unpin(&mut self, cx: &mut Context) -> Poll, Error>> { + Pin::new(self).poll(cx) + } +} diff --git a/netfetch/src/conf.rs b/netfetch/src/conf.rs index 69a906f..869c4a8 100644 --- a/netfetch/src/conf.rs +++ b/netfetch/src/conf.rs @@ -43,6 +43,7 @@ pub struct CaIngestOpts { pub test_bsread_addr: Option, #[serde(default)] scylla_disable: bool, + scylla_ignore_writes: bool, } impl CaIngestOpts { @@ -125,6 +126,10 @@ impl CaIngestOpts { pub fn scylla_disable(&self) -> bool { self.scylla_disable } + + pub fn scylla_ignore_writes(&self) -> bool { + self.scylla_ignore_writes + } } #[test] diff --git a/scywr/src/insertworker.rs b/scywr/src/insertworker.rs index 428b991..781f9b1 100644 --- a/scywr/src/insertworker.rs +++ b/scywr/src/insertworker.rs @@ -122,6 +122,7 @@ pub async fn spawn_scylla_insert_workers( insert_worker_opts: Arc, store_stats: Arc, use_rate_limit_queue: bool, + ignore_writes: bool, ) -> Result>>, Error> { let item_inp = if use_rate_limit_queue { crate::ratelimit::rate_limiter(insert_worker_opts.store_workers_rate.clone(), item_inp) @@ -142,6 +143,7 @@ pub async fn spawn_scylla_insert_workers( item_inp.clone(), insert_worker_opts.clone(), Some(data_store), + ignore_writes, store_stats.clone(), )); jhs.push(jh); @@ -165,6 +167,7 @@ pub async fn spawn_scylla_insert_workers_dummy( item_inp.clone(), insert_worker_opts.clone(), data_store, + true, store_stats.clone(), )); jhs.push(jh); @@ -178,6 +181,7 @@ async fn worker_streamed( item_inp: Receiver>, insert_worker_opts: Arc, data_store: Option>, + ignore_writes: bool, stats: Arc, ) -> Result<(), Error> { debug_setup!("worker_streamed begin"); @@ -191,7 +195,7 @@ async fn worker_streamed( .map_or_else(|| format!("dummy"), |x| x.rett.debug_tag().to_string()); let stream = inspect_items(stream, worker_name.clone()); if let Some(data_store) = data_store { - let stream = transform_to_db_futures(stream, data_store, stats.clone()); + let stream = transform_to_db_futures(stream, data_store, ignore_writes, stats.clone()); let stream = stream .map(|x| futures_util::stream::iter(x)) .flatten_unordered(Some(1)) @@ -237,6 +241,7 @@ async fn worker_streamed( fn transform_to_db_futures( item_inp: S, data_store: Arc, + ignore_writes: bool, stats: Arc, ) -> impl Stream> where @@ -252,14 +257,40 @@ where let mut res = Vec::with_capacity(32); for item in batch { let futs = match item { - QueryItem::Insert(item) => prepare_query_insert_futs(item, &data_store, &stats, tsnow), - QueryItem::Msp(item) => prepare_msp_insert_futs(item, &data_store, &stats, tsnow), - QueryItem::TimeBinSimpleF32V02(item) => { - prepare_timebin_v02_insert_futs(item, &data_store, &stats, tsnow) + QueryItem::Insert(item) => { + if ignore_writes { + SmallVec::new() + } else { + prepare_query_insert_futs(item, &data_store, &stats, tsnow) + } + } + QueryItem::Msp(item) => { + if ignore_writes { + SmallVec::new() + } else { + prepare_msp_insert_futs(item, &data_store, &stats, tsnow) + } + } + QueryItem::TimeBinSimpleF32V02(item) => { + if ignore_writes { + SmallVec::new() + } else { + prepare_timebin_v02_insert_futs(item, &data_store, &stats, tsnow) + } + } + QueryItem::Accounting(item) => { + if ignore_writes { + SmallVec::new() + } else { + prepare_accounting_insert_futs(item, &data_store, &stats, tsnow) + } } - QueryItem::Accounting(item) => prepare_accounting_insert_futs(item, &data_store, &stats, tsnow), QueryItem::AccountingRecv(item) => { - prepare_accounting_recv_insert_futs(item, &data_store, &stats, tsnow) + if ignore_writes { + SmallVec::new() + } else { + prepare_accounting_recv_insert_futs(item, &data_store, &stats, tsnow) + } } }; trace!("prepared futs len {}", futs.len());