Refactor different queues into common type

This commit is contained in:
Dominik Werder
2024-05-02 17:06:05 +02:00
parent 0704ad418c
commit 3827514900
10 changed files with 214 additions and 90 deletions

View File

@@ -79,12 +79,12 @@ pub struct Daemon {
connset_status_last: Instant,
// TODO should be a stats object?
insert_workers_running: AtomicU64,
query_item_tx_weak: WeakSender<VecDeque<QueryItem>>,
connset_health_lat_ema: f32,
metrics_shutdown_tx: Sender<u32>,
metrics_shutdown_rx: Receiver<u32>,
metrics_jh: Option<JoinHandle<Result<(), Error>>>,
channel_info_query_tx: Sender<ChannelInfoQuery>,
iqtx: Option<InsertQueuesTx>,
}
impl Daemon {
@@ -143,20 +143,23 @@ impl Daemon {
let (st_rf3_tx, st_rf3_rx) = async_channel::bounded(ingest_opts.insert_item_queue_cap());
let (st_rf1_tx, st_rf1_rx) = async_channel::bounded(ingest_opts.insert_item_queue_cap());
let (mt_rf3_tx, mt_rf3_rx) = async_channel::bounded(ingest_opts.insert_item_queue_cap());
let (lt_rf3_tx, lt_rf3_rx) = async_channel::bounded(ingest_opts.insert_item_queue_cap());
let iqtx = InsertQueuesTx {
st_rf3_tx,
st_rf1_tx,
mt_rf3_tx,
lt_rf3_tx,
};
let iqrx = InsertQueuesRx {
st_rf3_rx,
st_rf1_rx,
mt_rf3_rx,
lt_rf3_rx,
};
(iqtx, iqrx)
};
let query_item_tx_weak = iqtx.st_rf3_tx.clone().downgrade();
let iqtx2 = iqtx.clone();
let conn_set_ctrl = CaConnSet::start(
ingest_opts.backend().into(),
@@ -281,12 +284,12 @@ impl Daemon {
connset_ctrl: conn_set_ctrl,
connset_status_last: Instant::now(),
insert_workers_running: AtomicU64::new(0),
query_item_tx_weak,
connset_health_lat_ema: 0.,
metrics_shutdown_tx,
metrics_shutdown_rx,
metrics_jh: None,
channel_info_query_tx,
iqtx: Some(iqtx2),
};
Ok(ret)
}
@@ -584,9 +587,9 @@ impl Daemon {
let backend = String::new();
let (_item_tx, item_rx) = async_channel::bounded(256);
let info_worker_tx = self.channel_info_query_tx.clone();
let iiq_tx = self.query_item_tx_weak.upgrade().unwrap();
let worker_fut =
netfetch::metrics::postingest::process_api_query_items(backend, item_rx, info_worker_tx, iiq_tx);
use netfetch::metrics::postingest::process_api_query_items;
let iqtx = self.iqtx.take().unwrap();
let worker_fut = process_api_query_items(backend, item_rx, info_worker_tx, iqtx);
taskrun::spawn(worker_fut)
};
Self::spawn_ticker(self.tx.clone(), self.stats.clone());

View File

@@ -7,10 +7,10 @@ use log::*;
use netpod::ScalarType;
use netpod::Shape;
use netpod::TsNano;
use scywr::insertqueues::InsertDeques;
use scywr::iteminsertqueue::DataValue;
use scywr::iteminsertqueue::ScalarValue;
use serieswriter::writer::SeriesWriter;
use std::collections::VecDeque;
use std::io::Cursor;
use std::net::Ipv4Addr;
use std::time::SystemTime;
@@ -36,7 +36,7 @@ pub async fn listen_beacons(
sock.set_broadcast(true).unwrap();
let mut buf = Vec::new();
buf.resize(1024 * 4, 0);
let mut item_qu = VecDeque::new();
let mut iqdqs = InsertDeques::new();
loop {
let bb = &mut buf;
let (n, remote) = taskrun::tokio::select! {
@@ -65,12 +65,12 @@ pub async fn listen_beacons(
let ts_local = ts;
let blob = addr_u32 as i64;
let val = DataValue::Scalar(ScalarValue::I64(blob));
writer.write(ts, ts_local, val, &mut item_qu)?;
writer.write(ts, ts_local, val, &mut iqdqs)?;
}
}
if item_qu.len() != 0 {
if iqdqs.len() != 0 {
// TODO deliver to insert queue
item_qu.clear();
iqdqs.clear();
}
}
Ok(())

View File

@@ -30,6 +30,7 @@ use proto::CaMsgTy;
use proto::CaProto;
use proto::CreateChan;
use proto::EventAdd;
use scywr::insertqueues::InsertDeques;
use scywr::iteminsertqueue as scywriiq;
use scywr::iteminsertqueue::Accounting;
use scywr::iteminsertqueue::DataValue;
@@ -758,7 +759,7 @@ pub struct CaConn {
channel_status_emit_last: Instant,
tick_last_writer: Instant,
init_state_count: u64,
insert_item_queue: VecDeque<QueryItem>,
iqdqs: InsertDeques,
remote_addr_dbg: SocketAddrV4,
local_epics_hostname: String,
stats: Arc<CaConnStats>,
@@ -824,7 +825,7 @@ impl CaConn {
cid_by_sid: HashMap::new(),
channel_status_emit_last: tsnow,
tick_last_writer: tsnow,
insert_item_queue: VecDeque::new(),
iqdqs: InsertDeques::new(),
remote_addr_dbg,
local_epics_hostname,
stats,
@@ -906,7 +907,8 @@ impl CaConn {
};
self.channel_state_on_shutdown(channel_reason);
let addr = self.remote_addr_dbg.clone();
self.insert_item_queue
self.iqdqs
.lt_rf3_rx
.push_back(QueryItem::ConnectionStatus(ConnectionStatusItem {
ts: self.tmp_ts_poll,
addr,
@@ -1004,7 +1006,7 @@ impl CaConn {
cssid: st2.channel.cssid.clone(),
status: ChannelStatus::Opened,
});
self.insert_item_queue.push_back(item);
self.iqdqs.lt_rf3_rx.push_back(item);
}
let name = conf.conf.name();
if name.starts_with("TEST:PEAKING:") {
@@ -1171,7 +1173,7 @@ impl CaConn {
cssid: cssid.clone(),
status: ChannelStatus::Closed(channel_reason.clone()),
});
self.insert_item_queue.push_back(item);
self.iqdqs.lt_rf3_rx.push_back(item);
*chst = ChannelState::Ended(cssid);
}
ChannelState::Error(..) => {
@@ -1337,9 +1339,9 @@ impl CaConn {
});
let crst = &mut st.channel;
let writer = &mut st.writer;
let iiq = &mut self.insert_item_queue;
let iqdqs = &mut self.iqdqs;
let stats = self.stats.as_ref();
Self::event_add_ingest(ev.payload_len, ev.value, crst, writer, iiq, tsnow, stnow, stats)?;
Self::event_add_ingest(ev.payload_len, ev.value, crst, writer, iqdqs, tsnow, stnow, stats)?;
}
ReadingState::Monitoring(st2) => {
match &mut st2.mon2state {
@@ -1353,9 +1355,9 @@ impl CaConn {
}
let crst = &mut st.channel;
let writer = &mut st.writer;
let iiq = &mut self.insert_item_queue;
let iqdqs = &mut self.iqdqs;
let stats = self.stats.as_ref();
Self::event_add_ingest(ev.payload_len, ev.value, crst, writer, iiq, tsnow, stnow, stats)?;
Self::event_add_ingest(ev.payload_len, ev.value, crst, writer, iqdqs, tsnow, stnow, stats)?;
}
ReadingState::StopMonitoringForPolling(st2) => {
// TODO count for metrics
@@ -1483,9 +1485,9 @@ impl CaConn {
// TODO maintain histogram of read-notify latencies
self.read_ioids.remove(ioid);
st2.tick = PollTickState::Idle(tsnow);
let iiq = &mut self.insert_item_queue;
let iqdqs = &mut self.iqdqs;
let stats = self.stats.as_ref();
Self::read_notify_res_for_write(ev, st, iiq, stnow, tsnow, stats)?;
Self::read_notify_res_for_write(ev, st, iqdqs, stnow, tsnow, stats)?;
}
},
ReadingState::EnableMonitoring(..) => {
@@ -1506,9 +1508,9 @@ impl CaConn {
}
self.read_ioids.remove(&ioid);
st2.mon2state = Monitoring2State::Passive(Monitoring2PassiveState { tsbeg: tsnow });
let iiq = &mut self.insert_item_queue;
let iqdqs = &mut self.iqdqs;
let stats = self.stats.as_ref();
Self::read_notify_res_for_write(ev, st, iiq, stnow, tsnow, stats)?;
Self::read_notify_res_for_write(ev, st, iqdqs, stnow, tsnow, stats)?;
}
},
ReadingState::StopMonitoringForPolling(..) => {
@@ -1531,14 +1533,14 @@ impl CaConn {
/// Handle a `ReadNotify` response for a channel in the writable state by
/// ingesting the received value through the normal event path.
///
/// NOTE(review): the diff residue showed both the old `iiq: &mut
/// VecDeque<QueryItem>` parameter and the new `iqdqs` parameter plus two
/// duplicate `event_add_ingest` calls; only the post-refactor version is kept.
fn read_notify_res_for_write(
    ev: proto::ReadNotifyRes,
    st: &mut WritableState,
    iqdqs: &mut InsertDeques,
    stnow: SystemTime,
    tsnow: Instant,
    stats: &CaConnStats,
) -> Result<(), Error> {
    let crst = &mut st.channel;
    let writer = &mut st.writer;
    // Same ingest path as a monitor event: write the value and queue items.
    Self::event_add_ingest(ev.payload_len, ev.value, crst, writer, iqdqs, tsnow, stnow, stats)?;
    Ok(())
}
@@ -1547,7 +1549,7 @@ impl CaConn {
value: CaEventValue,
crst: &mut CreatedState,
writer: &mut SeriesWriter,
iiq: &mut VecDeque<QueryItem>,
iqdqs: &mut InsertDeques,
tsnow: Instant,
stnow: SystemTime,
stats: &CaConnStats,
@@ -1577,7 +1579,7 @@ impl CaConn {
Self::check_ev_value_data(&value.data, writer.scalar_type())?;
{
let val: DataValue = value.data.into();
writer.write(TsNano::from_ns(ts), TsNano::from_ns(ts_local), val, iiq)?;
writer.write(TsNano::from_ns(ts), TsNano::from_ns(ts_local), val, iqdqs)?;
}
}
if false {
@@ -2144,7 +2146,8 @@ impl CaConn {
Ok(Ok(tcp)) => {
self.stats.tcp_connected.inc();
let addr = addr.clone();
self.insert_item_queue
self.iqdqs
.lt_rf3_rx
.push_back(QueryItem::ConnectionStatus(ConnectionStatusItem {
ts: self.tmp_ts_poll,
addr,
@@ -2164,7 +2167,8 @@ impl CaConn {
Ok(Err(e)) => {
debug!("error connect to {addr} {e}");
let addr = addr.clone();
self.insert_item_queue
self.iqdqs
.lt_rf3_rx
.push_back(QueryItem::ConnectionStatus(ConnectionStatusItem {
ts: self.tmp_ts_poll,
addr,
@@ -2177,7 +2181,8 @@ impl CaConn {
// TODO log with exponential backoff
debug!("timeout connect to {addr} {e}");
let addr = addr.clone();
self.insert_item_queue
self.iqdqs
.lt_rf3_rx
.push_back(QueryItem::ConnectionStatus(ConnectionStatusItem {
ts: self.tmp_ts_poll,
addr,
@@ -2237,7 +2242,7 @@ impl CaConn {
self.stats.loop2_count.inc();
if self.is_shutdown() {
break;
} else if self.insert_item_queue.len() >= self.opts.insert_queue_max {
} else if self.iqdqs.len() >= self.opts.insert_queue_max {
break;
} else {
match self.handle_conn_state(tsnow, cx) {
@@ -2365,7 +2370,7 @@ impl CaConn {
count,
bytes,
});
self.insert_item_queue.push_back(item);
self.iqdqs.lt_rf3_rx.push_back(item);
}
}
}
@@ -2379,7 +2384,7 @@ impl CaConn {
for (_, chconf) in &mut self.channels {
let chst = &mut chconf.state;
if let ChannelState::Writable(st2) = chst {
st2.writer.tick(&mut self.insert_item_queue)?;
st2.writer.tick(&mut self.iqdqs)?;
}
}
Ok(())
@@ -2392,13 +2397,11 @@ impl CaConn {
/// True when every outbound queue of this connection is drained:
/// the local insert deques, the storage insert sender, and the out-event
/// queue. Presumably consulted on shutdown to decide whether the
/// connection may terminate without losing queued items — TODO confirm
/// against the caller.
fn queues_out_flushed(&self) -> bool {
    debug!(
        "async out flushed iiq {} {} caout {}",
        self.iqdqs.len() == 0,
        self.storage_insert_sender.is_idle(),
        self.ca_conn_event_out_queue.is_empty()
    );
    // NOTE(review): the pre-refactor `insert_item_queue`-based lines were
    // left interleaved by the diff; only the `iqdqs`-based logic is kept.
    self.iqdqs.len() == 0 && self.storage_insert_sender.is_idle() && self.ca_conn_event_out_queue.is_empty()
}
fn attempt_flush_queue<T, Q, FB, FS>(
@@ -2415,9 +2418,10 @@ impl CaConn {
FB: Fn(&mut VecDeque<T>) -> Option<Q>,
FS: Fn(&Q),
{
let self_name = "attempt_flush_queue";
use Poll::*;
if qu.len() != 0 {
trace_flush_queue!("attempt_flush_queue id {:7} len {}", id, qu.len());
trace_flush_queue!("{self_name} id {:10} len {}", id, qu.len());
}
let mut have_progress = false;
let mut i = 0;
@@ -2440,7 +2444,7 @@ impl CaConn {
if sp.is_sending() {
match sp.poll_unpin(cx) {
Ready(Ok(())) => {
trace_flush_queue!("attempt_flush_queue id {:7} send done", id);
trace_flush_queue!("{self_name} id {:10} send done", id);
have_progress = true;
}
Ready(Err(e)) => {
@@ -2485,6 +2489,24 @@ macro_rules! flush_queue {
};
}
// Variant of `flush_queue!` that takes its queue from the per-retention
// deques in `self.iqdqs` (field selected by `$qu`) instead of a direct
// field on `self`.
//
// Intended for use inside a `loop` in a poll fn: the `Err` arm `break`s
// out of the enclosing loop with an error event. `$have` is a pair of
// `&mut bool` flags (progress, pending) that are OR-ed into.
macro_rules! flush_queue_dqs {
    ($self:expr, $qu:ident, $sp:ident, $batcher:expr, $loop_max:expr, $have:expr, $id:expr, $cx:expr, $stats:expr) => {
        // Project through the pinned self to borrow the queue and the
        // sender/poller (`$sp`) mutably at the same time.
        let obj = $self.as_mut().get_mut();
        let qu = &mut obj.iqdqs.$qu;
        let sp = &mut obj.$sp;
        match Self::attempt_flush_queue(qu, sp, $batcher, $loop_max, $cx, $id, $stats) {
            // Items were flushed: record progress.
            Ok(Ready(Some(()))) => {
                *$have.0 |= true;
            }
            // Nothing to flush.
            Ok(Ready(None)) => {}
            // Downstream not ready: record that we are pending.
            Ok(Pending) => {
                *$have.1 |= true;
            }
            // Fatal send error: abort the enclosing poll loop with an error event.
            Err(e) => break Ready(Some(CaConnEvent::err_now(e))),
        }
    };
}
/// Take the next item from the front of the queue; `None` when the queue
/// is empty. Counterpart to the batching send helpers: each call yields
/// exactly one item.
fn send_individual<T>(queue: &mut VecDeque<T>) -> Option<T> {
    queue.pop_front()
}
@@ -2514,7 +2536,7 @@ impl Stream for CaConn {
let lts1 = Instant::now();
self.stats.poll_loop_begin().inc();
let qlen = self.insert_item_queue.len();
let qlen = self.iqdqs.len();
if qlen >= self.opts.insert_queue_max * 2 / 3 {
self.stats.insert_item_queue_pressure().inc();
} else if qlen >= self.opts.insert_queue_max {
@@ -2543,8 +2565,8 @@ impl Stream for CaConn {
}
{
let iiq = &self.insert_item_queue;
self.stats.iiq_len().ingest(iiq.len() as u32);
let n = self.iqdqs.len();
self.stats.iiq_len().ingest(n as u32);
}
{
@@ -2552,14 +2574,14 @@ impl Stream for CaConn {
let stats_fn = move |item: &VecDeque<QueryItem>| {
stats2.iiq_batch_len().ingest(item.len() as u32);
};
flush_queue!(
flush_queue_dqs!(
self,
insert_item_queue,
st_rf1_rx,
storage_insert_sender,
send_batched::<256, _>,
32,
(&mut have_progress, &mut have_pending),
"strg",
"iq_st_rf1",
cx,
stats_fn
);

View File

@@ -246,6 +246,7 @@ async fn parse_channel_config_txt(fname: &Path, re_p: Regex, re_n: Regex) -> Res
short_term: Some(ChannelReadConfig::Monitor),
medium_term: None,
long_term: None,
is_polled: false,
},
};
conf.channels.push(item);
@@ -274,6 +275,8 @@ pub struct IngestConfigArchiving {
#[serde(default, skip_serializing_if = "Option::is_none")]
#[serde(with = "serde_option_channel_read_config")]
long_term: Option<ChannelReadConfig>,
#[serde(default, skip_serializing_if = "bool_is_false")]
is_polled: bool,
}
fn bool_is_false(x: &bool) -> bool {
@@ -368,7 +371,7 @@ mod serde_option_channel_read_config {
type Value = Option<ChannelReadConfig>;
fn expecting(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
write!(fmt, "keyword `Monitor`, an integer, or not this field at all")
write!(fmt, "keyword `Monitor`, keyword `None`, an integer, or missing")
}
fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
@@ -390,8 +393,9 @@ mod serde_option_channel_read_config {
where
E: de::Error,
{
if v < 1 || v > 108000 {
let e = E::custom(format!("unsupported value {v:?}, polling must be in range 1..108000"));
let max = 108000;
if v < 1 || v > max {
let e = E::custom(format!("unsupported value {v:?}, polling must be in range 1..{max:?}"));
return Err(e);
}
Ok(Some(ChannelReadConfig::Poll(Duration::from_secs(v as u64))))
@@ -401,8 +405,9 @@ mod serde_option_channel_read_config {
where
E: de::Error,
{
if v < 1 || v > 108000 {
let e = E::custom(format!("unsupported value {v:?}, polling must be in range 1..108000"));
let max = 108000;
if v < 1 || v > max {
let e = E::custom(format!("unsupported value {v:?}, polling must be in range 1..{max:?}"));
return Err(e);
}
self.visit_u64(v as u64)
@@ -436,6 +441,17 @@ CH-02:
short_term: Monitor
CH-03:
archiving_configuration:
CH-04:
archiving_configuration:
short_term: None
medium_term: None
long_term: 3600
is_polled: true
CH-05:
archiving_configuration:
short_term: None
medium_term: None
long_term: Monitor
"###;
let x: BTreeMap<String, ChannelConfigParse> = serde_yaml::from_str(inp).unwrap();
assert_eq!(
@@ -501,6 +517,7 @@ impl ChannelConfig {
short_term: Some(ChannelReadConfig::Monitor),
medium_term: None,
long_term: None,
is_polled: false,
},
}
}

View File

@@ -7,6 +7,8 @@ use mrucache::mucache::MuCache;
use netpod::ScalarType;
use netpod::Shape;
use netpod::TsNano;
use scywr::insertqueues::InsertDeques;
use scywr::insertqueues::InsertQueuesTx;
use scywr::iteminsertqueue::DataValue;
use scywr::iteminsertqueue::QueryItem;
use scywr::iteminsertqueue::ScalarValue;
@@ -44,11 +46,11 @@ pub async fn process_api_query_items(
backend: String,
item_rx: Receiver<EventValueItem>,
info_worker_tx: Sender<ChannelInfoQuery>,
iiq_tx: Sender<VecDeque<QueryItem>>,
mut iqtx: InsertQueuesTx,
) -> Result<(), Error> {
// TODO so far arbitrary upper limit on the number of ad-hoc channels:
let mut mucache: MuCache<String, SeriesWriter> = MuCache::new(2000);
let mut item_qu = VecDeque::new();
let mut iqdqs = InsertDeques::new();
let mut sw_tick_last = Instant::now();
#[allow(irrefutable_let_patterns)]
@@ -56,7 +58,7 @@ pub async fn process_api_query_items(
let tsnow = Instant::now();
if tsnow.saturating_duration_since(sw_tick_last) >= Duration::from_millis(5000) {
sw_tick_last = tsnow;
tick_writers(mucache.all_ref_mut(), &mut item_qu)?;
tick_writers(mucache.all_ref_mut(), &mut iqdqs)?;
}
let item = match item {
Ok(Ok(item)) => item,
@@ -81,26 +83,23 @@ pub async fn process_api_query_items(
stnow,
)
.await?;
let sw = &mut sw;
sw.write(item.ts, item.ts, item.val, &mut item_qu)?;
let item = core::mem::replace(&mut item_qu, VecDeque::new());
iiq_tx.send(item).await?;
sw.write(item.ts, item.ts, item.val, &mut iqdqs)?;
iqtx.send_all(&mut iqdqs).await.map_err(|_| Error::SendError)?;
}
finish_writers(mucache.all_ref_mut(), &mut item_qu)?;
finish_writers(mucache.all_ref_mut(), &mut iqdqs)?;
Ok(())
}
fn tick_writers(sws: Vec<&mut SeriesWriter>, iiq: &mut VecDeque<QueryItem>) -> Result<(), Error> {
fn tick_writers(sws: Vec<&mut SeriesWriter>, iqdqs: &mut InsertDeques) -> Result<(), Error> {
for sw in sws {
sw.tick(iiq)?;
sw.tick(iqdqs)?;
}
Ok(())
}
fn finish_writers(sws: Vec<&mut SeriesWriter>, iiq: &mut VecDeque<QueryItem>) -> Result<(), Error> {
fn finish_writers(sws: Vec<&mut SeriesWriter>, iqdqs: &mut InsertDeques) -> Result<(), Error> {
for sw in sws {
sw.tick(iiq)?;
sw.tick(iqdqs)?;
}
Ok(())
}

View File

@@ -18,6 +18,8 @@ struct ChannelState {
ioc_address: Option<SocketAddr>,
connection: ConnectionState,
archiving_configuration: ChannelConfig,
recv_count: u64,
recv_bytes: u64,
}
#[derive(Debug, Serialize)]
@@ -58,6 +60,8 @@ pub async fn channel_states(params: HashMap<String, String>, tx: Sender<CaConnSe
ioc_address: None,
connection: ConnectionState::Connecting,
archiving_configuration: st1.config,
recv_count: 0,
recv_bytes: 0,
};
states.channels.insert(k, chst);
}
@@ -66,6 +70,8 @@ pub async fn channel_states(params: HashMap<String, String>, tx: Sender<CaConnSe
ioc_address: None,
connection: ConnectionState::Connecting,
archiving_configuration: st1.config,
recv_count: 0,
recv_bytes: 0,
};
states.channels.insert(k, chst);
}
@@ -77,6 +83,8 @@ pub async fn channel_states(params: HashMap<String, String>, tx: Sender<CaConnSe
ioc_address: None,
connection: ConnectionState::Connecting,
archiving_configuration: st1.config,
recv_count: 0,
recv_bytes: 0,
};
states.channels.insert(k, chst);
}
@@ -88,6 +96,8 @@ pub async fn channel_states(params: HashMap<String, String>, tx: Sender<CaConnSe
ioc_address: Some(SocketAddr::V4(addr)),
connection: ConnectionState::Connecting,
archiving_configuration: st1.config,
recv_count: 0,
recv_bytes: 0,
};
states.channels.insert(k, chst);
}
@@ -99,10 +109,14 @@ pub async fn channel_states(params: HashMap<String, String>, tx: Sender<CaConnSe
ioc_address: Some(SocketAddr::V4(addr)),
connection: ConnectionState::Connecting,
archiving_configuration: st1.config,
recv_count: 0,
recv_bytes: 0,
};
states.channels.insert(k, chst);
}
ConnectionStateValue::ChannelStateInfo(st6) => {
let recv_count = st6.recv_count.unwrap_or(0);
let recv_bytes = st6.recv_bytes.unwrap_or(0);
use crate::ca::conn::ChannelConnectedInfo;
match st6.channel_connected_info {
ChannelConnectedInfo::Disconnected => {
@@ -112,6 +126,8 @@ pub async fn channel_states(params: HashMap<String, String>, tx: Sender<CaConnSe
// TODO config is stored in two places
// conf: st6.conf,
archiving_configuration: st1.config,
recv_count,
recv_bytes,
};
states.channels.insert(k, chst);
}
@@ -120,6 +136,8 @@ pub async fn channel_states(params: HashMap<String, String>, tx: Sender<CaConnSe
ioc_address: Some(SocketAddr::V4(addr)),
connection: ConnectionState::Connecting,
archiving_configuration: st1.config,
recv_count,
recv_bytes,
};
states.channels.insert(k, chst);
}
@@ -128,6 +146,8 @@ pub async fn channel_states(params: HashMap<String, String>, tx: Sender<CaConnSe
ioc_address: Some(SocketAddr::V4(addr)),
connection: ConnectionState::Connected,
archiving_configuration: st1.config,
recv_count,
recv_bytes,
};
states.channels.insert(k, chst);
}
@@ -136,6 +156,8 @@ pub async fn channel_states(params: HashMap<String, String>, tx: Sender<CaConnSe
ioc_address: Some(SocketAddr::V4(addr)),
connection: ConnectionState::Error,
archiving_configuration: st1.config,
recv_count,
recv_bytes,
};
states.channels.insert(k, chst);
}
@@ -150,6 +172,8 @@ pub async fn channel_states(params: HashMap<String, String>, tx: Sender<CaConnSe
ioc_address: None,
connection: ConnectionState::Connecting,
archiving_configuration: st1.config,
recv_count: 0,
recv_bytes: 0,
};
states.channels.insert(k, chst);
}
@@ -158,6 +182,8 @@ pub async fn channel_states(params: HashMap<String, String>, tx: Sender<CaConnSe
ioc_address: None,
connection: ConnectionState::Unreachable,
archiving_configuration: st1.config,
recv_count: 0,
recv_bytes: 0,
};
states.channels.insert(k, chst);
}
@@ -166,6 +192,8 @@ pub async fn channel_states(params: HashMap<String, String>, tx: Sender<CaConnSe
ioc_address: None,
connection: ConnectionState::Unreachable,
archiving_configuration: st1.config,
recv_count: 0,
recv_bytes: 0,
};
states.channels.insert(k, chst);
}

View File

@@ -5,14 +5,63 @@ use std::collections::VecDeque;
#[derive(Clone)]
pub struct InsertQueuesTx {
pub st_rf3_tx: Sender<VecDeque<QueryItem>>,
pub st_rf1_tx: Sender<VecDeque<QueryItem>>,
pub st_rf3_tx: Sender<VecDeque<QueryItem>>,
pub mt_rf3_tx: Sender<VecDeque<QueryItem>>,
pub lt_rf3_tx: Sender<VecDeque<QueryItem>>,
}
impl InsertQueuesTx {
/// Send all accumulated batches
pub async fn send_all(&mut self, iqdqs: &mut InsertDeques) -> Result<(), ()> {
// Send each buffer down the corresponding channel
let item = core::mem::replace(&mut iqdqs.st_rf1_rx, VecDeque::new());
self.st_rf1_tx.send(item).await.map_err(|_| ())?;
let item = core::mem::replace(&mut iqdqs.st_rf3_rx, VecDeque::new());
self.st_rf3_tx.send(item).await.map_err(|_| ())?;
let item = core::mem::replace(&mut iqdqs.mt_rf3_rx, VecDeque::new());
self.mt_rf3_tx.send(item).await.map_err(|_| ())?;
let item = core::mem::replace(&mut iqdqs.lt_rf3_rx, VecDeque::new());
self.lt_rf3_tx.send(item).await.map_err(|_| ())?;
Ok(())
}
}
#[derive(Clone)]
pub struct InsertQueuesRx {
pub st_rf3_rx: Receiver<VecDeque<QueryItem>>,
pub st_rf1_rx: Receiver<VecDeque<QueryItem>>,
pub st_rf3_rx: Receiver<VecDeque<QueryItem>>,
pub mt_rf3_rx: Receiver<VecDeque<QueryItem>>,
pub lt_rf3_rx: Receiver<VecDeque<QueryItem>>,
}
/// Local (unsent) accumulation buffers, one per retention-time queue.
/// Items are pushed here during event processing and later handed to
/// `InsertQueuesTx::send_all` as whole batches.
///
/// NOTE(review): the `_rx` suffix on these fields is misleading — they are
/// plain deques, not channel receivers; consider renaming (e.g. `_dq`).
pub struct InsertDeques {
    // short term, replication factor 1 (naming assumption — TODO confirm)
    pub st_rf1_rx: VecDeque<QueryItem>,
    // short term, replication factor 3
    pub st_rf3_rx: VecDeque<QueryItem>,
    // medium term
    pub mt_rf3_rx: VecDeque<QueryItem>,
    // long term; also carries connection/channel status items
    pub lt_rf3_rx: VecDeque<QueryItem>,
}
impl InsertDeques {
pub fn new() -> Self {
Self {
st_rf1_rx: VecDeque::new(),
st_rf3_rx: VecDeque::new(),
mt_rf3_rx: VecDeque::new(),
lt_rf3_rx: VecDeque::new(),
}
}
/// Total number of items cumulated over all queues.
pub fn len(&self) -> usize {
self.st_rf1_rx.len() + self.st_rf3_rx.len() + self.mt_rf3_rx.len() + self.lt_rf3_rx.len()
}
///
pub fn clear(&mut self) {
self.st_rf1_rx.clear();
self.st_rf3_rx.clear();
self.mt_rf3_rx.clear();
self.lt_rf3_rx.clear();
}
}

View File

@@ -345,7 +345,7 @@ impl GetValHelp<f64> for DataValue {
}
}
#[derive(Debug)]
#[derive(Debug, Clone)]
pub enum ConnectionStatus {
ConnectError,
ConnectTimeout,
@@ -387,7 +387,7 @@ impl ConnectionStatus {
}
}
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct ConnectionStatusItem {
pub ts: SystemTime,
pub addr: SocketAddrV4,
@@ -409,7 +409,7 @@ pub enum ChannelStatusClosedReason {
IoError,
}
#[derive(Debug)]
#[derive(Debug, Clone)]
pub enum ChannelStatus {
AssignedToAddress,
Opened,
@@ -477,7 +477,7 @@ pub enum ShutdownReason {
IocTimeout,
}
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct ChannelStatusItem {
pub ts: SystemTime,
pub cssid: ChannelStatusSeriesId,
@@ -494,7 +494,7 @@ impl ChannelStatusItem {
}
}
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct InsertItem {
pub series: SeriesId,
pub ts_msp: TsMs,
@@ -519,7 +519,7 @@ impl InsertItem {
}
}
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct TimeBinSimpleF32 {
pub series: SeriesId,
pub bin_len_ms: i32,
@@ -531,7 +531,8 @@ pub struct TimeBinSimpleF32 {
pub avg: f32,
}
#[derive(Debug)]
// Needs to be Clone to send it to multiple retention times if required.
#[derive(Debug, Clone)]
pub enum QueryItem {
ConnectionStatus(ConnectionStatusItem),
ChannelStatus(ChannelStatusItem),
@@ -540,7 +541,7 @@ pub enum QueryItem {
Accounting(Accounting),
}
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct Accounting {
pub part: i32,
pub ts: TsMs,

View File

@@ -21,6 +21,7 @@ use netpod::ScalarType;
use netpod::Shape;
use netpod::TsMs;
use netpod::TsNano;
use scywr::insertqueues::InsertDeques;
use scywr::iteminsertqueue::DataValue;
use scywr::iteminsertqueue::GetValHelp;
use scywr::iteminsertqueue::QueryItem;
@@ -54,7 +55,7 @@ struct TickParams<'a> {
acc: &'a mut Box<dyn Any + Send>,
tb: &'a mut Box<dyn TimeBinner>,
pc: &'a mut PatchCollect,
iiq: &'a mut VecDeque<QueryItem>,
iqdqs: &'a mut InsertDeques,
next_coarse: Option<&'a mut EventsDim0TimeBinner<f32>>,
}
@@ -236,7 +237,7 @@ impl ConnTimeBin {
f(params)
}
pub fn tick(&mut self, insert_item_queue: &mut VecDeque<QueryItem>) -> Result<(), Error> {
pub fn tick(&mut self, iqdqs: &mut InsertDeques) -> Result<(), Error> {
if !self.did_setup {
return Ok(());
}
@@ -246,7 +247,7 @@ impl ConnTimeBin {
acc: &mut self.acc,
tb: self.events_binner.as_mut().unwrap(),
pc: &mut self.patch_collect,
iiq: insert_item_queue,
iqdqs,
next_coarse: self.next_coarse.as_mut().map(|x| x.as_mut()),
};
f(params)
@@ -292,7 +293,7 @@ where
let acc = params.acc;
let tb = params.tb;
// let pc = params.pc;
let iiq = params.iiq;
let iqdqs = params.iqdqs;
let next = params.next_coarse;
if let Some(c) = acc.downcast_mut::<EventsDim0<STY>>() {
if c.len() >= 1 {
@@ -301,7 +302,7 @@ where
let nbins = tb.bins_ready_count();
if nbins >= 1 {
trace!("store bins len {} {:?}", nbins, params.series);
store_bins(params.series.clone(), tb, iiq, next)?;
store_bins(params.series.clone(), tb, iqdqs, next)?;
// if let Some(mut bins) = tb.bins_ready() {
// //info!("store bins {bins:?}");
// let mut bins = bins.to_simple_bins_f32();
@@ -340,7 +341,7 @@ where
fn store_bins(
series: SeriesId,
tb: &mut Box<dyn TimeBinner>,
iiq: &mut VecDeque<QueryItem>,
iqdqs: &mut InsertDeques,
next: Option<&mut EventsDim0TimeBinner<f32>>,
) -> Result<(), Error> {
if let Some(mut bins) = tb.bins_ready() {
@@ -384,7 +385,11 @@ fn store_bins(
};
let item = QueryItem::TimeBinSimpleF32(item);
trace!("push item B ts1ms {ts1ms} bin_len_ms {bin_len_ms} ts_msp {ts_msp} off {off}");
iiq.push_back(item);
// TODO check which RT we want to push into
iqdqs.st_rf3_rx.push_back(item.clone());
iqdqs.mt_rf3_rx.push_back(item.clone());
iqdqs.lt_rf3_rx.push_back(item);
}
}
Ok(())

View File

@@ -13,6 +13,7 @@ use netpod::ScalarType;
use netpod::SeriesKind;
use netpod::Shape;
use netpod::TsNano;
use scywr::insertqueues::InsertDeques;
use scywr::iteminsertqueue::DataValue;
use scywr::iteminsertqueue::InsertItem;
use scywr::iteminsertqueue::QueryItem;
@@ -149,7 +150,7 @@ impl SeriesWriter {
ts: TsNano,
ts_local: TsNano,
val: DataValue,
item_qu: &mut VecDeque<QueryItem>,
iqdqs: &mut InsertDeques,
) -> Result<(), Error> {
// TODO compute the binned data here as well and flush completed bins if needed.
self.binner.push(ts.clone(), &val)?;
@@ -193,7 +194,7 @@ impl SeriesWriter {
let item = InsertItem {
series: self.sid.clone(),
ts_msp: ts_msp.to_ts_ms(),
ts_lsp: ts_lsp,
ts_lsp,
msp_bump: ts_msp_changed,
pulse: 0,
scalar_type: self.scalar_type.clone(),
@@ -201,12 +202,13 @@ impl SeriesWriter {
val,
ts_local: ts_local.to_ts_ms(),
};
item_qu.push_back(QueryItem::Insert(item));
// TODO decide on the path in the new deques struct
iqdqs.st_rf3_rx.push_back(QueryItem::Insert(item));
Ok(())
}
pub fn tick(&mut self, iiq: &mut VecDeque<QueryItem>) -> Result<(), Error> {
self.binner.tick(iiq)?;
pub fn tick(&mut self, iqdqs: &mut InsertDeques) -> Result<(), Error> {
self.binner.tick(iqdqs)?;
Ok(())
}
}
@@ -348,15 +350,13 @@ fn write_00() {
let tsnow = SystemTime::now();
let mut writer = SeriesWriter::establish(tx, backend.into(), channel.into(), scalar_type, shape, tsnow).await?;
eprintln!("{writer:?}");
let mut item_queue = VecDeque::new();
let item_qu = &mut item_queue;
let mut iqdqs = InsertDeques::new();
for i in 0..10 {
let ts = TsNano::from_ns(HOUR * 24 + SEC * i);
let ts_local = ts.clone();
let val = DataValue::Scalar(scywr::iteminsertqueue::ScalarValue::I16(i as _));
writer.write(ts, ts_local, val, item_qu)?;
writer.write(ts, ts_local, val, &mut iqdqs)?;
}
eprintln!("{item_queue:?}");
Ok::<_, Error>(())
};
taskrun::run(fut).unwrap();