Messed up

This commit is contained in:
Dominik Werder
2021-04-01 23:11:17 +02:00
parent d776ea01e1
commit 67a7a88dc1
2 changed files with 17 additions and 13 deletions
+16 -12
View File
@@ -10,6 +10,7 @@ use futures_core::Stream;
use futures_util::future::FusedFuture; use futures_util::future::FusedFuture;
use bytes::Bytes; use bytes::Bytes;
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::Mutex;
pub async fn read_test_1(query: &netpod::AggQuerySingleChannel) -> Result<netpod::BodyStream, Error> { pub async fn read_test_1(query: &netpod::AggQuerySingleChannel) -> Result<netpod::BodyStream, Error> {
let pre = "/data/sf-databuffer/daq_swissfel"; let pre = "/data/sf-databuffer/daq_swissfel";
@@ -146,8 +147,10 @@ impl FusedFuture for Fopen1 {
} }
} }
unsafe impl Send for Fopen1 {}
pub fn raw_concat_channel_read_stream(query: &netpod::AggQuerySingleChannel) -> impl Stream<Item=Result<Bytes, Error>> {
pub fn raw_concat_channel_read_stream(query: &netpod::AggQuerySingleChannel) -> impl Stream<Item=Result<Bytes, Error>> + Send {
use futures_util::{StreamExt, FutureExt, pin_mut, select}; use futures_util::{StreamExt, FutureExt, pin_mut, select};
let mut query = query.clone(); let mut query = query.clone();
async_stream::stream! { async_stream::stream! {
@@ -169,11 +172,11 @@ pub fn raw_concat_channel_read_stream(query: &netpod::AggQuerySingleChannel) ->
} }
pub fn raw_concat_channel_read_stream_try_open_in_background(query: &netpod::AggQuerySingleChannel) -> impl Stream<Item=Result<Bytes, Error>> { pub fn raw_concat_channel_read_stream_try_open_in_background(query: &netpod::AggQuerySingleChannel) -> impl Stream<Item=Result<Bytes, Error>> + Send {
use futures_util::{StreamExt, FutureExt, pin_mut, select}; use futures_util::{StreamExt, FutureExt, pin_mut, select};
use tokio::io::AsyncReadExt; use tokio::io::AsyncReadExt;
let mut query = query.clone(); let mut query = query.clone();
let mut fopen = None; let mut fopen = Mutex::new(None);
let mut file: Option<File> = None; let mut file: Option<File> = None;
let mut file_taken_for_read = false; let mut file_taken_for_read = false;
async_stream::stream! { async_stream::stream! {
@@ -181,14 +184,14 @@ pub fn raw_concat_channel_read_stream_try_open_in_background(query: &netpod::Agg
let mut i1 = 0; let mut i1 = 0;
loop { loop {
{ {
if fopen.is_none() && file.is_none() && !file_taken_for_read { if fopen.lock().unwrap().is_none() && file.is_none() && !file_taken_for_read {
query.timebin = 18700 + i1; query.timebin = 18700 + i1;
fopen = Some(Fopen1::new(datapath(&query))); fopen.lock().unwrap().replace(Fopen1::new(datapath(&query)));
i1 += 1; i1 += 1;
} }
} }
let blen = query.buffer_size as usize; let blen = query.buffer_size as usize;
if fopen.is_some() { if fopen.lock().unwrap().is_some() {
if file.is_some() { if file.is_some() {
if reading.is_none() { if reading.is_none() {
let mut buf = bytes::BytesMut::with_capacity(blen); let mut buf = bytes::BytesMut::with_capacity(blen);
@@ -201,14 +204,15 @@ pub fn raw_concat_channel_read_stream_try_open_in_background(query: &netpod::Agg
let a = Box::pin(a); let a = Box::pin(a);
reading = Some(a.fuse()); reading = Some(a.fuse());
} }
let mut fopen3 = fopen.lock().unwrap();
let bufres = select! { let bufres = select! {
// TODO can I avoid the unwraps via matching already above? // TODO can I avoid the unwraps via matching already above?
f = fopen.as_mut().unwrap() => { /*f = fopen3.as_mut().unwrap() => {
fopen = None; fopen3.take();
file = Some(f.unwrap()); file = Some(f.unwrap());
info!("opened next file while also waiting on data read"); info!("opened next file while also waiting on data read");
None None
} }*/
k = reading.as_mut().unwrap() => { k = reading.as_mut().unwrap() => {
//() == k; //() == k;
reading = None; reading = None;
@@ -226,14 +230,14 @@ pub fn raw_concat_channel_read_stream_try_open_in_background(query: &netpod::Agg
} }
else { else {
// TODO try to avoid this duplicated code: // TODO try to avoid this duplicated code:
select! { /*select! {
// TODO can I avoid the unwraps via matching already above? // TODO can I avoid the unwraps via matching already above?
f = fopen.as_mut().unwrap() => { f = fopen.lock().unwrap().as_mut().unwrap() => {
fopen = None; fopen = None;
file = Some(f.unwrap()); file = Some(f.unwrap());
info!("opened next file"); info!("opened next file");
} }
}; };*/
} }
} }
else if file.is_some() { else if file.is_some() {
+1 -1
View File
@@ -65,7 +65,7 @@ async fn parsed_raw(req: Request<Body>) -> Result<Response<Body>, Error> {
let query: AggQuerySingleChannel = serde_json::from_slice(&bodyslice)?; let query: AggQuerySingleChannel = serde_json::from_slice(&bodyslice)?;
//let q = disk::read_test_1(&query).await?; //let q = disk::read_test_1(&query).await?;
//let s = q.inner; //let s = q.inner;
let s = disk::raw_concat_channel_read_stream(&query); let s = disk::raw_concat_channel_read_stream_try_open_in_background(&query);
let res = response(StatusCode::OK) let res = response(StatusCode::OK)
.body(Body::wrap_stream(s))?; .body(Body::wrap_stream(s))?;
/* /*