It checks

This commit is contained in:
Dominik Werder
2021-04-02 08:11:21 +02:00
parent 67a7a88dc1
commit dc7e39cd26

View File

@@ -176,17 +176,19 @@ pub fn raw_concat_channel_read_stream_try_open_in_background(query: &netpod::Agg
use futures_util::{StreamExt, FutureExt, pin_mut, select}; use futures_util::{StreamExt, FutureExt, pin_mut, select};
use tokio::io::AsyncReadExt; use tokio::io::AsyncReadExt;
let mut query = query.clone(); let mut query = query.clone();
let mut fopen = Mutex::new(None);
let mut file: Option<File> = None;
let mut file_taken_for_read = false;
async_stream::stream! { async_stream::stream! {
let mut fopen = Mutex::new(None);
let mut fopen_avail = false;
let mut file: Option<File> = None;
let mut file_taken_for_read = false;
let mut reading = None; let mut reading = None;
let mut i1 = 0; let mut i1 = 0;
loop { loop {
{ {
if fopen.lock().unwrap().is_none() && file.is_none() && !file_taken_for_read { if !fopen_avail && file.is_none() && !file_taken_for_read {
query.timebin = 18700 + i1; query.timebin = 18700 + i1;
fopen.lock().unwrap().replace(Fopen1::new(datapath(&query))); fopen.lock().unwrap().replace(Fopen1::new(datapath(&query)));
fopen_avail = true;
i1 += 1; i1 += 1;
} }
} }
@@ -204,17 +206,17 @@ pub fn raw_concat_channel_read_stream_try_open_in_background(query: &netpod::Agg
let a = Box::pin(a); let a = Box::pin(a);
reading = Some(a.fuse()); reading = Some(a.fuse());
} }
let mut fopen3 = fopen.lock().unwrap(); let mut fopen3 = fopen.lock().unwrap().take().unwrap();
let bufres = select! { let bufres = select! {
// TODO can I avoid the unwraps via matching already above? // TODO can I avoid the unwraps via matching already above?
/*f = fopen3.as_mut().unwrap() => { f = fopen3 => {
fopen3.take();
file = Some(f.unwrap());
info!("opened next file while also waiting on data read"); info!("opened next file while also waiting on data read");
fopen_avail = false;
// TODO feed out the potential error:
file = Some(f.unwrap());
None None
}*/ }
k = reading.as_mut().unwrap() => { k = reading.as_mut().unwrap() => {
//() == k;
reading = None; reading = None;
// TODO handle the error somehow here... // TODO handle the error somehow here...
let k = k.unwrap(); let k = k.unwrap();
@@ -224,20 +226,21 @@ pub fn raw_concat_channel_read_stream_try_open_in_background(query: &netpod::Agg
Some(k.1) Some(k.1)
} }
}; };
if fopen_avail {
fopen.lock().unwrap().replace(fopen3);
}
if let Some(k) = bufres { if let Some(k) = bufres {
yield Ok(k.freeze()); yield Ok(k.freeze());
} }
} }
else { else {
info!("----------------- no file open yet, await only opening of the file");
// TODO try to avoid this duplicated code: // TODO try to avoid this duplicated code:
/*select! { let mut fopen3 = fopen.lock().unwrap().take().unwrap();
// TODO can I avoid the unwraps via matching already above? let f = fopen3.await?;
f = fopen.lock().unwrap().as_mut().unwrap() => { info!("opened next file");
fopen = None; fopen_avail = false;
file = Some(f.unwrap()); file = Some(f);
info!("opened next file");
}
};*/
} }
} }
else if file.is_some() { else if file.is_some() {