From 66f3e2d2c72f9ac9f80bbeffb14328182c7a9ebf Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Fri, 2 Apr 2021 10:56:09 +0200 Subject: [PATCH] Remove need for Mutex sync --- disk/src/lib.rs | 28 +++++++++------------------- 1 file changed, 9 insertions(+), 19 deletions(-) diff --git a/disk/src/lib.rs b/disk/src/lib.rs index 286e7e2..aabbb9c 100644 --- a/disk/src/lib.rs +++ b/disk/src/lib.rs @@ -10,7 +10,6 @@ use futures_core::Stream; use futures_util::future::FusedFuture; use bytes::Bytes; use std::path::PathBuf; -use std::sync::Mutex; pub async fn read_test_1(query: &netpod::AggQuerySingleChannel) -> Result { let pre = "/data/sf-databuffer/daq_swissfel"; @@ -110,7 +109,6 @@ impl Future for Fopen1 { type Output = Result; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - use futures_util::{pin_mut}; let g = self.fut.as_mut(); match g.poll(cx) { Poll::Ready(Ok(k)) => { @@ -138,15 +136,14 @@ unsafe impl Send for Fopen1 {} pub fn raw_concat_channel_read_stream_try_open_in_background(query: &netpod::AggQuerySingleChannel) -> impl Stream> + Send { - use futures_util::{StreamExt, FutureExt, pin_mut, select}; + use futures_util::{FutureExt, select}; use tokio::io::AsyncReadExt; let mut query = query.clone(); async_stream::stream! { - let mut fopen = Mutex::new(None); + let mut fopen = None; let mut fopen_avail = false; let mut file_prep: Option = None; let mut file: Option = None; - let mut file_taken_for_read = false; let mut reading = None; let mut i1 = 0; let mut i9 = 0; @@ -156,7 +153,7 @@ pub fn raw_concat_channel_read_stream_try_open_in_background(query: &netpod::Agg if !fopen_avail && file_prep.is_none() && i1 < 16 { query.timebin = 18700 + i1; info!("Prepare open task for next file {}", query.timebin); - fopen.lock().unwrap().replace(Fopen1::new(datapath(&query))); + fopen.replace(Fopen1::new(datapath(&query))); fopen_avail = true; i1 += 1; } @@ -177,16 +174,14 @@ pub fn raw_concat_channel_read_stream_try_open_in_background(query: &netpod::Agg } let k = k.unwrap(); reading.take(); - file_taken_for_read = false; file = Some(k.0); yield Ok(k.1.freeze()); } - else if fopen.lock().unwrap().is_some() { + else if fopen.is_some() { if file.is_some() { if reading.is_none() { let mut buf = bytes::BytesMut::with_capacity(blen); let mut file2 = file.take().unwrap(); - file_taken_for_read = true; let a = async move { file2.read_buf(&mut buf).await?; Ok::<_, Error>((file2, buf)) @@ -196,7 +191,7 @@ pub fn raw_concat_channel_read_stream_try_open_in_background(query: &netpod::Agg } // TODO do I really have to take out the future while waiting on it? // I think the issue is now with the mutex guard, can I get rid of the mutex again? - let mut fopen3 = fopen.lock().unwrap().take().unwrap(); + let mut fopen3 = fopen.take().unwrap(); let bufres = select! { // TODO can I avoid the unwraps via matching already above? f = fopen3 => { @@ -214,13 +209,11 @@ pub fn raw_concat_channel_read_stream_try_open_in_background(query: &netpod::Agg } let k = k.unwrap(); file = Some(k.0); - // TODO must be a nicer way to do this: - file_taken_for_read = false; Some(k.1) } }; if fopen_avail { - fopen.lock().unwrap().replace(fopen3); + fopen.replace(fopen3); } if let Some(k) = bufres { yield Ok(k.freeze()); @@ -229,10 +222,10 @@ pub fn raw_concat_channel_read_stream_try_open_in_background(query: &netpod::Agg else { info!("----------------- no file open yet, await only opening of the file"); // TODO try to avoid this duplicated code: - if fopen.lock().unwrap().is_none() { + if fopen.is_none() { error!("logic BB"); } - let mut fopen3 = fopen.lock().unwrap().take().unwrap(); + let fopen3 = fopen.take().unwrap(); let f = fopen3.await?; info!("opened next file SOLO"); fopen_avail = false; @@ -243,10 +236,8 @@ pub fn raw_concat_channel_read_stream_try_open_in_background(query: &netpod::Agg loop { let mut buf = bytes::BytesMut::with_capacity(blen); let mut file2 = file.take().unwrap(); - file_taken_for_read = true; let n1 = file2.read_buf(&mut buf).await?; if n1 == 0 { - file_taken_for_read = false; if file_prep.is_some() { file.replace(file_prep.take().unwrap()); } @@ -257,7 +248,6 @@ pub fn raw_concat_channel_read_stream_try_open_in_background(query: &netpod::Agg } else { file.replace(file2); - file_taken_for_read = false; yield Ok(buf.freeze()); } } @@ -275,7 +265,7 @@ pub fn raw_concat_channel_read_stream_try_open_in_background(query: &netpod::Agg pub fn raw_concat_channel_read_stream(query: &netpod::AggQuerySingleChannel) -> impl Stream> + Send { - use futures_util::{StreamExt, FutureExt, pin_mut, select}; + use futures_util::{StreamExt, pin_mut}; let mut query = query.clone(); async_stream::stream! { let mut i1 = 0;