Establish series writers in parallel

This commit is contained in:
Dominik Werder
2024-01-13 10:23:04 +01:00
parent bbc2855767
commit 2b185b8277
4 changed files with 60 additions and 80 deletions

View File

@@ -369,9 +369,9 @@ pub async fn start_lookup_workers(
),
Error,
> {
let inp_cap = 128;
let inp_cap = 64;
let batch_out_cap = 4;
let timeout = Duration::from_millis(400);
let timeout = Duration::from_millis(100);
let (query_tx, query_rx) = async_channel::bounded(inp_cap);
let (batch_rx, bjh) = batchtools::batcher::batch(inp_cap, timeout, batch_out_cap, query_rx);
let mut jhs = Vec::new();

View File

@@ -184,21 +184,21 @@ enum ChannelError {
CreateChanFail(ChannelStatusSeriesId),
}
#[derive(Debug)]
#[derive(Debug, Clone)]
struct EventedState {
ts_last: Instant,
recv_count: u64,
recv_bytes: u64,
}
#[derive(Debug)]
#[derive(Debug, Clone)]
enum MonitoringState {
FetchSeriesId,
AddingEvent(SeriesId),
Evented(SeriesId, EventedState),
}
#[derive(Debug)]
#[derive(Debug, Clone)]
struct CreatedState {
cssid: ChannelStatusSeriesId,
cid: Cid,
@@ -218,28 +218,6 @@ struct CreatedState {
info_store_msp_last: u32,
}
impl Default for CreatedState {
fn default() -> Self {
Self {
cssid: ChannelStatusSeriesId::new(123123),
cid: Cid(123123),
sid: Sid(123123),
ts_created: Instant::now(),
ts_alive_last: Instant::now(),
state: MonitoringState::FetchSeriesId,
ts_msp_last: 4242,
ts_msp_grid_last: 4242,
inserted_in_ts_msp: 4242,
insert_item_ivl_ema: IntervalEma::new(),
item_recv_ivl_ema: IntervalEma::new(),
insert_recv_ivl_last: Instant::now(),
insert_next_earliest: Instant::now(),
muted_before: Default::default(),
info_store_msp_last: Default::default(),
}
}
}
#[derive(Debug)]
struct WritableState {
created: CreatedState,
@@ -951,10 +929,11 @@ impl CaConn {
let proto = self.proto.as_mut().unwrap();
proto.push_out(msg);
}
st2.state = MonitoringState::AddingEvent(wr.sid());
let created = std::mem::replace(st2, Default::default());
*chst = ChannelState::Writable(WritableState { created, writer: wr });
*chst = ChannelState::Writable(WritableState {
created: st2.clone(),
writer: wr,
});
Ok(())
} else {
warn!("TODO handle_series_lookup_result channel in bad state, reset");
@@ -2019,6 +1998,7 @@ impl CaConn {
qu_to_si: FB,
loop_max: u32,
cx: &mut Context,
id: &str,
) -> Result<Poll<Option<()>>, Error>
where
Q: Unpin,
@@ -2033,30 +2013,22 @@ impl CaConn {
break;
}
if !sp.has_sender() {
return Err(Error::with_msg_no_trace("attempt_flush_queue no sender"));
return Err(Error::with_msg_no_trace(format!("flush queue {id} no sender")));
}
if sp.is_idle() {
if let Some(item) = qu_to_si(qu) {
sp.as_mut().send_pin(item);
} else {
break;
}
// TODO maybe use a generic function which produces the next
// item from a queue: can be a batch!
// if let Some(item) = qu.pop_front() {
// // let sd = self.writer_establish_tx.as_mut();
// // sp.as_mut().send_pin(item);
// } else {
// // break;
// }
}
// let sd = &mut self.writer_establish_tx;
if sp.is_sending() {
match sp.poll_unpin(cx) {
Ready(Ok(())) => {
have_progress = true;
}
Ready(Err(e)) => {
let e = Error::with_msg_no_trace(format!("attempt_flush_queue {e}"));
let e = Error::with_msg_no_trace(format!("flush queue {id} {e}"));
return Err(e);
}
Pending => {
@@ -2064,7 +2036,7 @@ impl CaConn {
}
}
} else {
let e = Error::with_msg_no_trace(format!("attempt_flush_queue not sending"));
let e = Error::with_msg_no_trace(format!("flush queue {id} not sending"));
return Err(e);
}
}
@@ -2132,16 +2104,10 @@ impl Stream for CaConn {
}
if !self.is_shutdown() {
fn abc(
obj: &mut CaConn,
) -> (
&mut VecDeque<QueryItem>,
&mut Pin<Box<SenderPolling<VecDeque<QueryItem>>>>,
) {
(&mut obj.insert_item_queue, &mut obj.storage_insert_sender)
}
let (qu, sp) = abc(self.as_mut().get_mut());
match Self::attempt_flush_queue(qu, sp, Self::send_batched::<32, _>, 32, cx) {
let obj = self.as_mut().get_mut();
let qu = &mut obj.insert_item_queue;
let sp = &mut obj.storage_insert_sender;
match Self::attempt_flush_queue(qu, sp, Self::send_batched::<32, _>, 32, cx, "strg") {
Ok(Ready(Some(()))) => {
have_progress = true;
}
@@ -2151,23 +2117,15 @@ impl Stream for CaConn {
}
Err(e) => break Ready(Some(Err(e))),
}
// match self.as_mut().attempt_flush_storage_queue(cx) {
// Ok(Ready(Some(()))) => {
// have_progress = true;
// }
// Ok(Ready(None)) => {}
// Ok(Pending) => {
// have_pending = true;
// }
// Err(e) => break Ready(Some(Err(e))),
// }
}
let lts3 = Instant::now();
if !self.is_shutdown() {
match self.as_mut().attempt_flush_writer_establish(cx) {
let obj = self.as_mut().get_mut();
let qu = &mut obj.writer_establish_qu;
let sp = &mut obj.writer_establish_tx;
match Self::attempt_flush_queue(qu, sp, Self::send_individual, 32, cx, "wr-est") {
Ok(Ready(Some(()))) => {
have_progress = true;
}

View File

@@ -7,6 +7,7 @@ edition = "2021"
[dependencies]
serde = { version = "1.0", features = ["derive"] }
async-channel = "2.1.1"
futures-util = "0.3.30"
log = { path = "../log" }
err = { path = "../../daqbuffer/crates/err" }
netpod = { path = "../../daqbuffer/crates/netpod" }

View File

@@ -4,6 +4,9 @@ use async_channel::Sender;
use dbpg::seriesbychannel::ChannelInfoQuery;
use err::thiserror;
use err::ThisError;
use future::ready;
use futures_util::future;
use futures_util::StreamExt;
use log::*;
use netpod::timeunits::HOUR;
use netpod::timeunits::SEC;
@@ -62,7 +65,6 @@ pub struct SeriesWriter {
}
impl SeriesWriter {
// TODO this requires a database
pub async fn establish(
worker_tx: Sender<ChannelInfoQuery>,
backend: String,
@@ -82,6 +84,18 @@ impl SeriesWriter {
worker_tx.send(item).await?;
let res = rx.recv().await?.map_err(|_| Error::SeriesLookupError)?;
let cssid = ChannelStatusSeriesId::new(res.series.into_inner().id());
Self::establish_with_cssid(worker_tx, cssid, backend, channel, scalar_type, shape, tsnow).await
}
pub async fn establish_with_cssid(
worker_tx: Sender<ChannelInfoQuery>,
cssid: ChannelStatusSeriesId,
backend: String,
channel: String,
scalar_type: ScalarType,
shape: Shape,
tsnow: SystemTime,
) -> Result<Self, Error> {
let (tx, rx) = async_channel::bounded(1);
let item = ChannelInfoQuery {
backend,
@@ -207,22 +221,29 @@ impl EstablishWriterWorker {
}
async fn work(self) {
while let Ok(item) = self.jobrx.recv().await {
// TODO
debug!("got job");
let res = SeriesWriter::establish(
self.worker_tx.clone(),
item.backend,
item.channel,
item.scalar_type,
item.shape,
item.tsnow,
)
self.jobrx
.map(move |item| {
let wtx = self.worker_tx.clone();
async move {
// TODO
debug!("got job");
let res = SeriesWriter::establish(
wtx.clone(),
item.backend,
item.channel,
item.scalar_type,
item.shape,
item.tsnow,
)
.await;
if item.restx.send((item.job_id, res)).await.is_err() {
warn!("can not send writer establish result");
}
}
})
.buffer_unordered(512)
.for_each(|_| future::ready(()))
.await;
if item.restx.send((item.job_id, res)).await.is_err() {
warn!("can not send writer establish result");
}
}
}
}