diff --git a/dbpg/src/seriesbychannel.rs b/dbpg/src/seriesbychannel.rs index e46802d..55cb685 100644 --- a/dbpg/src/seriesbychannel.rs +++ b/dbpg/src/seriesbychannel.rs @@ -369,9 +369,9 @@ pub async fn start_lookup_workers( ), Error, > { - let inp_cap = 128; + let inp_cap = 64; let batch_out_cap = 4; - let timeout = Duration::from_millis(400); + let timeout = Duration::from_millis(100); let (query_tx, query_rx) = async_channel::bounded(inp_cap); let (batch_rx, bjh) = batchtools::batcher::batch(inp_cap, timeout, batch_out_cap, query_rx); let mut jhs = Vec::new(); diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index 537e694..661f447 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -184,21 +184,21 @@ enum ChannelError { CreateChanFail(ChannelStatusSeriesId), } -#[derive(Debug)] +#[derive(Debug, Clone)] struct EventedState { ts_last: Instant, recv_count: u64, recv_bytes: u64, } -#[derive(Debug)] +#[derive(Debug, Clone)] enum MonitoringState { FetchSeriesId, AddingEvent(SeriesId), Evented(SeriesId, EventedState), } -#[derive(Debug)] +#[derive(Debug, Clone)] struct CreatedState { cssid: ChannelStatusSeriesId, cid: Cid, @@ -218,28 +218,6 @@ struct CreatedState { info_store_msp_last: u32, } -impl Default for CreatedState { - fn default() -> Self { - Self { - cssid: ChannelStatusSeriesId::new(123123), - cid: Cid(123123), - sid: Sid(123123), - ts_created: Instant::now(), - ts_alive_last: Instant::now(), - state: MonitoringState::FetchSeriesId, - ts_msp_last: 4242, - ts_msp_grid_last: 4242, - inserted_in_ts_msp: 4242, - insert_item_ivl_ema: IntervalEma::new(), - item_recv_ivl_ema: IntervalEma::new(), - insert_recv_ivl_last: Instant::now(), - insert_next_earliest: Instant::now(), - muted_before: Default::default(), - info_store_msp_last: Default::default(), - } - } -} - #[derive(Debug)] struct WritableState { created: CreatedState, @@ -951,10 +929,11 @@ impl CaConn { let proto = self.proto.as_mut().unwrap(); proto.push_out(msg); } - st2.state = MonitoringState::AddingEvent(wr.sid()); - let created = std::mem::replace(st2, Default::default()); - *chst = ChannelState::Writable(WritableState { created, writer: wr }); + *chst = ChannelState::Writable(WritableState { + created: st2.clone(), + writer: wr, + }); Ok(()) } else { warn!("TODO handle_series_lookup_result channel in bad state, reset"); @@ -2019,6 +1998,7 @@ impl CaConn { qu_to_si: FB, loop_max: u32, cx: &mut Context, + id: &str, ) -> Result>, Error> where Q: Unpin, @@ -2033,30 +2013,22 @@ impl CaConn { break; } if !sp.has_sender() { - return Err(Error::with_msg_no_trace("attempt_flush_queue no sender")); + return Err(Error::with_msg_no_trace(format!("flush queue {id} no sender"))); } if sp.is_idle() { if let Some(item) = qu_to_si(qu) { sp.as_mut().send_pin(item); } else { + break; } - // TODO maybe use a generic function which produces the next - // item from a queue: can be a batch! - // if let Some(item) = qu.pop_front() { - // // let sd = self.writer_establish_tx.as_mut(); - // // sp.as_mut().send_pin(item); - // } else { - // // break; - // } } - // let sd = &mut self.writer_establish_tx; if sp.is_sending() { match sp.poll_unpin(cx) { Ready(Ok(())) => { have_progress = true; } Ready(Err(e)) => { - let e = Error::with_msg_no_trace(format!("attempt_flush_queue {e}")); + let e = Error::with_msg_no_trace(format!("flush queue {id} {e}")); return Err(e); } Pending => { @@ -2064,7 +2036,7 @@ impl CaConn { } } } else { - let e = Error::with_msg_no_trace(format!("attempt_flush_queue not sending")); + let e = Error::with_msg_no_trace(format!("flush queue {id} not sending")); return Err(e); } } @@ -2132,16 +2104,10 @@ impl Stream for CaConn { } if !self.is_shutdown() { - fn abc( - obj: &mut CaConn, - ) -> ( - &mut VecDeque, - &mut Pin>>>, - ) { - (&mut obj.insert_item_queue, &mut obj.storage_insert_sender) - } - let (qu, sp) = abc(self.as_mut().get_mut()); - match Self::attempt_flush_queue(qu, sp, Self::send_batched::<32, _>, 32, cx) { + let obj = self.as_mut().get_mut(); + let qu = &mut obj.insert_item_queue; + let sp = &mut obj.storage_insert_sender; + match Self::attempt_flush_queue(qu, sp, Self::send_batched::<32, _>, 32, cx, "strg") { Ok(Ready(Some(()))) => { have_progress = true; } @@ -2151,23 +2117,15 @@ impl Stream for CaConn { } Err(e) => break Ready(Some(Err(e))), } - - // match self.as_mut().attempt_flush_storage_queue(cx) { - // Ok(Ready(Some(()))) => { - // have_progress = true; - // } - // Ok(Ready(None)) => {} - // Ok(Pending) => { - // have_pending = true; - // } - // Err(e) => break Ready(Some(Err(e))), - // } } let lts3 = Instant::now(); if !self.is_shutdown() { - match self.as_mut().attempt_flush_writer_establish(cx) { + let obj = self.as_mut().get_mut(); + let qu = &mut obj.writer_establish_qu; + let sp = &mut obj.writer_establish_tx; + match Self::attempt_flush_queue(qu, sp, Self::send_individual, 32, cx, "wr-est") { Ok(Ready(Some(()))) => { have_progress = true; } diff --git a/serieswriter/Cargo.toml b/serieswriter/Cargo.toml index f67fde5..f6496c0 100644 --- a/serieswriter/Cargo.toml +++ b/serieswriter/Cargo.toml @@ -7,6 +7,7 @@ edition = "2021" [dependencies] serde = { version = "1.0", features = ["derive"] } async-channel = "2.1.1" +futures-util = "0.3.30" log = { path = "../log" } err = { path = "../../daqbuffer/crates/err" } netpod = { path = "../../daqbuffer/crates/netpod" } diff --git a/serieswriter/src/writer.rs b/serieswriter/src/writer.rs index 8d08f50..93de3af 100644 --- a/serieswriter/src/writer.rs +++ b/serieswriter/src/writer.rs @@ -4,6 +4,9 @@ use async_channel::Sender; use dbpg::seriesbychannel::ChannelInfoQuery; use err::thiserror; use err::ThisError; +use future::ready; +use futures_util::future; +use futures_util::StreamExt; use log::*; use netpod::timeunits::HOUR; use netpod::timeunits::SEC; @@ -62,7 +65,6 @@ pub struct SeriesWriter { } impl SeriesWriter { - // TODO this requires a database pub async fn establish( worker_tx: Sender, backend: String, @@ -82,6 +84,18 @@ impl SeriesWriter { worker_tx.send(item).await?; let res = rx.recv().await?.map_err(|_| Error::SeriesLookupError)?; let cssid = ChannelStatusSeriesId::new(res.series.into_inner().id()); + Self::establish_with_cssid(worker_tx, cssid, backend, channel, scalar_type, shape, tsnow).await + } + + pub async fn establish_with_cssid( + worker_tx: Sender, + cssid: ChannelStatusSeriesId, + backend: String, + channel: String, + scalar_type: ScalarType, + shape: Shape, + tsnow: SystemTime, + ) -> Result { let (tx, rx) = async_channel::bounded(1); let item = ChannelInfoQuery { backend, @@ -207,22 +221,29 @@ impl EstablishWriterWorker { } async fn work(self) { - while let Ok(item) = self.jobrx.recv().await { - // TODO - debug!("got job"); - let res = SeriesWriter::establish( - self.worker_tx.clone(), - item.backend, - item.channel, - item.scalar_type, - item.shape, - item.tsnow, - ) + self.jobrx + .map(move |item| { + let wtx = self.worker_tx.clone(); + async move { + // TODO + debug!("got job"); + let res = SeriesWriter::establish( + wtx.clone(), + item.backend, + item.channel, + item.scalar_type, + item.shape, + item.tsnow, + ) + .await; + if item.restx.send((item.job_id, res)).await.is_err() { + warn!("can not send writer establish result"); + } + } + }) + .buffer_unordered(512) + .for_each(|_| future::ready(())) .await; - if item.restx.send((item.job_id, res)).await.is_err() { - warn!("can not send writer establish result"); - } - } } }