diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index 5129c37..1700f42 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -414,6 +414,7 @@ struct ChannelOpsResources<'a> { cid_store: &'a mut CidStore, init_state_count: &'a mut u64, channel_set_ops_flag: &'a AtomicUsize, + time_binners: &'a mut BTreeMap, } pub struct CaConn { @@ -453,7 +454,7 @@ pub struct CaConn { series_lookup_futs: FuturesUnordered< Pin), Error>> + Send>>, >, - conn_time_bin: ConnTimeBin, + time_binners: BTreeMap, } impl CaConn { @@ -506,7 +507,7 @@ impl CaConn { channel_info_query_tx, series_lookup_schedule: BTreeMap::new(), series_lookup_futs: FuturesUnordered::new(), - conn_time_bin: ConnTimeBin::empty(), + time_binners: BTreeMap::new(), } } @@ -699,11 +700,26 @@ impl CaConn { cid_by_name: &mut BTreeMap, name_by_cid: &mut BTreeMap, cid_store: &mut CidStore, + time_binners: &mut BTreeMap, ) { let cid = Self::cid_by_name_expl(&channel, cid_by_name, name_by_cid, cid_store); if channels.contains_key(&cid) { warn!("TODO actually cause the channel to get closed and removed {}", channel); } + { + let a: Vec<_> = cid_by_name + .iter() + .filter(|x| x.1 == &cid) + .map(|x| x.0.clone()) + .collect(); + for x in a { + cid_by_name.remove(&x); + } + } + channels.remove(&cid); + name_by_cid.remove(&cid); + // TODO emit in-progress before drop? + time_binners.remove(&cid); } pub fn channel_remove(&mut self, channel: String) { @@ -713,6 +729,7 @@ impl CaConn { &mut self.cid_by_name, &mut self.name_by_cid, &mut self.cid_store, + &mut self.time_binners, ) } @@ -936,7 +953,9 @@ impl CaConn { // TODO handle error better! Transition channel to Error state? let scalar_type = ScalarType::from_ca_id(data_type)?; let shape = Shape::from_ca_count(data_count)?; - self.conn_time_bin.setup_for(series.clone(), &scalar_type, &shape)?; + let mut tb = ConnTimeBin::empty(); + tb.setup_for(series.clone(), &scalar_type, &shape)?; + self.time_binners.insert(cid, tb); let subid = self.subid_store.next(); self.cid_by_subid.insert(subid, cid); let name = self.name_by_cid(cid).unwrap().to_string(); @@ -1217,7 +1236,11 @@ impl CaConn { let item_queue = &mut self.insert_item_queue; let inserts_counter = &mut self.inserts_counter; let extra_inserts_conf = &self.extra_inserts_conf; - self.conn_time_bin.push(ts, &ev.value)?; + if let Some(tb) = self.time_binners.get_mut(&cid) { + tb.push(ts, &ev.value)?; + } else { + // TODO count or report error + } #[cfg(DISABLED)] match &ev.value.data { CaDataValue::Scalar(x) => match &x { @@ -1714,9 +1737,14 @@ impl CaConn { res.cid_store, res.init_state_count, ), - ChannelSetOp::Remove => { - Self::channel_remove_expl(ch, res.channels, res.cid_by_name, res.name_by_cid, res.cid_store) - } + ChannelSetOp::Remove => Self::channel_remove_expl( + ch, + res.channels, + res.cid_by_name, + res.name_by_cid, + res.cid_store, + res.time_binners, + ), } } res.channel_set_ops_flag.store(0, atomic::Ordering::Release); @@ -1731,14 +1759,17 @@ impl CaConn { cid_store: &mut self.cid_store, init_state_count: &mut self.init_state_count, channel_set_ops_flag: &self.channel_set_ops.flag, + time_binners: &mut self.time_binners, }; Self::apply_channel_ops_with_res(res) } fn handle_own_ticker_tick(self: Pin<&mut Self>, _cx: &mut Context) -> Result<(), Error> { let this = self.get_mut(); - let (obj, insert_item_queue) = { (&mut this.conn_time_bin, &mut this.insert_item_queue) }; - obj.tick(insert_item_queue)?; + for (_, tb) in this.time_binners.iter_mut() { + let iiq = &mut this.insert_item_queue; + tb.tick(iiq)?; + } Ok(()) } }