Associate time binner with channel

This commit is contained in:
Dominik Werder
2023-05-12 13:13:44 +02:00
parent 5216ae7860
commit 3b7b024200

View File

@@ -414,6 +414,7 @@ struct ChannelOpsResources<'a> {
cid_store: &'a mut CidStore,
init_state_count: &'a mut u64,
channel_set_ops_flag: &'a AtomicUsize,
time_binners: &'a mut BTreeMap<Cid, ConnTimeBin>,
}
pub struct CaConn {
@@ -453,7 +454,7 @@ pub struct CaConn {
series_lookup_futs: FuturesUnordered<
Pin<Box<dyn Future<Output = Result<(Cid, u32, u16, u16, Existence<SeriesId>), Error>> + Send>>,
>,
conn_time_bin: ConnTimeBin,
time_binners: BTreeMap<Cid, ConnTimeBin>,
}
impl CaConn {
@@ -506,7 +507,7 @@ impl CaConn {
channel_info_query_tx,
series_lookup_schedule: BTreeMap::new(),
series_lookup_futs: FuturesUnordered::new(),
conn_time_bin: ConnTimeBin::empty(),
time_binners: BTreeMap::new(),
}
}
@@ -699,11 +700,26 @@ impl CaConn {
cid_by_name: &mut BTreeMap<String, Cid>,
name_by_cid: &mut BTreeMap<Cid, String>,
cid_store: &mut CidStore,
time_binners: &mut BTreeMap<Cid, ConnTimeBin>,
) {
let cid = Self::cid_by_name_expl(&channel, cid_by_name, name_by_cid, cid_store);
if channels.contains_key(&cid) {
warn!("TODO actually cause the channel to get closed and removed {}", channel);
}
{
let a: Vec<_> = cid_by_name
.iter()
.filter(|x| x.1 == &cid)
.map(|x| x.0.clone())
.collect();
for x in a {
cid_by_name.remove(&x);
}
}
channels.remove(&cid);
name_by_cid.remove(&cid);
// TODO emit in-progress before drop?
time_binners.remove(&cid);
}
pub fn channel_remove(&mut self, channel: String) {
@@ -713,6 +729,7 @@ impl CaConn {
&mut self.cid_by_name,
&mut self.name_by_cid,
&mut self.cid_store,
&mut self.time_binners,
)
}
@@ -936,7 +953,9 @@ impl CaConn {
// TODO handle error better! Transition channel to Error state?
let scalar_type = ScalarType::from_ca_id(data_type)?;
let shape = Shape::from_ca_count(data_count)?;
self.conn_time_bin.setup_for(series.clone(), &scalar_type, &shape)?;
let mut tb = ConnTimeBin::empty();
tb.setup_for(series.clone(), &scalar_type, &shape)?;
self.time_binners.insert(cid, tb);
let subid = self.subid_store.next();
self.cid_by_subid.insert(subid, cid);
let name = self.name_by_cid(cid).unwrap().to_string();
@@ -1217,7 +1236,11 @@ impl CaConn {
let item_queue = &mut self.insert_item_queue;
let inserts_counter = &mut self.inserts_counter;
let extra_inserts_conf = &self.extra_inserts_conf;
self.conn_time_bin.push(ts, &ev.value)?;
if let Some(tb) = self.time_binners.get_mut(&cid) {
tb.push(ts, &ev.value)?;
} else {
// TODO count or report error
}
#[cfg(DISABLED)]
match &ev.value.data {
CaDataValue::Scalar(x) => match &x {
@@ -1714,9 +1737,14 @@ impl CaConn {
res.cid_store,
res.init_state_count,
),
ChannelSetOp::Remove => {
Self::channel_remove_expl(ch, res.channels, res.cid_by_name, res.name_by_cid, res.cid_store)
}
ChannelSetOp::Remove => Self::channel_remove_expl(
ch,
res.channels,
res.cid_by_name,
res.name_by_cid,
res.cid_store,
res.time_binners,
),
}
}
res.channel_set_ops_flag.store(0, atomic::Ordering::Release);
@@ -1731,14 +1759,17 @@ impl CaConn {
cid_store: &mut self.cid_store,
init_state_count: &mut self.init_state_count,
channel_set_ops_flag: &self.channel_set_ops.flag,
time_binners: &mut self.time_binners,
};
Self::apply_channel_ops_with_res(res)
}
fn handle_own_ticker_tick(self: Pin<&mut Self>, _cx: &mut Context) -> Result<(), Error> {
let this = self.get_mut();
let (obj, insert_item_queue) = { (&mut this.conn_time_bin, &mut this.insert_item_queue) };
obj.tick(insert_item_queue)?;
for (_, tb) in this.time_binners.iter_mut() {
let iiq = &mut this.insert_item_queue;
tb.tick(iiq)?;
}
Ok(())
}
}