Remove need for Mutex sync

This commit is contained in:
Dominik Werder
2021-04-02 10:56:09 +02:00
parent 9d0835b702
commit 66f3e2d2c7

View File

@@ -10,7 +10,6 @@ use futures_core::Stream;
use futures_util::future::FusedFuture; use futures_util::future::FusedFuture;
use bytes::Bytes; use bytes::Bytes;
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::Mutex;
pub async fn read_test_1(query: &netpod::AggQuerySingleChannel) -> Result<netpod::BodyStream, Error> { pub async fn read_test_1(query: &netpod::AggQuerySingleChannel) -> Result<netpod::BodyStream, Error> {
let pre = "/data/sf-databuffer/daq_swissfel"; let pre = "/data/sf-databuffer/daq_swissfel";
@@ -110,7 +109,6 @@ impl Future for Fopen1 {
type Output = Result<tokio::fs::File, Error>; type Output = Result<tokio::fs::File, Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
use futures_util::{pin_mut};
let g = self.fut.as_mut(); let g = self.fut.as_mut();
match g.poll(cx) { match g.poll(cx) {
Poll::Ready(Ok(k)) => { Poll::Ready(Ok(k)) => {
@@ -138,15 +136,14 @@ unsafe impl Send for Fopen1 {}
pub fn raw_concat_channel_read_stream_try_open_in_background(query: &netpod::AggQuerySingleChannel) -> impl Stream<Item=Result<Bytes, Error>> + Send { pub fn raw_concat_channel_read_stream_try_open_in_background(query: &netpod::AggQuerySingleChannel) -> impl Stream<Item=Result<Bytes, Error>> + Send {
use futures_util::{StreamExt, FutureExt, pin_mut, select}; use futures_util::{FutureExt, select};
use tokio::io::AsyncReadExt; use tokio::io::AsyncReadExt;
let mut query = query.clone(); let mut query = query.clone();
async_stream::stream! { async_stream::stream! {
let mut fopen = Mutex::new(None); let mut fopen = None;
let mut fopen_avail = false; let mut fopen_avail = false;
let mut file_prep: Option<File> = None; let mut file_prep: Option<File> = None;
let mut file: Option<File> = None; let mut file: Option<File> = None;
let mut file_taken_for_read = false;
let mut reading = None; let mut reading = None;
let mut i1 = 0; let mut i1 = 0;
let mut i9 = 0; let mut i9 = 0;
@@ -156,7 +153,7 @@ pub fn raw_concat_channel_read_stream_try_open_in_background(query: &netpod::Agg
if !fopen_avail && file_prep.is_none() && i1 < 16 { if !fopen_avail && file_prep.is_none() && i1 < 16 {
query.timebin = 18700 + i1; query.timebin = 18700 + i1;
info!("Prepare open task for next file {}", query.timebin); info!("Prepare open task for next file {}", query.timebin);
fopen.lock().unwrap().replace(Fopen1::new(datapath(&query))); fopen.replace(Fopen1::new(datapath(&query)));
fopen_avail = true; fopen_avail = true;
i1 += 1; i1 += 1;
} }
@@ -177,16 +174,14 @@ pub fn raw_concat_channel_read_stream_try_open_in_background(query: &netpod::Agg
} }
let k = k.unwrap(); let k = k.unwrap();
reading.take(); reading.take();
file_taken_for_read = false;
file = Some(k.0); file = Some(k.0);
yield Ok(k.1.freeze()); yield Ok(k.1.freeze());
} }
else if fopen.lock().unwrap().is_some() { else if fopen.is_some() {
if file.is_some() { if file.is_some() {
if reading.is_none() { if reading.is_none() {
let mut buf = bytes::BytesMut::with_capacity(blen); let mut buf = bytes::BytesMut::with_capacity(blen);
let mut file2 = file.take().unwrap(); let mut file2 = file.take().unwrap();
file_taken_for_read = true;
let a = async move { let a = async move {
file2.read_buf(&mut buf).await?; file2.read_buf(&mut buf).await?;
Ok::<_, Error>((file2, buf)) Ok::<_, Error>((file2, buf))
@@ -196,7 +191,7 @@ pub fn raw_concat_channel_read_stream_try_open_in_background(query: &netpod::Agg
} }
// TODO do I really have to take out the future while waiting on it? // TODO do I really have to take out the future while waiting on it?
// I think the issue is now with the mutex guard, can I get rid of the mutex again? // I think the issue is now with the mutex guard, can I get rid of the mutex again?
let mut fopen3 = fopen.lock().unwrap().take().unwrap(); let mut fopen3 = fopen.take().unwrap();
let bufres = select! { let bufres = select! {
// TODO can I avoid the unwraps via matching already above? // TODO can I avoid the unwraps via matching already above?
f = fopen3 => { f = fopen3 => {
@@ -214,13 +209,11 @@ pub fn raw_concat_channel_read_stream_try_open_in_background(query: &netpod::Agg
} }
let k = k.unwrap(); let k = k.unwrap();
file = Some(k.0); file = Some(k.0);
// TODO must be a nicer way to do this:
file_taken_for_read = false;
Some(k.1) Some(k.1)
} }
}; };
if fopen_avail { if fopen_avail {
fopen.lock().unwrap().replace(fopen3); fopen.replace(fopen3);
} }
if let Some(k) = bufres { if let Some(k) = bufres {
yield Ok(k.freeze()); yield Ok(k.freeze());
@@ -229,10 +222,10 @@ pub fn raw_concat_channel_read_stream_try_open_in_background(query: &netpod::Agg
else { else {
info!("----------------- no file open yet, await only opening of the file"); info!("----------------- no file open yet, await only opening of the file");
// TODO try to avoid this duplicated code: // TODO try to avoid this duplicated code:
if fopen.lock().unwrap().is_none() { if fopen.is_none() {
error!("logic BB"); error!("logic BB");
} }
let mut fopen3 = fopen.lock().unwrap().take().unwrap(); let fopen3 = fopen.take().unwrap();
let f = fopen3.await?; let f = fopen3.await?;
info!("opened next file SOLO"); info!("opened next file SOLO");
fopen_avail = false; fopen_avail = false;
@@ -243,10 +236,8 @@ pub fn raw_concat_channel_read_stream_try_open_in_background(query: &netpod::Agg
loop { loop {
let mut buf = bytes::BytesMut::with_capacity(blen); let mut buf = bytes::BytesMut::with_capacity(blen);
let mut file2 = file.take().unwrap(); let mut file2 = file.take().unwrap();
file_taken_for_read = true;
let n1 = file2.read_buf(&mut buf).await?; let n1 = file2.read_buf(&mut buf).await?;
if n1 == 0 { if n1 == 0 {
file_taken_for_read = false;
if file_prep.is_some() { if file_prep.is_some() {
file.replace(file_prep.take().unwrap()); file.replace(file_prep.take().unwrap());
} }
@@ -257,7 +248,6 @@ pub fn raw_concat_channel_read_stream_try_open_in_background(query: &netpod::Agg
} }
else { else {
file.replace(file2); file.replace(file2);
file_taken_for_read = false;
yield Ok(buf.freeze()); yield Ok(buf.freeze());
} }
} }
@@ -275,7 +265,7 @@ pub fn raw_concat_channel_read_stream_try_open_in_background(query: &netpod::Agg
pub fn raw_concat_channel_read_stream(query: &netpod::AggQuerySingleChannel) -> impl Stream<Item=Result<Bytes, Error>> + Send { pub fn raw_concat_channel_read_stream(query: &netpod::AggQuerySingleChannel) -> impl Stream<Item=Result<Bytes, Error>> + Send {
use futures_util::{StreamExt, FutureExt, pin_mut, select}; use futures_util::{StreamExt, pin_mut};
let mut query = query.clone(); let mut query = query.clone();
async_stream::stream! { async_stream::stream! {
let mut i1 = 0; let mut i1 = 0;