Optionally ignore (reduce) Scylla writes, for debugging

This commit is contained in:
Dominik Werder
2025-02-17 13:38:16 +01:00
parent 67a0d1f3ea
commit e2008a3a5a
7 changed files with 176 additions and 34 deletions

View File

@@ -10,14 +10,14 @@ default = []
bsread = []
[dependencies]
clap = { version = "4.5.20", features = ["derive", "cargo"] }
clap = { version = "4.5.28", features = ["derive", "cargo"] }
tracing = "0.1"
serde = { version = "1.0", features = ["derive"] }
tokio-postgres = "0.7.12"
tokio-postgres = "0.7.13"
async-channel = "2.3.1"
futures-util = "0.3"
chrono = "0.4.38"
bytes = "1.8.0"
bytes = "1.10.0"
libc = "0.2"
autoerr = "0.0.3"
err = { path = "../../daqbuf-err", package = "daqbuf-err" }

View File

@@ -191,6 +191,8 @@ impl Daemon {
// Insert queue hook
// let query_item_rx = inserthook::active_channel_insert_hook(query_item_rx);
let ignore_writes = ingest_opts.scylla_ignore_writes();
let mut insert_worker_jhs = Vec::new();
if ingest_opts.scylla_disable() {
@@ -235,6 +237,7 @@ impl Daemon {
insert_worker_opts.clone(),
insert_worker_stats.clone(),
ingest_opts.use_rate_limit_queue(),
ignore_writes,
)
.await
.map_err(Error::from_string)?;
@@ -250,6 +253,7 @@ impl Daemon {
insert_worker_opts.clone(),
insert_worker_stats.clone(),
ingest_opts.use_rate_limit_queue(),
ignore_writes,
)
.await
.map_err(Error::from_string)?;
@@ -267,6 +271,7 @@ impl Daemon {
insert_worker_opts.clone(),
insert_worker_stats.clone(),
ingest_opts.use_rate_limit_queue(),
ignore_writes,
)
.await
.map_err(Error::from_string)?;

View File

@@ -1,3 +1,4 @@
mod connected;
mod connecting;
use super::conncmd::ConnCommand;
@@ -7,6 +8,7 @@ use crate::ca::conn::CaConnOpts;
use crate::ca::conn2::progpend::HaveProgressPending;
use async_channel::Sender;
use ca_proto::ca::proto;
use connected::Connected;
use connecting::Connecting;
use dbpg::seriesbychannel::ChannelInfoQuery;
use futures_util::Future;
@@ -34,6 +36,10 @@ use std::time::Instant;
use taskrun::tokio;
use tokio::net::TcpStream;
// Log a connection-level error.
// The `if true` wrapper is a compile-time toggle: flip it to `false` to
// silence these logs without removing the call sites. Currently routed to
// `info!` rather than `error!` — presumably to keep noisy reconnect churn
// out of the error channel; TODO confirm intended level.
macro_rules! conn_err {
($($arg:expr),*) => { if true { info!($($arg),*); } };
}
autoerr::create_error_v1!(
name(Error, "Conn"),
enum variants {
@@ -104,7 +110,7 @@ impl CaConn {
Self {
opts,
backend,
state: CaConnState::Connecting(Connecting::dummy_new(remote_addr, tsnow)),
state: CaConnState::Connecting(Connecting::new(remote_addr, tsnow)),
iqdqs: InsertDeques::new(),
ca_conn_event_out_queue: VecDeque::new(),
ca_conn_event_out_queue_max: 2000,
@@ -308,7 +314,25 @@ impl Stream for CaConn {
// }
match &mut self.state {
CaConnState::Connecting(st2) => handle_poll_res!(st2.poll_unpin(cx), hpp),
CaConnState::Connecting(st2) => match st2.poll_unpin(cx) {
Ready(x) => match x {
Ok(Some(x)) => {
hpp.have_progress();
self.state = CaConnState::Connected(Connected::new(x));
}
Ok(None) => {
// TODO
// In this case, should probably be treated like error.
}
Err(e) => {
// TODO handle or propagate the error, change state.
conn_err!("{}", e);
}
},
Pending => {
hpp.have_pending();
}
},
CaConnState::Connected(_) => todo!(),
CaConnState::Shutdown(_) => {
// TODO still attempt to flush queues.

View File

@@ -0,0 +1,62 @@
use futures_util::FutureExt;
use std::fmt;
use std::future::Future;
use std::net::Ipv4Addr;
use std::net::SocketAddr;
use std::net::SocketAddrV4;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
use std::time::Duration;
use std::time::Instant;
use taskrun::tokio;
use tokio::net::TcpStream;
use tokio::time::error::Elapsed;
// Error type for the Connected state, mirroring the one in connecting.rs.
autoerr::create_error_v1!(
name(Error, "Connected"),
enum variants {
Timeout,
IO(#[from] std::io::Error),
},
);
// Item type yielded by Connected::poll. Unit for now — the connected-state
// protocol handling is not implemented yet (poll always returns Pending).
type PollType = ();
// NOTE(review): only referenced by the commented-out ConnectingFut alias
// below; appears to be a leftover copied from connecting.rs — confirm
// whether it can be removed.
type ReturnType = Result<Result<TcpStream, std::io::Error>, Elapsed>;
// type ConnectingFut = Pin<Box<dyn Future<Output = ReturnType> + Send>>;
// State of an established CA connection: holds the live TCP stream plus
// bookkeeping about when and to whom the connection was made.
pub struct Connected {
// When this state was entered.
tsbeg: Instant,
// Remote peer address of the connection.
addr: SocketAddrV4,
// The established TCP stream.
tcp: TcpStream,
// fut: ConnectingFut,
}
// Manual Debug impl: the `tcp` stream is deliberately omitted from the
// output since TcpStream has no useful Debug representation here.
impl fmt::Debug for Connected {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("Connected")
.field("tsbeg", &self.tsbeg)
.field("addr", &self.addr)
.finish()
}
}
impl Connected {
pub fn new(tcp: TcpStream) -> Self {
Self {
tsbeg: tsnow,
addr: remote_addr,
tcp,
// fut: Box::pin(fut),
}
}
pub fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<Option<PollType>, Error>> {
use Poll::*;
Pending
}
pub fn poll_unpin(&mut self, cx: &mut Context) -> Poll<Result<Option<PollType>, Error>> {
Pin::new(self).poll(cx)
}
}

View File

@@ -1,22 +1,29 @@
use futures_util::FutureExt;
use std::fmt;
use std::future::Future;
use std::net::SocketAddrV4;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
use std::time::Duration;
use std::time::Instant;
use taskrun::tokio;
use tokio::net::TcpStream;
use tokio::time::error::Elapsed;
autoerr::create_error_v1!(
name(Error, "Connecting"),
enum variants {
Logic,
Timeout,
IO(#[from] std::io::Error),
},
);
type ConnectingFut =
Pin<Box<dyn Future<Output = Result<Result<TcpStream, std::io::Error>, tokio::time::error::Elapsed>> + Send>>;
type PollType = TcpStream;
type ReturnType = Result<Result<TcpStream, std::io::Error>, Elapsed>;
type ConnectingFut = Pin<Box<dyn Future<Output = ReturnType> + Send>>;
pub struct Connecting {
tsbeg: Instant,
@@ -24,25 +31,6 @@ pub struct Connecting {
fut: ConnectingFut,
}
impl Connecting {
pub fn dummy_new(remote_addr: SocketAddrV4, tsnow: Instant) -> Self {
Self {
tsbeg: tsnow,
addr: remote_addr,
fut: err::todoval(),
}
}
pub fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<Option<()>, Error>> {
let x = Err(Error::Logic);
x?
}
pub fn poll_unpin(&mut self, cx: &mut Context) -> Poll<Result<Option<()>, Error>> {
Pin::new(self).poll(cx)
}
}
impl fmt::Debug for Connecting {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("Connecting")
@@ -51,3 +39,30 @@ impl fmt::Debug for Connecting {
.finish()
}
}
impl Connecting {
pub fn new(remote_addr: SocketAddrV4, tsnow: Instant) -> Self {
let fut = tokio::time::timeout(Duration::from_millis(1800), tokio::net::TcpStream::connect(remote_addr));
Self {
tsbeg: tsnow,
addr: remote_addr,
fut: Box::pin(fut),
}
}
pub fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<Option<PollType>, Error>> {
use Poll::*;
match self.fut.poll_unpin(cx) {
Ready(x) => match x {
Ok(Ok(x)) => Ready(Ok(Some(x))),
Ok(Err(e)) => Ready(Err(e.into())),
Err(_) => Ready(Err(Error::Timeout)),
},
Pending => Pending,
}
}
pub fn poll_unpin(&mut self, cx: &mut Context) -> Poll<Result<Option<PollType>, Error>> {
Pin::new(self).poll(cx)
}
}

View File

@@ -43,6 +43,7 @@ pub struct CaIngestOpts {
pub test_bsread_addr: Option<String>,
#[serde(default)]
scylla_disable: bool,
// NOTE(review): unlike `scylla_disable` above, this new field has no
// #[serde(default)], so config files written before this change will fail
// to deserialize with a missing-field error — confirm this is intended,
// otherwise add the default attribute.
scylla_ignore_writes: bool,
}
impl CaIngestOpts {
@@ -125,6 +126,10 @@ impl CaIngestOpts {
/// Whether Scylla usage is disabled entirely (no insert workers started).
pub fn scylla_disable(&self) -> bool {
self.scylla_disable
}
/// Whether insert workers should prepare but skip the actual database
/// writes — debug switch to measure the pipeline without write load.
pub fn scylla_ignore_writes(&self) -> bool {
self.scylla_ignore_writes
}
}
#[test]

View File

@@ -122,6 +122,7 @@ pub async fn spawn_scylla_insert_workers(
insert_worker_opts: Arc<InsertWorkerOpts>,
store_stats: Arc<stats::InsertWorkerStats>,
use_rate_limit_queue: bool,
ignore_writes: bool,
) -> Result<Vec<JoinHandle<Result<(), Error>>>, Error> {
let item_inp = if use_rate_limit_queue {
crate::ratelimit::rate_limiter(insert_worker_opts.store_workers_rate.clone(), item_inp)
@@ -142,6 +143,7 @@ pub async fn spawn_scylla_insert_workers(
item_inp.clone(),
insert_worker_opts.clone(),
Some(data_store),
ignore_writes,
store_stats.clone(),
));
jhs.push(jh);
@@ -165,6 +167,7 @@ pub async fn spawn_scylla_insert_workers_dummy(
item_inp.clone(),
insert_worker_opts.clone(),
data_store,
true,
store_stats.clone(),
));
jhs.push(jh);
@@ -178,6 +181,7 @@ async fn worker_streamed(
item_inp: Receiver<VecDeque<QueryItem>>,
insert_worker_opts: Arc<InsertWorkerOpts>,
data_store: Option<Arc<DataStore>>,
ignore_writes: bool,
stats: Arc<InsertWorkerStats>,
) -> Result<(), Error> {
debug_setup!("worker_streamed begin");
@@ -191,7 +195,7 @@ async fn worker_streamed(
.map_or_else(|| format!("dummy"), |x| x.rett.debug_tag().to_string());
let stream = inspect_items(stream, worker_name.clone());
if let Some(data_store) = data_store {
let stream = transform_to_db_futures(stream, data_store, stats.clone());
let stream = transform_to_db_futures(stream, data_store, ignore_writes, stats.clone());
let stream = stream
.map(|x| futures_util::stream::iter(x))
.flatten_unordered(Some(1))
@@ -237,6 +241,7 @@ async fn worker_streamed(
fn transform_to_db_futures<S>(
item_inp: S,
data_store: Arc<DataStore>,
ignore_writes: bool,
stats: Arc<InsertWorkerStats>,
) -> impl Stream<Item = Vec<InsertFut>>
where
@@ -252,14 +257,40 @@ where
let mut res = Vec::with_capacity(32);
for item in batch {
let futs = match item {
QueryItem::Insert(item) => prepare_query_insert_futs(item, &data_store, &stats, tsnow),
QueryItem::Msp(item) => prepare_msp_insert_futs(item, &data_store, &stats, tsnow),
QueryItem::TimeBinSimpleF32V02(item) => {
prepare_timebin_v02_insert_futs(item, &data_store, &stats, tsnow)
QueryItem::Insert(item) => {
if ignore_writes {
SmallVec::new()
} else {
prepare_query_insert_futs(item, &data_store, &stats, tsnow)
}
}
QueryItem::Msp(item) => {
if ignore_writes {
SmallVec::new()
} else {
prepare_msp_insert_futs(item, &data_store, &stats, tsnow)
}
}
QueryItem::TimeBinSimpleF32V02(item) => {
if ignore_writes {
SmallVec::new()
} else {
prepare_timebin_v02_insert_futs(item, &data_store, &stats, tsnow)
}
}
QueryItem::Accounting(item) => {
if ignore_writes {
SmallVec::new()
} else {
prepare_accounting_insert_futs(item, &data_store, &stats, tsnow)
}
}
QueryItem::Accounting(item) => prepare_accounting_insert_futs(item, &data_store, &stats, tsnow),
QueryItem::AccountingRecv(item) => {
prepare_accounting_recv_insert_futs(item, &data_store, &stats, tsnow)
if ignore_writes {
SmallVec::new()
} else {
prepare_accounting_recv_insert_futs(item, &data_store, &stats, tsnow)
}
}
};
trace!("prepared futs len {}", futs.len());