diff --git a/.cargo/cargo-lock b/.cargo/cargo-lock index 96bd0e4..e5c8663 100644 --- a/.cargo/cargo-lock +++ b/.cargo/cargo-lock @@ -87,9 +87,9 @@ dependencies = [ [[package]] name = "async-trait" -version = "0.1.62" +version = "0.1.63" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "689894c2db1ea643a50834b999abf1c110887402542955ff5451dab8f861f9ed" +checksum = "eff18d764974428cf3a9328e23fc5c986f5fbed46e6cd4cdf42544df5d297ec1" dependencies = [ "proc-macro2", "quote", @@ -104,9 +104,9 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" [[package]] name = "axum" -version = "0.6.2" +version = "0.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1304eab461cf02bd70b083ed8273388f9724c549b316ba3d1e213ce0e9e7fb7e" +checksum = "e5694b64066a2459918d8074c2ce0d5a88f409431994c2356617c8ae0c4721fc" dependencies = [ "async-trait", "axum-core", @@ -137,9 +137,9 @@ dependencies = [ [[package]] name = "axum-core" -version = "0.3.1" +version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f487e40dc9daee24d8a1779df88522f159a54a980f99cfbe43db0be0bd3444a8" +checksum = "1cae3e661676ffbacb30f1a824089a8c9150e71017f7e1e38f2aa32009188d34" dependencies = [ "async-trait", "bytes", @@ -255,9 +255,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.1.1" +version = "4.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4ec7a4128863c188deefe750ac1d1dfe66c236909f845af04beed823638dc1b2" +checksum = "f13b9c79b5d1dd500d20ef541215a6423c75829ef43117e1b4d17fd8af0b5d76" dependencies = [ "bitflags", "clap_derive", @@ -691,9 +691,9 @@ dependencies = [ [[package]] name = "gimli" -version = "0.27.0" +version = "0.27.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dec7af912d60cdbd3677c1af9352ebae6fb8394d165568a2234df0fa00f87793" +checksum = "221996f774192f0f718773def8201c4ae31f02616a54ccfc2d358bb0e5cefdec" [[package]] name = "h2" @@ -1157,6 +1157,15 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "nom8" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae01545c9c7fc4486ab7debaf2aad7003ac19431791868fb2e8066df97fad2f8" +dependencies = [ + "memchr", +] + [[package]] name = "nu-ansi-term" version = "0.46.0" @@ -1209,18 +1218,18 @@ dependencies = [ [[package]] name = "num_enum" -version = "0.5.7" +version = "0.5.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf5395665662ef45796a4ff5486c5d41d29e0c09640af4c5f17fd94ee2c119c9" +checksum = "8d829733185c1ca374f17e52b762f24f535ec625d2cc1f070e34c8a9068f341b" dependencies = [ "num_enum_derive", ] [[package]] name = "num_enum_derive" -version = "0.5.7" +version = "0.5.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3b0498641e53dd6ac1a4f22547548caa6864cc4933784319cd1775271c5a46ce" +checksum = "2be1598bf1c313dcdd12092e3f1920f463462525a21b7b4e11b4168353d0123e" dependencies = [ "proc-macro-crate", "proc-macro2", @@ -1230,9 +1239,9 @@ dependencies = [ [[package]] name = "object" -version = "0.30.2" +version = "0.30.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2b8c786513eb403643f2a88c244c2aaa270ef2153f55094587d0c48a3cf22a83" +checksum = "ea86265d3d3dcb6a27fc51bd29a4bf387fae9d2986b823079d4986af253eb439" dependencies = [ "memchr", ] @@ -1377,13 +1386,12 @@ checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" [[package]] name = "proc-macro-crate" -version = "1.2.1" +version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eda0fc3b0fb7c975631757e14d9049da17374063edb6ebbcbc54d880d4fe94e9" +checksum = "66618389e4ec1c7afe67d51a9bf34ff9236480f8d51e7489b7d5ab0303c13f34" dependencies = [ "once_cell", - "thiserror", - "toml", + "toml_edit", ] [[package]] @@ -2072,12 +2080,20 @@ dependencies = [ ] [[package]] -name = "toml" -version = "0.5.11" +name = "toml_datetime" +version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f4f7f0dd8d50a853a531c426359045b1998f04219d88799810762cd4ad314234" +checksum = "4553f467ac8e3d374bc9a177a26801e5d0f9b211aa1673fb137a403afd1c9cf5" + +[[package]] +name = "toml_edit" +version = "0.18.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "729bfd096e40da9c001f778f5cdecbd2957929a24e10e5883d9392220a751581" dependencies = [ - "serde", + "indexmap", + "nom8", + "toml_datetime", ] [[package]] @@ -2261,9 +2277,9 @@ checksum = "497961ef93d974e23eb6f433eb5fe1b7930b659f06d12dec6fc44a8f554c0bba" [[package]] name = "unicode-bidi" -version = "0.3.9" +version = "0.3.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0046be40136ef78dc325e0edefccf84ccddacd0afcc1ca54103fa3c61bbdab1d" +checksum = "d54675592c1dbefd78cbd98db9bacd89886e1ca50692a0692baefffdeb92dd58" [[package]] name = "unicode-ident" diff --git a/daqingest/src/daemon.rs b/daqingest/src/daemon.rs index 75cad77..c875637 100644 --- a/daqingest/src/daemon.rs +++ b/daqingest/src/daemon.rs @@ -254,6 +254,7 @@ pub struct Daemon { ingest_commons: Arc, caconn_last_channel_check: Instant, stats: Arc, + shutting_down: bool, } impl Daemon { @@ -281,8 +282,12 @@ impl Daemon { // Insert queue hook if true { tokio::spawn({ - let rx = common_insert_item_queue.receiver(); - let tx = common_insert_item_queue_2.sender(); + let rx = common_insert_item_queue + .receiver() + .ok_or_else(|| Error::with_msg_no_trace("can not derive receiver for insert queue adapter"))?; + let tx = common_insert_item_queue_2 + .sender() + .ok_or_else(|| Error::with_msg_no_trace("can not derive sender for insert queue adapter"))?; let insert_queue_counter = insert_queue_counter.clone(); async move { let mut printed_last = Instant::now(); @@ -338,6 +343,7 @@ impl Daemon { histo.clear(); } } + info!("insert queue adapter ended"); } }); } @@ -416,6 +422,7 @@ impl Daemon { ingest_commons, caconn_last_channel_check: Instant::now(), stats: Arc::new(DaemonStats::new()), + shutting_down: false, }; Ok(ret) } @@ -673,6 +680,10 @@ impl Daemon { &self.stats } + fn allow_create_new_connections(&self) -> bool { + !self.shutting_down + } + async fn check_chans(&mut self) -> Result<(), Error> { { let tsnow = Instant::now(); @@ -916,6 +927,11 @@ impl Daemon { } async fn handle_timer_tick(&mut self) -> Result<(), Error> { + if self.shutting_down { + let sa1 = self.ingest_commons.insert_item_queue.sender_count(); + let sa2 = self.ingest_commons.insert_item_queue.sender_count_2(); + info!("qu senders A {:?} {:?}", sa1, sa2); + } self.stats.handle_timer_tick_count_inc(); let ts1 = Instant::now(); let tsnow = SystemTime::now(); @@ -1015,32 +1031,38 @@ impl Daemon { for res in ress { if let Some(addr) = &res.addr { self.stats.ioc_search_some_inc(); - let ch = Channel::new(res.channel); - if let Some(st) = self.channel_states.get_mut(&ch) { - if let ChannelStateValue::Active(ActiveChannelState::SearchPending { since, did_send: _ }) = - &st.value - { - let dt = tsnow.duration_since(*since).unwrap(); - if dt > SEARCH_PENDING_TIMEOUT_WARN { + if self.allow_create_new_connections() { + let ch = Channel::new(res.channel); + if let Some(st) = self.channel_states.get_mut(&ch) { + if let ChannelStateValue::Active(ActiveChannelState::SearchPending { + since, + did_send: _, + }) = &st.value + { + let dt = tsnow.duration_since(*since).unwrap(); + if dt > SEARCH_PENDING_TIMEOUT_WARN { + warn!( + " FOUND {:5.0} {:5.0} {addr}", + 1e3 * dt.as_secs_f32(), + 1e3 * res.dt.as_secs_f32() + ); + } + let stnew = ChannelStateValue::Active(ActiveChannelState::WithAddress { + addr: addr.clone(), + state: WithAddressState::Unassigned { assign_at: tsnow }, + }); + st.value = stnew; + } else { warn!( - " FOUND {:5.0} {:5.0} {addr}", - 1e3 * dt.as_secs_f32(), - 1e3 * res.dt.as_secs_f32() + "address found, but state for {ch:?} is not SearchPending: {:?}", + st.value ); } - let stnew = ChannelStateValue::Active(ActiveChannelState::WithAddress { - addr: addr.clone(), - state: WithAddressState::Unassigned { assign_at: tsnow }, - }); - st.value = stnew; } else { - warn!( - "address found, but state for {ch:?} is not SearchPending: {:?}", - st.value - ); + warn!("can not find channel state for {ch:?}"); } } else { - warn!("can not find channel state for {ch:?}"); + // Emit something here? } } else { //debug!("no addr from search in {res:?}"); @@ -1138,6 +1160,20 @@ impl Daemon { } } + async fn handle_shutdown(&mut self) -> Result<(), Error> { + warn!("received shutdown event"); + if self.shutting_down { + info!("already shutting down"); + Ok(()) + } else { + self.shutting_down = true; + self.channel_states.clear(); + self.ca_conn_send_shutdown().await?; + self.ingest_commons.insert_item_queue.drop_sender(); + Ok(()) + } + } + async fn handle_event(&mut self, item: DaemonEvent, ticker_inp_tx: &Sender) -> Result<(), Error> { use DaemonEvent::*; self.stats.events_inc(); @@ -1160,6 +1196,7 @@ impl Daemon { ChannelRemove(ch) => self.handle_channel_remove(ch), SearchDone(item) => self.handle_search_done(item).await, CaConnEvent(addr, item) => self.handle_ca_conn_event(addr, item).await, + Shutdown => self.handle_shutdown().await, }; let dt = ts1.elapsed(); if dt > Duration::from_millis(200) { @@ -1176,6 +1213,16 @@ impl Daemon { async move { loop { tokio::time::sleep(Duration::from_millis(100)).await; + if SIGINT.load(atomic::Ordering::Acquire) != 0 || SIGTERM.load(atomic::Ordering::Acquire) != 0 { + if SHUTDOWN_SENT.load(atomic::Ordering::Acquire) == 0 { + if let Err(e) = tx.send(DaemonEvent::Shutdown).await { + error!("can not send TimerTick {e}"); + break; + } else { + SHUTDOWN_SENT.store(1, atomic::Ordering::Release); + } + } + } if let Err(e) = tx.send(DaemonEvent::TimerTick).await { error!("can not send TimerTick {e}"); break; @@ -1220,6 +1267,7 @@ impl Daemon { static SIGINT: AtomicUsize = AtomicUsize::new(0); static SIGTERM: AtomicUsize = AtomicUsize::new(0); +static SHUTDOWN_SENT: AtomicUsize = AtomicUsize::new(0); fn handler_sigint(_a: libc::c_int, _b: *const libc::siginfo_t, _c: *const libc::c_void) { SIGINT.store(1, atomic::Ordering::Release); diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index 3039aeb..eb1ce2b 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -393,7 +393,6 @@ struct ChannelOpsResources<'a> { pub struct CaConn { state: CaConnState, - shutdown: bool, ticker: Pin>, proto: Option, cid_store: CidStore, @@ -445,7 +444,6 @@ impl CaConn { let (cq_tx, cq_rx) = async_channel::bounded(32); Self { state: CaConnState::Unconnected, - shutdown: false, ticker: Box::pin(tokio::time::sleep(Duration::from_millis(500))), proto: None, cid_store: CidStore::new(), @@ -493,34 +491,20 @@ impl CaConn { self.conn_command_tx.clone() } - fn trigger_shutdown(&mut self) { - self.shutdown = true; - for (_k, v) in self.channels.iter_mut() { - match v { - ChannelState::Init => { - *v = ChannelState::Ended; - } - ChannelState::Creating { .. } => { - *v = ChannelState::Ended; - } - ChannelState::Created(st) => { - if let Some(series) = &st.series { - let item = QueryItem::ChannelStatus(ChannelStatusItem { - ts: SystemTime::now(), - series: series.clone(), - status: ChannelStatus::Closed, - }); - info!("emit status item {item:?}"); - self.insert_item_queue.push_back(item); - } - *v = ChannelState::Ended; - } - ChannelState::Error(_) => {} - ChannelState::Ended => {} - } + fn is_shutdown(&self) -> bool { + if let CaConnState::Shutdown = self.state { + true + } else { + false } } + fn trigger_shutdown(&mut self) { + self.state = CaConnState::Shutdown; + self.proto = None; + let res = self.channel_state_on_shutdown(); + } + fn cmd_check_health(&mut self) { match self.check_channels_alive() { Ok(_) => {} @@ -596,10 +580,6 @@ impl CaConn { fn cmd_shutdown(&mut self) { self.trigger_shutdown(); - let res = self.before_reset_of_channel_state(); - self.state = CaConnState::Shutdown; - self.proto = None; - // TODO return the result } fn cmd_extra_inserts_conf(&mut self, extra_inserts_conf: ExtraInsertsConf) { @@ -737,12 +717,15 @@ impl CaConn { self.conn_backoff = self.conn_backoff_beg; } - fn before_reset_of_channel_state(&mut self) -> Result<(), Error> { + fn before_reset_of_channel_state(&mut self) { trace!("before_reset_of_channel_state channels {}", self.channels.len()); - let mut created = 0; let mut warn_max = 0; - for (_cid, chst) in &self.channels { + for (_cid, chst) in &mut self.channels { match chst { + ChannelState::Init => {} + ChannelState::Creating { .. } => { + *chst = ChannelState::Init; + } ChannelState::Created(st) => { if let Some(series) = &st.series { let item = QueryItem::ChannelStatus(ChannelStatusItem { @@ -750,9 +733,6 @@ impl CaConn { series: series.clone(), status: ChannelStatus::Closed, }); - if created < 10 { - trace!("store {:?}", item); - } self.insert_item_queue.push_back(item); } else { if warn_max < 10 { @@ -760,12 +740,51 @@ impl CaConn { warn_max += 1; } } - created += 1; + *chst = ChannelState::Init; + } + ChannelState::Error(_) => { + *chst = ChannelState::Init; + } + ChannelState::Ended => { + *chst = ChannelState::Init; } - _ => (), } } - Ok(()) + } + + fn channel_state_on_shutdown(&mut self) { + trace!("channel_state_on_shutdown channels {}", self.channels.len()); + let mut warn_max = 0; + for (_cid, chst) in &mut self.channels { + match chst { + ChannelState::Init => { + *chst = ChannelState::Ended; + } + ChannelState::Creating { .. } => { + *chst = ChannelState::Ended; + } + ChannelState::Created(st) => { + if let Some(series) = &st.series { + let item = QueryItem::ChannelStatus(ChannelStatusItem { + ts: SystemTime::now(), + series: series.clone(), + status: ChannelStatus::Closed, + }); + self.insert_item_queue.push_back(item); + } else { + if warn_max < 10 { + debug!("no series for cid {:?}", st.cid); + warn_max += 1; + } + } + *chst = ChannelState::Ended; + } + ChannelState::Error(_) => { + *chst = ChannelState::Ended; + } + ChannelState::Ended => {} + } + } } fn handle_insert_futs(&mut self, cx: &mut Context) -> Poll> { @@ -1491,26 +1510,13 @@ impl CaConn { } Ready(Some(Err(e))) => { error!("CaProto yields error: {e:?} remote {:?}", self.remote_addr_dbg); - // TODO unify this handling with the block below - let reset_res = self.before_reset_of_channel_state(); - self.state = CaConnState::Wait(wait_fut(self.backoff_next())); - self.proto = None; - if let Err(e) = reset_res { - error!("can not destruct channel state before reset {e:?}"); - } + self.trigger_shutdown(); Ready(Some(Err(e))) } Ready(None) => { warn!("handle_peer_ready CaProto is done {:?}", self.remote_addr_dbg); - let reset_res = self.before_reset_of_channel_state(); - self.state = CaConnState::Wait(wait_fut(self.backoff_next())); - self.proto = None; - if let Err(e) = reset_res { - error!("can not destruct channel state before reset {e:?}"); - Ready(Some(Err(e))) - } else { - Ready(None) - } + self.trigger_shutdown(); + Ready(None) } Pending => Pending, }; @@ -1649,7 +1655,7 @@ impl CaConn { if self.insert_item_queue.len() >= self.insert_queue_max { break None; } - if self.shutdown { + if self.is_shutdown() { break None; } } @@ -1722,7 +1728,7 @@ impl Stream for CaConn { i1 += 1; self.stats.caconn_loop1_count_inc(); loop { - if self.shutdown { + if self.is_shutdown() { break; } break match self.handle_conn_command(cx) { @@ -1744,7 +1750,7 @@ impl Stream for CaConn { Ready(_) => {} Pending => break Pending, } - if self.shutdown { + if self.is_shutdown() { if self.insert_item_queue.len() == 0 { trace!("no more items to flush"); if i1 >= 10 { @@ -1757,7 +1763,7 @@ impl Stream for CaConn { if self.insert_item_queue.len() >= self.insert_queue_max { break Pending; } - if !self.shutdown { + if !self.is_shutdown() { if let Some(v) = self.loop_inner(cx) { if i1 >= 10 { break v; @@ -1769,7 +1775,7 @@ impl Stream for CaConn { Ready(_) => self.stats.conn_stream_ready_inc(), Pending => self.stats.conn_stream_pending_inc(), } - if self.shutdown && self.insert_item_queue.len() == 0 { + if self.is_shutdown() && self.insert_item_queue.len() == 0 { Ready(None) } else { match ret { diff --git a/netfetch/src/ca/connset.rs b/netfetch/src/ca/connset.rs index b86b072..9d21a8a 100644 --- a/netfetch/src/ca/connset.rs +++ b/netfetch/src/ca/connset.rs @@ -266,7 +266,9 @@ impl CaConnSet { local_epics_hostname, array_truncate, insert_queue_max, - insert_item_queue.sender(), + insert_item_queue + .sender() + .ok_or_else(|| Error::with_msg_no_trace("can not derive sender"))?, data_store.clone(), Vec::new(), )?; @@ -302,7 +304,8 @@ impl CaConnSet { ca_conn_ress: &TokMx>, addr: SocketAddrV4, ) -> Result { - warn!("Lock for conn_remove"); + // TODO make this lock-free. + //warn!("Lock for conn_remove"); if let Some(_caconn) = ca_conn_ress.lock().await.remove(&addr) { Ok(true) } else { diff --git a/netfetch/src/daemon_common.rs b/netfetch/src/daemon_common.rs index 556284a..649fb73 100644 --- a/netfetch/src/daemon_common.rs +++ b/netfetch/src/daemon_common.rs @@ -27,6 +27,7 @@ pub enum DaemonEvent { ChannelRemove(Channel), SearchDone(Result, Error>), CaConnEvent(SocketAddrV4, CaConnEvent), + Shutdown, } impl DaemonEvent { @@ -47,6 +48,7 @@ impl DaemonEvent { EndOfStream => format!("CaConnEvent/EndOfStream"), } } + Shutdown => format!("Shutdown"), } } } diff --git a/netfetch/src/insertworker.rs b/netfetch/src/insertworker.rs index 054b6be..2a67d5a 100644 --- a/netfetch/src/insertworker.rs +++ b/netfetch/src/insertworker.rs @@ -61,11 +61,17 @@ pub async fn spawn_scylla_insert_workers( use_rate_limit_queue: bool, ttls: Ttls, ) -> Result>, Error> { - let (q2_tx, q2_rx) = async_channel::bounded(insert_item_queue.receiver().capacity().unwrap_or(20000)); + let (q2_tx, q2_rx) = async_channel::bounded( + insert_item_queue + .receiver() + .map_or(20000, |x| x.capacity().unwrap_or(20000)), + ); { let ingest_commons = ingest_commons.clone(); let stats = store_stats.clone(); - let recv = insert_item_queue.receiver(); + let recv = insert_item_queue + .receiver() + .ok_or_else(|| Error::with_msg_no_trace("can not derive insert queue receiver"))?; let store_stats = store_stats.clone(); let fut = async move { if !use_rate_limit_queue { @@ -128,7 +134,9 @@ pub async fn spawn_scylla_insert_workers( let recv = if use_rate_limit_queue { q2_rx.clone() } else { - insert_item_queue.receiver() + insert_item_queue + .receiver() + .ok_or_else(|| Error::with_msg_no_trace("can not derive receiver"))? }; let ingest_commons = ingest_commons.clone(); let fut = async move { @@ -259,7 +267,7 @@ pub async fn spawn_scylla_insert_workers( } } } - trace!("insert worker has no more messages"); + info!("insert worker {i1} has no more messages"); }; let jh = tokio::spawn(fut); jhs.push(jh); diff --git a/netfetch/src/metrics.rs b/netfetch/src/metrics.rs index 1d5d6ff..c164145 100644 --- a/netfetch/src/metrics.rs +++ b/netfetch/src/metrics.rs @@ -267,7 +267,10 @@ pub async fn metrics_agg_task( } } { - let val = ingest_commons.insert_item_queue.receiver().len() as u64; + let val = ingest_commons + .insert_item_queue + .receiver() + .map_or(0, |x| x.len() as u64); agg.store_worker_recv_queue_len.store(val, Ordering::Release); } let mut m = METRICS.lock().unwrap(); diff --git a/netfetch/src/store.rs b/netfetch/src/store.rs index de0f8e6..7d0aadf 100644 --- a/netfetch/src/store.rs +++ b/netfetch/src/store.rs @@ -251,35 +251,39 @@ impl CommonInsertItemQueueSender { } pub struct CommonInsertItemQueue { - sender: async_channel::Sender, + sender: std::sync::Mutex>>, recv: async_channel::Receiver, } impl CommonInsertItemQueue { pub fn new(cap: usize) -> Self { let (tx, rx) = async_channel::bounded(cap); - Self { sender: tx, recv: rx } - } - - pub fn sender(&self) -> CommonInsertItemQueueSender { - CommonInsertItemQueueSender { - sender: self.sender.clone(), + Self { + sender: std::sync::Mutex::new(Some(tx)), + recv: rx, } } - pub fn sender_raw(&self) -> async_channel::Sender { - self.sender.clone() + pub fn sender(&self) -> Option { + match self.sender.lock().unwrap().as_ref() { + Some(sender) => { + let ret = CommonInsertItemQueueSender { sender: sender.clone() }; + Some(ret) + } + None => None, + } } - pub fn receiver(&self) -> async_channel::Receiver { - self.recv.clone() + pub fn receiver(&self) -> Option> { + let ret = self.recv.clone(); + Some(ret) } - pub fn sender_count(&self) -> usize { - self.sender.sender_count() + pub fn sender_count(&self) -> Option { + self.sender.lock().unwrap().as_ref().map(|x| x.sender_count()) } - pub fn sender_count2(&self) -> usize { + pub fn sender_count_2(&self) -> usize { self.recv.sender_count() } @@ -288,8 +292,10 @@ impl CommonInsertItemQueue { } pub fn close(&self) { - self.sender.close(); + self.sender.lock().unwrap().as_ref().map(|x| x.close()); } + + pub fn drop_sender(&self) {} } struct InsParCom {