diff --git a/disk/src/lib.rs b/disk/src/lib.rs index 7ac0f73..1e9955c 100644 --- a/disk/src/lib.rs +++ b/disk/src/lib.rs @@ -10,6 +10,7 @@ use futures_core::Stream; use futures_util::future::FusedFuture; use bytes::Bytes; use std::path::PathBuf; +use std::sync::Mutex; pub async fn read_test_1(query: &netpod::AggQuerySingleChannel) -> Result { let pre = "/data/sf-databuffer/daq_swissfel"; @@ -146,8 +147,10 @@ impl FusedFuture for Fopen1 { } } +unsafe impl Send for Fopen1 {} -pub fn raw_concat_channel_read_stream(query: &netpod::AggQuerySingleChannel) -> impl Stream> { + +pub fn raw_concat_channel_read_stream(query: &netpod::AggQuerySingleChannel) -> impl Stream> + Send { use futures_util::{StreamExt, FutureExt, pin_mut, select}; let mut query = query.clone(); async_stream::stream! { @@ -169,11 +172,11 @@ pub fn raw_concat_channel_read_stream(query: &netpod::AggQuerySingleChannel) -> } -pub fn raw_concat_channel_read_stream_try_open_in_background(query: &netpod::AggQuerySingleChannel) -> impl Stream> { +pub fn raw_concat_channel_read_stream_try_open_in_background(query: &netpod::AggQuerySingleChannel) -> impl Stream> + Send { use futures_util::{StreamExt, FutureExt, pin_mut, select}; use tokio::io::AsyncReadExt; let mut query = query.clone(); - let mut fopen = None; + let mut fopen = Mutex::new(None); let mut file: Option = None; let mut file_taken_for_read = false; async_stream::stream! { @@ -181,14 +184,14 @@ pub fn raw_concat_channel_read_stream_try_open_in_background(query: &netpod::Agg let mut i1 = 0; loop { { - if fopen.is_none() && file.is_none() && !file_taken_for_read { + if fopen.lock().unwrap().is_none() && file.is_none() && !file_taken_for_read { query.timebin = 18700 + i1; - fopen = Some(Fopen1::new(datapath(&query))); + fopen.lock().unwrap().replace(Fopen1::new(datapath(&query))); i1 += 1; } } let blen = query.buffer_size as usize; - if fopen.is_some() { + if fopen.lock().unwrap().is_some() { if file.is_some() { if reading.is_none() { let mut buf = bytes::BytesMut::with_capacity(blen); @@ -201,14 +204,15 @@ pub fn raw_concat_channel_read_stream_try_open_in_background(query: &netpod::Agg let a = Box::pin(a); reading = Some(a.fuse()); } + let mut fopen3 = fopen.lock().unwrap(); let bufres = select! { // TODO can I avoid the unwraps via matching already above? - f = fopen.as_mut().unwrap() => { - fopen = None; + /*f = fopen3.as_mut().unwrap() => { + fopen3.take(); file = Some(f.unwrap()); info!("opened next file while also waiting on data read"); None - } + }*/ k = reading.as_mut().unwrap() => { //() == k; reading = None; @@ -226,14 +230,14 @@ pub fn raw_concat_channel_read_stream_try_open_in_background(query: &netpod::Agg } else { // TODO try to avoid this duplicated code: - select! { + /*select! { // TODO can I avoid the unwraps via matching already above? - f = fopen.as_mut().unwrap() => { + f = fopen.lock().unwrap().as_mut().unwrap() => { fopen = None; file = Some(f.unwrap()); info!("opened next file"); } - }; + };*/ } } else if file.is_some() { diff --git a/httpret/src/lib.rs b/httpret/src/lib.rs index 08ab382..6d7f77f 100644 --- a/httpret/src/lib.rs +++ b/httpret/src/lib.rs @@ -65,7 +65,7 @@ async fn parsed_raw(req: Request) -> Result, Error> { let query: AggQuerySingleChannel = serde_json::from_slice(&bodyslice)?; //let q = disk::read_test_1(&query).await?; //let s = q.inner; - let s = disk::raw_concat_channel_read_stream(&query); + let s = disk::raw_concat_channel_read_stream_try_open_in_background(&query); let res = response(StatusCode::OK) .body(Body::wrap_stream(s))?; /*