From 3827514900f0dba0bf0a5e9329bd84cd8e6ef914 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Thu, 2 May 2024 17:06:05 +0200 Subject: [PATCH] Refactor different queues into common type --- daqingest/src/daemon.rs | 15 +++-- netfetch/src/ca/beacons.rs | 10 ++-- netfetch/src/ca/conn.rs | 92 ++++++++++++++++++------------ netfetch/src/conf.rs | 27 +++++++-- netfetch/src/metrics/postingest.rs | 25 ++++---- netfetch/src/metrics/status.rs | 28 +++++++++ scywr/src/insertqueues.rs | 53 ++++++++++++++++- scywr/src/iteminsertqueue.rs | 17 +++--- serieswriter/src/timebin.rs | 19 +++--- serieswriter/src/writer.rs | 18 +++--- 10 files changed, 214 insertions(+), 90 deletions(-) diff --git a/daqingest/src/daemon.rs b/daqingest/src/daemon.rs index 3b7d7ba..9c7ade4 100644 --- a/daqingest/src/daemon.rs +++ b/daqingest/src/daemon.rs @@ -79,12 +79,12 @@ pub struct Daemon { connset_status_last: Instant, // TODO should be a stats object? insert_workers_running: AtomicU64, - query_item_tx_weak: WeakSender>, connset_health_lat_ema: f32, metrics_shutdown_tx: Sender, metrics_shutdown_rx: Receiver, metrics_jh: Option>>, channel_info_query_tx: Sender, + iqtx: Option, } impl Daemon { @@ -143,20 +143,23 @@ impl Daemon { let (st_rf3_tx, st_rf3_rx) = async_channel::bounded(ingest_opts.insert_item_queue_cap()); let (st_rf1_tx, st_rf1_rx) = async_channel::bounded(ingest_opts.insert_item_queue_cap()); let (mt_rf3_tx, mt_rf3_rx) = async_channel::bounded(ingest_opts.insert_item_queue_cap()); + let (lt_rf3_tx, lt_rf3_rx) = async_channel::bounded(ingest_opts.insert_item_queue_cap()); let iqtx = InsertQueuesTx { st_rf3_tx, st_rf1_tx, mt_rf3_tx, + lt_rf3_tx, }; let iqrx = InsertQueuesRx { st_rf3_rx, st_rf1_rx, mt_rf3_rx, + lt_rf3_rx, }; (iqtx, iqrx) }; - let query_item_tx_weak = iqtx.st_rf3_tx.clone().downgrade(); + let iqtx2 = iqtx.clone(); let conn_set_ctrl = CaConnSet::start( ingest_opts.backend().into(), @@ -281,12 +284,12 @@ impl Daemon { connset_ctrl: conn_set_ctrl, connset_status_last: Instant::now(), insert_workers_running: AtomicU64::new(0), - query_item_tx_weak, connset_health_lat_ema: 0., metrics_shutdown_tx, metrics_shutdown_rx, metrics_jh: None, channel_info_query_tx, + iqtx: Some(iqtx2), }; Ok(ret) } @@ -584,9 +587,9 @@ impl Daemon { let backend = String::new(); let (_item_tx, item_rx) = async_channel::bounded(256); let info_worker_tx = self.channel_info_query_tx.clone(); - let iiq_tx = self.query_item_tx_weak.upgrade().unwrap(); - let worker_fut = - netfetch::metrics::postingest::process_api_query_items(backend, item_rx, info_worker_tx, iiq_tx); + use netfetch::metrics::postingest::process_api_query_items; + let iqtx = self.iqtx.take().unwrap(); + let worker_fut = process_api_query_items(backend, item_rx, info_worker_tx, iqtx); taskrun::spawn(worker_fut) }; Self::spawn_ticker(self.tx.clone(), self.stats.clone()); diff --git a/netfetch/src/ca/beacons.rs b/netfetch/src/ca/beacons.rs index f5ae990..bb59d19 100644 --- a/netfetch/src/ca/beacons.rs +++ b/netfetch/src/ca/beacons.rs @@ -7,10 +7,10 @@ use log::*; use netpod::ScalarType; use netpod::Shape; use netpod::TsNano; +use scywr::insertqueues::InsertDeques; use scywr::iteminsertqueue::DataValue; use scywr::iteminsertqueue::ScalarValue; use serieswriter::writer::SeriesWriter; -use std::collections::VecDeque; use std::io::Cursor; use std::net::Ipv4Addr; use std::time::SystemTime; @@ -36,7 +36,7 @@ pub async fn listen_beacons( sock.set_broadcast(true).unwrap(); let mut buf = Vec::new(); buf.resize(1024 * 4, 0); - let mut item_qu = VecDeque::new(); + let mut iqdqs = InsertDeques::new(); loop { let bb = &mut buf; let (n, remote) = taskrun::tokio::select! { @@ -65,12 +65,12 @@ pub async fn listen_beacons( let ts_local = ts; let blob = addr_u32 as i64; let val = DataValue::Scalar(ScalarValue::I64(blob)); - writer.write(ts, ts_local, val, &mut item_qu)?; + writer.write(ts, ts_local, val, &mut iqdqs)?; } } - if item_qu.len() != 0 { + if iqdqs.len() != 0 { // TODO deliver to insert queue - item_qu.clear(); + iqdqs.clear(); } } Ok(()) diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index 1b0a337..85ed6b1 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -30,6 +30,7 @@ use proto::CaMsgTy; use proto::CaProto; use proto::CreateChan; use proto::EventAdd; +use scywr::insertqueues::InsertDeques; use scywr::iteminsertqueue as scywriiq; use scywr::iteminsertqueue::Accounting; use scywr::iteminsertqueue::DataValue; @@ -758,7 +759,7 @@ pub struct CaConn { channel_status_emit_last: Instant, tick_last_writer: Instant, init_state_count: u64, - insert_item_queue: VecDeque, + iqdqs: InsertDeques, remote_addr_dbg: SocketAddrV4, local_epics_hostname: String, stats: Arc, @@ -824,7 +825,7 @@ impl CaConn { cid_by_sid: HashMap::new(), channel_status_emit_last: tsnow, tick_last_writer: tsnow, - insert_item_queue: VecDeque::new(), + iqdqs: InsertDeques::new(), remote_addr_dbg, local_epics_hostname, stats, @@ -906,7 +907,8 @@ impl CaConn { }; self.channel_state_on_shutdown(channel_reason); let addr = self.remote_addr_dbg.clone(); - self.insert_item_queue + self.iqdqs + .lt_rf3_rx .push_back(QueryItem::ConnectionStatus(ConnectionStatusItem { ts: self.tmp_ts_poll, addr, @@ -1004,7 +1006,7 @@ impl CaConn { cssid: st2.channel.cssid.clone(), status: ChannelStatus::Opened, }); - self.insert_item_queue.push_back(item); + self.iqdqs.lt_rf3_rx.push_back(item); } let name = conf.conf.name(); if name.starts_with("TEST:PEAKING:") { @@ -1171,7 +1173,7 @@ impl CaConn { cssid: cssid.clone(), status: ChannelStatus::Closed(channel_reason.clone()), }); - self.insert_item_queue.push_back(item); + self.iqdqs.lt_rf3_rx.push_back(item); *chst = ChannelState::Ended(cssid); } ChannelState::Error(..) => { @@ -1337,9 +1339,9 @@ impl CaConn { }); let crst = &mut st.channel; let writer = &mut st.writer; - let iiq = &mut self.insert_item_queue; + let iqdqs = &mut self.iqdqs; let stats = self.stats.as_ref(); - Self::event_add_ingest(ev.payload_len, ev.value, crst, writer, iiq, tsnow, stnow, stats)?; + Self::event_add_ingest(ev.payload_len, ev.value, crst, writer, iqdqs, tsnow, stnow, stats)?; } ReadingState::Monitoring(st2) => { match &mut st2.mon2state { @@ -1353,9 +1355,9 @@ impl CaConn { } let crst = &mut st.channel; let writer = &mut st.writer; - let iiq = &mut self.insert_item_queue; + let iqdqs = &mut self.iqdqs; let stats = self.stats.as_ref(); - Self::event_add_ingest(ev.payload_len, ev.value, crst, writer, iiq, tsnow, stnow, stats)?; + Self::event_add_ingest(ev.payload_len, ev.value, crst, writer, iqdqs, tsnow, stnow, stats)?; } ReadingState::StopMonitoringForPolling(st2) => { // TODO count for metrics @@ -1483,9 +1485,9 @@ impl CaConn { // TODO maintain histogram of read-notify latencies self.read_ioids.remove(ioid); st2.tick = PollTickState::Idle(tsnow); - let iiq = &mut self.insert_item_queue; + let iqdqs = &mut self.iqdqs; let stats = self.stats.as_ref(); - Self::read_notify_res_for_write(ev, st, iiq, stnow, tsnow, stats)?; + Self::read_notify_res_for_write(ev, st, iqdqs, stnow, tsnow, stats)?; } }, ReadingState::EnableMonitoring(..) => { @@ -1506,9 +1508,9 @@ impl CaConn { } self.read_ioids.remove(&ioid); st2.mon2state = Monitoring2State::Passive(Monitoring2PassiveState { tsbeg: tsnow }); - let iiq = &mut self.insert_item_queue; + let iqdqs = &mut self.iqdqs; let stats = self.stats.as_ref(); - Self::read_notify_res_for_write(ev, st, iiq, stnow, tsnow, stats)?; + Self::read_notify_res_for_write(ev, st, iqdqs, stnow, tsnow, stats)?; } }, ReadingState::StopMonitoringForPolling(..) => { @@ -1531,14 +1533,14 @@ impl CaConn { fn read_notify_res_for_write( ev: proto::ReadNotifyRes, st: &mut WritableState, - iiq: &mut VecDeque, + iqdqs: &mut InsertDeques, stnow: SystemTime, tsnow: Instant, stats: &CaConnStats, ) -> Result<(), Error> { let crst = &mut st.channel; let writer = &mut st.writer; - Self::event_add_ingest(ev.payload_len, ev.value, crst, writer, iiq, tsnow, stnow, stats)?; + Self::event_add_ingest(ev.payload_len, ev.value, crst, writer, iqdqs, tsnow, stnow, stats)?; Ok(()) } @@ -1547,7 +1549,7 @@ impl CaConn { value: CaEventValue, crst: &mut CreatedState, writer: &mut SeriesWriter, - iiq: &mut VecDeque, + iqdqs: &mut InsertDeques, tsnow: Instant, stnow: SystemTime, stats: &CaConnStats, @@ -1577,7 +1579,7 @@ impl CaConn { Self::check_ev_value_data(&value.data, writer.scalar_type())?; { let val: DataValue = value.data.into(); - writer.write(TsNano::from_ns(ts), TsNano::from_ns(ts_local), val, iiq)?; + writer.write(TsNano::from_ns(ts), TsNano::from_ns(ts_local), val, iqdqs)?; } } if false { @@ -2144,7 +2146,8 @@ impl CaConn { Ok(Ok(tcp)) => { self.stats.tcp_connected.inc(); let addr = addr.clone(); - self.insert_item_queue + self.iqdqs + .lt_rf3_rx .push_back(QueryItem::ConnectionStatus(ConnectionStatusItem { ts: self.tmp_ts_poll, addr, @@ -2164,7 +2167,8 @@ impl CaConn { Ok(Err(e)) => { debug!("error connect to {addr} {e}"); let addr = addr.clone(); - self.insert_item_queue + self.iqdqs + .lt_rf3_rx .push_back(QueryItem::ConnectionStatus(ConnectionStatusItem { ts: self.tmp_ts_poll, addr, @@ -2177,7 +2181,8 @@ impl CaConn { // TODO log with exponential backoff debug!("timeout connect to {addr} {e}"); let addr = addr.clone(); - self.insert_item_queue + self.iqdqs + .lt_rf3_rx .push_back(QueryItem::ConnectionStatus(ConnectionStatusItem { ts: self.tmp_ts_poll, addr, @@ -2237,7 +2242,7 @@ impl CaConn { self.stats.loop2_count.inc(); if self.is_shutdown() { break; - } else if self.insert_item_queue.len() >= self.opts.insert_queue_max { + } else if self.iqdqs.len() >= self.opts.insert_queue_max { break; } else { match self.handle_conn_state(tsnow, cx) { @@ -2365,7 +2370,7 @@ impl CaConn { count, bytes, }); - self.insert_item_queue.push_back(item); + self.iqdqs.lt_rf3_rx.push_back(item); } } } @@ -2379,7 +2384,7 @@ impl CaConn { for (_, chconf) in &mut self.channels { let chst = &mut chconf.state; if let ChannelState::Writable(st2) = chst { - st2.writer.tick(&mut self.insert_item_queue)?; + st2.writer.tick(&mut self.iqdqs)?; } } Ok(()) @@ -2392,13 +2397,11 @@ impl CaConn { fn queues_out_flushed(&self) -> bool { debug!( "async out flushed iiq {} {} caout {}", - self.insert_item_queue.is_empty(), + self.iqdqs.len() == 0, self.storage_insert_sender.is_idle(), self.ca_conn_event_out_queue.is_empty() ); - self.insert_item_queue.is_empty() - && self.storage_insert_sender.is_idle() - && self.ca_conn_event_out_queue.is_empty() + self.iqdqs.len() == 0 && self.storage_insert_sender.is_idle() && self.ca_conn_event_out_queue.is_empty() } fn attempt_flush_queue( @@ -2415,9 +2418,10 @@ impl CaConn { FB: Fn(&mut VecDeque) -> Option, FS: Fn(&Q), { + let self_name = "attempt_flush_queue"; use Poll::*; if qu.len() != 0 { - trace_flush_queue!("attempt_flush_queue id {:7} len {}", id, qu.len()); + trace_flush_queue!("{self_name} id {:10} len {}", id, qu.len()); } let mut have_progress = false; let mut i = 0; @@ -2440,7 +2444,7 @@ impl CaConn { if sp.is_sending() { match sp.poll_unpin(cx) { Ready(Ok(())) => { - trace_flush_queue!("attempt_flush_queue id {:7} send done", id); + trace_flush_queue!("{self_name} id {:10} send done", id); have_progress = true; } Ready(Err(e)) => { @@ -2485,6 +2489,24 @@ macro_rules! flush_queue { }; } +macro_rules! flush_queue_dqs { + ($self:expr, $qu:ident, $sp:ident, $batcher:expr, $loop_max:expr, $have:expr, $id:expr, $cx:expr, $stats:expr) => { + let obj = $self.as_mut().get_mut(); + let qu = &mut obj.iqdqs.$qu; + let sp = &mut obj.$sp; + match Self::attempt_flush_queue(qu, sp, $batcher, $loop_max, $cx, $id, $stats) { + Ok(Ready(Some(()))) => { + *$have.0 |= true; + } + Ok(Ready(None)) => {} + Ok(Pending) => { + *$have.1 |= true; + } + Err(e) => break Ready(Some(CaConnEvent::err_now(e))), + } + }; +} + fn send_individual(qu: &mut VecDeque) -> Option { qu.pop_front() } @@ -2514,7 +2536,7 @@ impl Stream for CaConn { let lts1 = Instant::now(); self.stats.poll_loop_begin().inc(); - let qlen = self.insert_item_queue.len(); + let qlen = self.iqdqs.len(); if qlen >= self.opts.insert_queue_max * 2 / 3 { self.stats.insert_item_queue_pressure().inc(); } else if qlen >= self.opts.insert_queue_max { @@ -2543,8 +2565,8 @@ impl Stream for CaConn { } { - let iiq = &self.insert_item_queue; - self.stats.iiq_len().ingest(iiq.len() as u32); + let n = self.iqdqs.len(); + self.stats.iiq_len().ingest(n as u32); } { @@ -2552,14 +2574,14 @@ impl Stream for CaConn { let stats_fn = move |item: &VecDeque| { stats2.iiq_batch_len().ingest(item.len() as u32); }; - flush_queue!( + flush_queue_dqs!( self, - insert_item_queue, + st_rf1_rx, storage_insert_sender, send_batched::<256, _>, 32, (&mut have_progress, &mut have_pending), - "strg", + "iq_st_rf1", cx, stats_fn ); diff --git a/netfetch/src/conf.rs b/netfetch/src/conf.rs index f6876d4..ba550fe 100644 --- a/netfetch/src/conf.rs +++ b/netfetch/src/conf.rs @@ -246,6 +246,7 @@ async fn parse_channel_config_txt(fname: &Path, re_p: Regex, re_n: Regex) -> Res short_term: Some(ChannelReadConfig::Monitor), medium_term: None, long_term: None, + is_polled: false, }, }; conf.channels.push(item); @@ -274,6 +275,8 @@ pub struct IngestConfigArchiving { #[serde(default, skip_serializing_if = "Option::is_none")] #[serde(with = "serde_option_channel_read_config")] long_term: Option, + #[serde(default, skip_serializing_if = "bool_is_false")] + is_polled: bool, } fn bool_is_false(x: &bool) -> bool { @@ -368,7 +371,7 @@ mod serde_option_channel_read_config { type Value = Option; fn expecting(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - write!(fmt, "keyword `Monitor`, an integer, or not this field at all") + write!(fmt, "keyword `Monitor`, keyword `None`, an integer, or missing") } fn visit_str(self, v: &str) -> Result @@ -390,8 +393,9 @@ mod serde_option_channel_read_config { where E: de::Error, { - if v < 1 || v > 108000 { - let e = E::custom(format!("unsupported value {v:?}, polling must be in range 1..108000")); + let max = 108000; + if v < 1 || v > max { + let e = E::custom(format!("unsupported value {v:?}, polling must be in range 1..{max:?}")); return Err(e); } Ok(Some(ChannelReadConfig::Poll(Duration::from_secs(v as u64)))) @@ -401,8 +405,9 @@ mod serde_option_channel_read_config { where E: de::Error, { - if v < 1 || v > 108000 { - let e = E::custom(format!("unsupported value {v:?}, polling must be in range 1..108000")); + let max = 108000; + if v < 1 || v > max { + let e = E::custom(format!("unsupported value {v:?}, polling must be in range 1..{max:?}")); return Err(e); } self.visit_u64(v as u64) @@ -436,6 +441,17 @@ CH-02: short_term: Monitor CH-03: archiving_configuration: +CH-04: + archiving_configuration: + short_term: None + medium_term: None + long_term: 3600 + is_polled: true +CH-05: + archiving_configuration: + short_term: None + medium_term: None + long_term: Monitor "###; let x: BTreeMap = serde_yaml::from_str(inp).unwrap(); assert_eq!( @@ -501,6 +517,7 @@ impl ChannelConfig { short_term: Some(ChannelReadConfig::Monitor), medium_term: None, long_term: None, + is_polled: false, }, } } diff --git a/netfetch/src/metrics/postingest.rs b/netfetch/src/metrics/postingest.rs index 90d18f6..ea361b7 100644 --- a/netfetch/src/metrics/postingest.rs +++ b/netfetch/src/metrics/postingest.rs @@ -7,6 +7,8 @@ use mrucache::mucache::MuCache; use netpod::ScalarType; use netpod::Shape; use netpod::TsNano; +use scywr::insertqueues::InsertDeques; +use scywr::insertqueues::InsertQueuesTx; use scywr::iteminsertqueue::DataValue; use scywr::iteminsertqueue::QueryItem; use scywr::iteminsertqueue::ScalarValue; @@ -44,11 +46,11 @@ pub async fn process_api_query_items( backend: String, item_rx: Receiver, info_worker_tx: Sender, - iiq_tx: Sender>, + mut iqtx: InsertQueuesTx, ) -> Result<(), Error> { // TODO so far arbitrary upper limit on the number of ad-hoc channels: let mut mucache: MuCache = MuCache::new(2000); - let mut item_qu = VecDeque::new(); + let mut iqdqs = InsertDeques::new(); let mut sw_tick_last = Instant::now(); #[allow(irrefutable_let_patterns)] @@ -56,7 +58,7 @@ pub async fn process_api_query_items( let tsnow = Instant::now(); if tsnow.saturating_duration_since(sw_tick_last) >= Duration::from_millis(5000) { sw_tick_last = tsnow; - tick_writers(mucache.all_ref_mut(), &mut item_qu)?; + tick_writers(mucache.all_ref_mut(), &mut iqdqs)?; } let item = match item { Ok(Ok(item)) => item, @@ -81,26 +83,23 @@ pub async fn process_api_query_items( stnow, ) .await?; - - let sw = &mut sw; - sw.write(item.ts, item.ts, item.val, &mut item_qu)?; - let item = core::mem::replace(&mut item_qu, VecDeque::new()); - iiq_tx.send(item).await?; + sw.write(item.ts, item.ts, item.val, &mut iqdqs)?; + iqtx.send_all(&mut iqdqs).await.map_err(|_| Error::SendError)?; } - finish_writers(mucache.all_ref_mut(), &mut item_qu)?; + finish_writers(mucache.all_ref_mut(), &mut iqdqs)?; Ok(()) } -fn tick_writers(sws: Vec<&mut SeriesWriter>, iiq: &mut VecDeque) -> Result<(), Error> { +fn tick_writers(sws: Vec<&mut SeriesWriter>, iqdqs: &mut InsertDeques) -> Result<(), Error> { for sw in sws { - sw.tick(iiq)?; + sw.tick(iqdqs)?; } Ok(()) } -fn finish_writers(sws: Vec<&mut SeriesWriter>, iiq: &mut VecDeque) -> Result<(), Error> { +fn finish_writers(sws: Vec<&mut SeriesWriter>, iqdqs: &mut InsertDeques) -> Result<(), Error> { for sw in sws { - sw.tick(iiq)?; + sw.tick(iqdqs)?; } Ok(()) } diff --git a/netfetch/src/metrics/status.rs b/netfetch/src/metrics/status.rs index 6855861..7d0c02c 100644 --- a/netfetch/src/metrics/status.rs +++ b/netfetch/src/metrics/status.rs @@ -18,6 +18,8 @@ struct ChannelState { ioc_address: Option, connection: ConnectionState, archiving_configuration: ChannelConfig, + recv_count: u64, + recv_bytes: u64, } #[derive(Debug, Serialize)] @@ -58,6 +60,8 @@ pub async fn channel_states(params: HashMap, tx: Sender, tx: Sender, tx: Sender, tx: Sender, tx: Sender { + let recv_count = st6.recv_count.unwrap_or(0); + let recv_bytes = st6.recv_bytes.unwrap_or(0); use crate::ca::conn::ChannelConnectedInfo; match st6.channel_connected_info { ChannelConnectedInfo::Disconnected => { @@ -112,6 +126,8 @@ pub async fn channel_states(params: HashMap, tx: Sender, tx: Sender, tx: Sender, tx: Sender, tx: Sender, tx: Sender, tx: Sender>, pub st_rf1_tx: Sender>, + pub st_rf3_tx: Sender>, pub mt_rf3_tx: Sender>, + pub lt_rf3_tx: Sender>, +} + +impl InsertQueuesTx { + /// Send all accumulated batches + pub async fn send_all(&mut self, iqdqs: &mut InsertDeques) -> Result<(), ()> { + // Send each buffer down the corresponding channel + let item = core::mem::replace(&mut iqdqs.st_rf1_rx, VecDeque::new()); + self.st_rf1_tx.send(item).await.map_err(|_| ())?; + let item = core::mem::replace(&mut iqdqs.st_rf3_rx, VecDeque::new()); + self.st_rf3_tx.send(item).await.map_err(|_| ())?; + let item = core::mem::replace(&mut iqdqs.mt_rf3_rx, VecDeque::new()); + self.mt_rf3_tx.send(item).await.map_err(|_| ())?; + let item = core::mem::replace(&mut iqdqs.lt_rf3_rx, VecDeque::new()); + self.lt_rf3_tx.send(item).await.map_err(|_| ())?; + Ok(()) + } } #[derive(Clone)] pub struct InsertQueuesRx { - pub st_rf3_rx: Receiver>, pub st_rf1_rx: Receiver>, + pub st_rf3_rx: Receiver>, pub mt_rf3_rx: Receiver>, + pub lt_rf3_rx: Receiver>, +} + +pub struct InsertDeques { + pub st_rf1_rx: VecDeque, + pub st_rf3_rx: VecDeque, + pub mt_rf3_rx: VecDeque, + pub lt_rf3_rx: VecDeque, +} + +impl InsertDeques { + pub fn new() -> Self { + Self { + st_rf1_rx: VecDeque::new(), + st_rf3_rx: VecDeque::new(), + mt_rf3_rx: VecDeque::new(), + lt_rf3_rx: VecDeque::new(), + } + } + + /// Total number of items cumulated over all queues. + pub fn len(&self) -> usize { + self.st_rf1_rx.len() + self.st_rf3_rx.len() + self.mt_rf3_rx.len() + self.lt_rf3_rx.len() + } + + /// + pub fn clear(&mut self) { + self.st_rf1_rx.clear(); + self.st_rf3_rx.clear(); + self.mt_rf3_rx.clear(); + self.lt_rf3_rx.clear(); + } } diff --git a/scywr/src/iteminsertqueue.rs b/scywr/src/iteminsertqueue.rs index 4681d25..92e3663 100644 --- a/scywr/src/iteminsertqueue.rs +++ b/scywr/src/iteminsertqueue.rs @@ -345,7 +345,7 @@ impl GetValHelp for DataValue { } } -#[derive(Debug)] +#[derive(Debug, Clone)] pub enum ConnectionStatus { ConnectError, ConnectTimeout, @@ -387,7 +387,7 @@ impl ConnectionStatus { } } -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct ConnectionStatusItem { pub ts: SystemTime, pub addr: SocketAddrV4, @@ -409,7 +409,7 @@ pub enum ChannelStatusClosedReason { IoError, } -#[derive(Debug)] +#[derive(Debug, Clone)] pub enum ChannelStatus { AssignedToAddress, Opened, @@ -477,7 +477,7 @@ pub enum ShutdownReason { IocTimeout, } -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct ChannelStatusItem { pub ts: SystemTime, pub cssid: ChannelStatusSeriesId, @@ -494,7 +494,7 @@ impl ChannelStatusItem { } } -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct InsertItem { pub series: SeriesId, pub ts_msp: TsMs, @@ -519,7 +519,7 @@ impl InsertItem { } } -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct TimeBinSimpleF32 { pub series: SeriesId, pub bin_len_ms: i32, @@ -531,7 +531,8 @@ pub struct TimeBinSimpleF32 { pub avg: f32, } -#[derive(Debug)] +// Needs to be Clone to send it to multiple retention times if required. +#[derive(Debug, Clone)] pub enum QueryItem { ConnectionStatus(ConnectionStatusItem), ChannelStatus(ChannelStatusItem), @@ -540,7 +541,7 @@ pub enum QueryItem { Accounting(Accounting), } -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct Accounting { pub part: i32, pub ts: TsMs, diff --git a/serieswriter/src/timebin.rs b/serieswriter/src/timebin.rs index ae6cc16..722ba52 100644 --- a/serieswriter/src/timebin.rs +++ b/serieswriter/src/timebin.rs @@ -21,6 +21,7 @@ use netpod::ScalarType; use netpod::Shape; use netpod::TsMs; use netpod::TsNano; +use scywr::insertqueues::InsertDeques; use scywr::iteminsertqueue::DataValue; use scywr::iteminsertqueue::GetValHelp; use scywr::iteminsertqueue::QueryItem; @@ -54,7 +55,7 @@ struct TickParams<'a> { acc: &'a mut Box, tb: &'a mut Box, pc: &'a mut PatchCollect, - iiq: &'a mut VecDeque, + iqdqs: &'a mut InsertDeques, next_coarse: Option<&'a mut EventsDim0TimeBinner>, } @@ -236,7 +237,7 @@ impl ConnTimeBin { f(params) } - pub fn tick(&mut self, insert_item_queue: &mut VecDeque) -> Result<(), Error> { + pub fn tick(&mut self, iqdqs: &mut InsertDeques) -> Result<(), Error> { if !self.did_setup { return Ok(()); } @@ -246,7 +247,7 @@ impl ConnTimeBin { acc: &mut self.acc, tb: self.events_binner.as_mut().unwrap(), pc: &mut self.patch_collect, - iiq: insert_item_queue, + iqdqs, next_coarse: self.next_coarse.as_mut().map(|x| x.as_mut()), }; f(params) @@ -292,7 +293,7 @@ where let acc = params.acc; let tb = params.tb; // let pc = params.pc; - let iiq = params.iiq; + let iqdqs = params.iqdqs; let next = params.next_coarse; if let Some(c) = acc.downcast_mut::>() { if c.len() >= 1 { @@ -301,7 +302,7 @@ where let nbins = tb.bins_ready_count(); if nbins >= 1 { trace!("store bins len {} {:?}", nbins, params.series); - store_bins(params.series.clone(), tb, iiq, next)?; + store_bins(params.series.clone(), tb, iqdqs, next)?; // if let Some(mut bins) = tb.bins_ready() { // //info!("store bins {bins:?}"); // let mut bins = bins.to_simple_bins_f32(); @@ -340,7 +341,7 @@ where fn store_bins( series: SeriesId, tb: &mut Box, - iiq: &mut VecDeque, + iqdqs: &mut InsertDeques, next: Option<&mut EventsDim0TimeBinner>, ) -> Result<(), Error> { if let Some(mut bins) = tb.bins_ready() { @@ -384,7 +385,11 @@ fn store_bins( }; let item = QueryItem::TimeBinSimpleF32(item); trace!("push item B ts1ms {ts1ms} bin_len_ms {bin_len_ms} ts_msp {ts_msp} off {off}"); - iiq.push_back(item); + + // TODO check which RT we want to push into + iqdqs.st_rf3_rx.push_back(item.clone()); + iqdqs.mt_rf3_rx.push_back(item.clone()); + iqdqs.lt_rf3_rx.push_back(item); } } Ok(()) diff --git a/serieswriter/src/writer.rs b/serieswriter/src/writer.rs index 5a84f7f..eb5e052 100644 --- a/serieswriter/src/writer.rs +++ b/serieswriter/src/writer.rs @@ -13,6 +13,7 @@ use netpod::ScalarType; use netpod::SeriesKind; use netpod::Shape; use netpod::TsNano; +use scywr::insertqueues::InsertDeques; use scywr::iteminsertqueue::DataValue; use scywr::iteminsertqueue::InsertItem; use scywr::iteminsertqueue::QueryItem; @@ -149,7 +150,7 @@ impl SeriesWriter { ts: TsNano, ts_local: TsNano, val: DataValue, - item_qu: &mut VecDeque, + iqdqs: &mut InsertDeques, ) -> Result<(), Error> { // TODO compute the binned data here as well and flush completed bins if needed. self.binner.push(ts.clone(), &val)?; @@ -193,7 +194,7 @@ impl SeriesWriter { let item = InsertItem { series: self.sid.clone(), ts_msp: ts_msp.to_ts_ms(), - ts_lsp: ts_lsp, + ts_lsp, msp_bump: ts_msp_changed, pulse: 0, scalar_type: self.scalar_type.clone(), @@ -201,12 +202,13 @@ impl SeriesWriter { val, ts_local: ts_local.to_ts_ms(), }; - item_qu.push_back(QueryItem::Insert(item)); + // TODO decide on the path in the new deques struct + iqdqs.st_rf3_rx.push_back(QueryItem::Insert(item)); Ok(()) } - pub fn tick(&mut self, iiq: &mut VecDeque) -> Result<(), Error> { - self.binner.tick(iiq)?; + pub fn tick(&mut self, iqdqs: &mut InsertDeques) -> Result<(), Error> { + self.binner.tick(iqdqs)?; Ok(()) } } @@ -348,15 +350,13 @@ fn write_00() { let tsnow = SystemTime::now(); let mut writer = SeriesWriter::establish(tx, backend.into(), channel.into(), scalar_type, shape, tsnow).await?; eprintln!("{writer:?}"); - let mut item_queue = VecDeque::new(); - let item_qu = &mut item_queue; + let mut iqdqs = InsertDeques::new(); for i in 0..10 { let ts = TsNano::from_ns(HOUR * 24 + SEC * i); let ts_local = ts.clone(); let val = DataValue::Scalar(scywr::iteminsertqueue::ScalarValue::I16(i as _)); - writer.write(ts, ts_local, val, item_qu)?; + writer.write(ts, ts_local, val, &mut iqdqs)?; } - eprintln!("{item_queue:?}"); Ok::<_, Error>(()) }; taskrun::run(fut).unwrap();