From dc7e39cd2675128e08d9ce2ae4e4f9b8b464ad72 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Fri, 2 Apr 2021 08:11:21 +0200 Subject: [PATCH] It checks --- disk/src/lib.rs | 39 +++++++++++++++++++++------------------ 1 file changed, 21 insertions(+), 18 deletions(-) diff --git a/disk/src/lib.rs b/disk/src/lib.rs index 1e9955c..f2f6880 100644 --- a/disk/src/lib.rs +++ b/disk/src/lib.rs @@ -176,17 +176,19 @@ pub fn raw_concat_channel_read_stream_try_open_in_background(query: &netpod::Agg use futures_util::{StreamExt, FutureExt, pin_mut, select}; use tokio::io::AsyncReadExt; let mut query = query.clone(); - let mut fopen = Mutex::new(None); - let mut file: Option = None; - let mut file_taken_for_read = false; async_stream::stream! { + let mut fopen = Mutex::new(None); + let mut fopen_avail = false; + let mut file: Option = None; + let mut file_taken_for_read = false; let mut reading = None; let mut i1 = 0; loop { { - if fopen.lock().unwrap().is_none() && file.is_none() && !file_taken_for_read { + if !fopen_avail && file.is_none() && !file_taken_for_read { query.timebin = 18700 + i1; fopen.lock().unwrap().replace(Fopen1::new(datapath(&query))); + fopen_avail = true; i1 += 1; } } @@ -204,17 +206,17 @@ pub fn raw_concat_channel_read_stream_try_open_in_background(query: &netpod::Agg let a = Box::pin(a); reading = Some(a.fuse()); } - let mut fopen3 = fopen.lock().unwrap(); + let mut fopen3 = fopen.lock().unwrap().take().unwrap(); let bufres = select! { // TODO can I avoid the unwraps via matching already above? - /*f = fopen3.as_mut().unwrap() => { - fopen3.take(); - file = Some(f.unwrap()); + f = fopen3 => { info!("opened next file while also waiting on data read"); + fopen_avail = false; + // TODO feed out the potential error: + file = Some(f.unwrap()); None - }*/ + } k = reading.as_mut().unwrap() => { - //() == k; reading = None; // TODO handle the error somehow here... let k = k.unwrap(); @@ -224,20 +226,21 @@ pub fn raw_concat_channel_read_stream_try_open_in_background(query: &netpod::Agg Some(k.1) } }; + if fopen_avail { + fopen.lock().unwrap().replace(fopen3); + } if let Some(k) = bufres { yield Ok(k.freeze()); } } else { + info!("----------------- no file open yet, await only opening of the file"); // TODO try to avoid this duplicated code: - /*select! { - // TODO can I avoid the unwraps via matching already above? - f = fopen.lock().unwrap().as_mut().unwrap() => { - fopen = None; - file = Some(f.unwrap()); - info!("opened next file"); - } - };*/ + let mut fopen3 = fopen.lock().unwrap().take().unwrap(); + let f = fopen3.await?; + info!("opened next file"); + fopen_avail = false; + file = Some(f); } } else if file.is_some() {