Both simple sequential and pre-open yield the same result

This commit is contained in:
Dominik Werder
2021-04-02 10:50:25 +02:00
parent dc7e39cd26
commit 9d0835b702

View File

@@ -38,7 +38,7 @@ struct FileReader {
buffer_size: u32,
}
impl futures_core::Stream for FileReader {
impl Stream for FileReader {
type Item = Result<bytes::Bytes, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
@@ -76,58 +76,31 @@ impl futures_core::Stream for FileReader {
}
struct Ftmp {
file: Option<Box<dyn FusedFuture<Output=Result<tokio::fs::File, std::io::Error>> + Send>>,
file2: Option<Box<dyn FusedFuture<Output=Result<tokio::fs::File, std::io::Error>> + Send + Unpin>>,
}
impl Ftmp {
pub fn new() -> Self {
Ftmp {
file: None,
file2: None,
}
}
pub fn open(&mut self, path: PathBuf) {
/*let a1 = async {
let ff = tokio::fs::File::open(path.clone()).fuse();
futures_util::pin_mut!(ff);
let u: Box<dyn FusedFuture<Output=Result<tokio::fs::File, std::io::Error>> + Send + Unpin> = Box::new(ff);
self.file2.replace(u);
};*/
//let z = tokio::fs::OpenOptions::new().read(true).open(path);
//let y = Box::new(z);
use futures_util::FutureExt;
self.file.replace(Box::new(tokio::fs::File::open(path).fuse()));
}
pub fn is_empty(&self) -> bool {
todo!()
}
}
#[allow(dead_code)]
struct Fopen1 {
opts: tokio::fs::OpenOptions,
fut: Box<dyn Future<Output=Result<tokio::fs::File, std::io::Error>>>,
fut: Pin<Box<dyn Future<Output=Result<tokio::fs::File, std::io::Error>>>>,
term: bool,
}
impl Fopen1 {
pub fn new(path: PathBuf) -> Self {
let fut: Box<dyn Future<Output=std::io::Result<tokio::fs::File>>> = Box::new(async {
let fut = Box::pin(async {
let mut o1 = tokio::fs::OpenOptions::new();
let o2 = o1.read(true);
let res = o2.open(path);
//() == res;
//todo!()
res.await
}) as Box<dyn Future<Output=std::io::Result<tokio::fs::File>>>;
let fut2: Box<dyn Future<Output=u32>> = Box::new(async {
}) as Pin<Box<dyn Future<Output=std::io::Result<tokio::fs::File>>>>;
let _fut2: Box<dyn Future<Output=u32>> = Box::new(async {
123
});
Self {
opts: tokio::fs::OpenOptions::new(),
fut,
term: false,
}
}
@@ -136,20 +109,171 @@ impl Fopen1 {
impl Future for Fopen1 {
type Output = Result<tokio::fs::File, Error>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
todo!()
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
use futures_util::{pin_mut};
let g = self.fut.as_mut();
match g.poll(cx) {
Poll::Ready(Ok(k)) => {
self.term = true;
Poll::Ready(Ok(k))
},
Poll::Ready(Err(k)) => {
self.term = true;
Poll::Ready(Err(k.into()))
},
Poll::Pending => Poll::Pending,
}
}
}
impl FusedFuture for Fopen1 {
fn is_terminated(&self) -> bool {
todo!()
self.term
}
}
unsafe impl Send for Fopen1 {}
pub fn raw_concat_channel_read_stream_try_open_in_background(query: &netpod::AggQuerySingleChannel) -> impl Stream<Item=Result<Bytes, Error>> + Send {
use futures_util::{StreamExt, FutureExt, pin_mut, select};
use tokio::io::AsyncReadExt;
let mut query = query.clone();
async_stream::stream! {
let mut fopen = Mutex::new(None);
let mut fopen_avail = false;
let mut file_prep: Option<File> = None;
let mut file: Option<File> = None;
let mut file_taken_for_read = false;
let mut reading = None;
let mut i1 = 0;
let mut i9 = 0;
loop {
let blen = query.buffer_size as usize;
{
if !fopen_avail && file_prep.is_none() && i1 < 16 {
query.timebin = 18700 + i1;
info!("Prepare open task for next file {}", query.timebin);
fopen.lock().unwrap().replace(Fopen1::new(datapath(&query)));
fopen_avail = true;
i1 += 1;
}
}
if !fopen_avail && file_prep.is_none() && file.is_none() && reading.is_none() {
info!("Nothing more to do");
break;
}
// TODO
// When the file is available, I can prepare the next reading.
// But next iteration, the file is not available, but reading is, so I should read!
// I can not simply drop the reading future, that would lose the request.
if reading.is_some() {
let k: Result<(tokio::fs::File, bytes::BytesMut), Error> = reading.as_mut().unwrap().await;
if k.is_err() {
error!("LONELY READ ERROR");
}
let k = k.unwrap();
reading.take();
file_taken_for_read = false;
file = Some(k.0);
yield Ok(k.1.freeze());
}
else if fopen.lock().unwrap().is_some() {
if file.is_some() {
if reading.is_none() {
let mut buf = bytes::BytesMut::with_capacity(blen);
let mut file2 = file.take().unwrap();
file_taken_for_read = true;
let a = async move {
file2.read_buf(&mut buf).await?;
Ok::<_, Error>((file2, buf))
};
let a = Box::pin(a);
reading = Some(a.fuse());
}
// TODO do I really have to take out the future while waiting on it?
// I think the issue is now with the mutex guard, can I get rid of the mutex again?
let mut fopen3 = fopen.lock().unwrap().take().unwrap();
let bufres = select! {
// TODO can I avoid the unwraps via matching already above?
f = fopen3 => {
fopen_avail = false;
// TODO feed out the potential error:
file_prep = Some(f.unwrap());
None
}
k = reading.as_mut().unwrap() => {
info!("COMBI read chunk");
reading = None;
// TODO handle the error somehow here...
if k.is_err() {
error!("READ ERROR IN COMBI");
}
let k = k.unwrap();
file = Some(k.0);
// TODO must be a nicer way to do this:
file_taken_for_read = false;
Some(k.1)
}
};
if fopen_avail {
fopen.lock().unwrap().replace(fopen3);
}
if let Some(k) = bufres {
yield Ok(k.freeze());
}
}
else {
info!("----------------- no file open yet, await only opening of the file");
// TODO try to avoid this duplicated code:
if fopen.lock().unwrap().is_none() {
error!("logic BB");
}
let mut fopen3 = fopen.lock().unwrap().take().unwrap();
let f = fopen3.await?;
info!("opened next file SOLO");
fopen_avail = false;
file = Some(f);
}
}
else if file.is_some() {
loop {
let mut buf = bytes::BytesMut::with_capacity(blen);
let mut file2 = file.take().unwrap();
file_taken_for_read = true;
let n1 = file2.read_buf(&mut buf).await?;
if n1 == 0 {
file_taken_for_read = false;
if file_prep.is_some() {
file.replace(file_prep.take().unwrap());
}
else {
info!("After read loop, next file not yet ready");
}
break;
}
else {
file.replace(file2);
file_taken_for_read = false;
yield Ok(buf.freeze());
}
}
}
i9 += 1;
if i9 > 100 {
break;
}
}
}
}
// TODO implement another variant with a dedicated task to feed the opened file queue.
pub fn raw_concat_channel_read_stream(query: &netpod::AggQuerySingleChannel) -> impl Stream<Item=Result<Bytes, Error>> + Send {
use futures_util::{StreamExt, FutureExt, pin_mut, select};
let mut query = query.clone();
@@ -171,104 +295,6 @@ pub fn raw_concat_channel_read_stream(query: &netpod::AggQuerySingleChannel) ->
}
}
pub fn raw_concat_channel_read_stream_try_open_in_background(query: &netpod::AggQuerySingleChannel) -> impl Stream<Item=Result<Bytes, Error>> + Send {
use futures_util::{StreamExt, FutureExt, pin_mut, select};
use tokio::io::AsyncReadExt;
let mut query = query.clone();
async_stream::stream! {
let mut fopen = Mutex::new(None);
let mut fopen_avail = false;
let mut file: Option<File> = None;
let mut file_taken_for_read = false;
let mut reading = None;
let mut i1 = 0;
loop {
{
if !fopen_avail && file.is_none() && !file_taken_for_read {
query.timebin = 18700 + i1;
fopen.lock().unwrap().replace(Fopen1::new(datapath(&query)));
fopen_avail = true;
i1 += 1;
}
}
let blen = query.buffer_size as usize;
if fopen.lock().unwrap().is_some() {
if file.is_some() {
if reading.is_none() {
let mut buf = bytes::BytesMut::with_capacity(blen);
let mut file2 = file.take().unwrap();
file_taken_for_read = true;
let a = async move {
file2.read_buf(&mut buf).await?;
Ok::<_, Error>((file2, buf))
};
let a = Box::pin(a);
reading = Some(a.fuse());
}
let mut fopen3 = fopen.lock().unwrap().take().unwrap();
let bufres = select! {
// TODO can I avoid the unwraps via matching already above?
f = fopen3 => {
info!("opened next file while also waiting on data read");
fopen_avail = false;
// TODO feed out the potential error:
file = Some(f.unwrap());
None
}
k = reading.as_mut().unwrap() => {
reading = None;
// TODO handle the error somehow here...
let k = k.unwrap();
file = Some(k.0);
// TODO must be a nicer way to do this:
file_taken_for_read = false;
Some(k.1)
}
};
if fopen_avail {
fopen.lock().unwrap().replace(fopen3);
}
if let Some(k) = bufres {
yield Ok(k.freeze());
}
}
else {
info!("----------------- no file open yet, await only opening of the file");
// TODO try to avoid this duplicated code:
let mut fopen3 = fopen.lock().unwrap().take().unwrap();
let f = fopen3.await?;
info!("opened next file");
fopen_avail = false;
file = Some(f);
}
}
else if file.is_some() {
info!("start read file in a loop");
loop {
let mut buf = bytes::BytesMut::with_capacity(blen);
let mut file2 = file.take().unwrap();
file_taken_for_read = true;
let n1 = file2.read_buf(&mut buf).await?;
if n1 == 0 {
break;
}
else {
yield Ok(buf.freeze());
}
}
info!("DONE with file in a loop");
}
}
}
}
fn datapath(query: &netpod::AggQuerySingleChannel) -> PathBuf {
let pre = "/data/sf-databuffer/daq_swissfel";
let path = format!("{}/{}_{}/byTime/{}/{:019}/{:010}/{:019}_00000_Data", pre, query.ksprefix, query.keyspace, query.channel.name(), query.timebin, query.split, query.tbsize);
path.into()
}
pub fn raw_concat_channel_read_stream_timebin(query: &netpod::AggQuerySingleChannel) -> impl Stream<Item=Result<Bytes, Error>> {
let query = query.clone();
let pre = "/data/sf-databuffer/daq_swissfel";
@@ -302,6 +328,13 @@ pub fn raw_concat_channel_read_stream_timebin(query: &netpod::AggQuerySingleChan
}
fn datapath(query: &netpod::AggQuerySingleChannel) -> PathBuf {
let pre = "/data/sf-databuffer/daq_swissfel";
let path = format!("{}/{}_{}/byTime/{}/{:019}/{:010}/{:019}_00000_Data", pre, query.ksprefix, query.keyspace, query.channel.name(), query.timebin, query.split, query.tbsize);
path.into()
}
pub async fn raw_concat_channel_read(query: &netpod::AggQuerySingleChannel) -> Result<netpod::BodyStream, Error> {
let _reader = RawConcatChannelReader {
ksprefix: query.ksprefix.clone(),