From 9d0835b7023ba50f257414a2aa6e685e6e274f84 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Fri, 2 Apr 2021 10:50:25 +0200 Subject: [PATCH] Both simple sequential and pre-open yield the same result --- disk/src/lib.rs | 305 +++++++++++++++++++++++++++--------------------- 1 file changed, 169 insertions(+), 136 deletions(-) diff --git a/disk/src/lib.rs b/disk/src/lib.rs index f2f6880..286e7e2 100644 --- a/disk/src/lib.rs +++ b/disk/src/lib.rs @@ -38,7 +38,7 @@ struct FileReader { buffer_size: u32, } -impl futures_core::Stream for FileReader { +impl Stream for FileReader { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { @@ -76,58 +76,31 @@ impl futures_core::Stream for FileReader { } -struct Ftmp { - file: Option> + Send>>, - file2: Option> + Send + Unpin>>, -} - -impl Ftmp { - pub fn new() -> Self { - Ftmp { - file: None, - file2: None, - } - } - pub fn open(&mut self, path: PathBuf) { - /*let a1 = async { - let ff = tokio::fs::File::open(path.clone()).fuse(); - futures_util::pin_mut!(ff); - let u: Box> + Send + Unpin> = Box::new(ff); - self.file2.replace(u); - };*/ - //let z = tokio::fs::OpenOptions::new().read(true).open(path); - //let y = Box::new(z); - use futures_util::FutureExt; - self.file.replace(Box::new(tokio::fs::File::open(path).fuse())); - } - pub fn is_empty(&self) -> bool { - todo!() - } -} - - +#[allow(dead_code)] struct Fopen1 { opts: tokio::fs::OpenOptions, - fut: Box>>, + fut: Pin>>>, + term: bool, } impl Fopen1 { pub fn new(path: PathBuf) -> Self { - let fut: Box>> = Box::new(async { + let fut = Box::pin(async { let mut o1 = tokio::fs::OpenOptions::new(); let o2 = o1.read(true); let res = o2.open(path); //() == res; //todo!() res.await - }) as Box>>; - let fut2: Box> = Box::new(async { + }) as Pin>>>; + let _fut2: Box> = Box::new(async { 123 }); Self { opts: tokio::fs::OpenOptions::new(), fut, + term: false, } } @@ -136,20 +109,171 @@ impl Fopen1 { impl Future for Fopen1 { type Output = Result; - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - todo!() + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + use futures_util::{pin_mut}; + let g = self.fut.as_mut(); + match g.poll(cx) { + Poll::Ready(Ok(k)) => { + self.term = true; + Poll::Ready(Ok(k)) + }, + Poll::Ready(Err(k)) => { + self.term = true; + Poll::Ready(Err(k.into())) + }, + Poll::Pending => Poll::Pending, + } } + } + impl FusedFuture for Fopen1 { fn is_terminated(&self) -> bool { - todo!() + self.term } } unsafe impl Send for Fopen1 {} +pub fn raw_concat_channel_read_stream_try_open_in_background(query: &netpod::AggQuerySingleChannel) -> impl Stream> + Send { + use futures_util::{StreamExt, FutureExt, pin_mut, select}; + use tokio::io::AsyncReadExt; + let mut query = query.clone(); + async_stream::stream! { + let mut fopen = Mutex::new(None); + let mut fopen_avail = false; + let mut file_prep: Option = None; + let mut file: Option = None; + let mut file_taken_for_read = false; + let mut reading = None; + let mut i1 = 0; + let mut i9 = 0; + loop { + let blen = query.buffer_size as usize; + { + if !fopen_avail && file_prep.is_none() && i1 < 16 { + query.timebin = 18700 + i1; + info!("Prepare open task for next file {}", query.timebin); + fopen.lock().unwrap().replace(Fopen1::new(datapath(&query))); + fopen_avail = true; + i1 += 1; + } + } + if !fopen_avail && file_prep.is_none() && file.is_none() && reading.is_none() { + info!("Nothing more to do"); + break; + } + // TODO + // When the file is available, I can prepare the next reading. + // But next iteration, the file is not available, but reading is, so I should read! + // I can not simply drop the reading future, that would lose the request. + + if reading.is_some() { + let k: Result<(tokio::fs::File, bytes::BytesMut), Error> = reading.as_mut().unwrap().await; + if k.is_err() { + error!("LONELY READ ERROR"); + } + let k = k.unwrap(); + reading.take(); + file_taken_for_read = false; + file = Some(k.0); + yield Ok(k.1.freeze()); + } + else if fopen.lock().unwrap().is_some() { + if file.is_some() { + if reading.is_none() { + let mut buf = bytes::BytesMut::with_capacity(blen); + let mut file2 = file.take().unwrap(); + file_taken_for_read = true; + let a = async move { + file2.read_buf(&mut buf).await?; + Ok::<_, Error>((file2, buf)) + }; + let a = Box::pin(a); + reading = Some(a.fuse()); + } + // TODO do I really have to take out the future while waiting on it? + // I think the issue is now with the mutex guard, can I get rid of the mutex again? + let mut fopen3 = fopen.lock().unwrap().take().unwrap(); + let bufres = select! { + // TODO can I avoid the unwraps via matching already above? + f = fopen3 => { + fopen_avail = false; + // TODO feed out the potential error: + file_prep = Some(f.unwrap()); + None + } + k = reading.as_mut().unwrap() => { + info!("COMBI read chunk"); + reading = None; + // TODO handle the error somehow here... + if k.is_err() { + error!("READ ERROR IN COMBI"); + } + let k = k.unwrap(); + file = Some(k.0); + // TODO must be a nicer way to do this: + file_taken_for_read = false; + Some(k.1) + } + }; + if fopen_avail { + fopen.lock().unwrap().replace(fopen3); + } + if let Some(k) = bufres { + yield Ok(k.freeze()); + } + } + else { + info!("----------------- no file open yet, await only opening of the file"); + // TODO try to avoid this duplicated code: + if fopen.lock().unwrap().is_none() { + error!("logic BB"); + } + let mut fopen3 = fopen.lock().unwrap().take().unwrap(); + let f = fopen3.await?; + info!("opened next file SOLO"); + fopen_avail = false; + file = Some(f); + } + } + else if file.is_some() { + loop { + let mut buf = bytes::BytesMut::with_capacity(blen); + let mut file2 = file.take().unwrap(); + file_taken_for_read = true; + let n1 = file2.read_buf(&mut buf).await?; + if n1 == 0 { + file_taken_for_read = false; + if file_prep.is_some() { + file.replace(file_prep.take().unwrap()); + } + else { + info!("After read loop, next file not yet ready"); + } + break; + } + else { + file.replace(file2); + file_taken_for_read = false; + yield Ok(buf.freeze()); + } + } + } + i9 += 1; + if i9 > 100 { + break; + } + } + } +} + + +// TODO implement another variant with a dedicated task to feed the opened file queue. + + pub fn raw_concat_channel_read_stream(query: &netpod::AggQuerySingleChannel) -> impl Stream> + Send { use futures_util::{StreamExt, FutureExt, pin_mut, select}; let mut query = query.clone(); @@ -171,104 +295,6 @@ pub fn raw_concat_channel_read_stream(query: &netpod::AggQuerySingleChannel) -> } } - -pub fn raw_concat_channel_read_stream_try_open_in_background(query: &netpod::AggQuerySingleChannel) -> impl Stream> + Send { - use futures_util::{StreamExt, FutureExt, pin_mut, select}; - use tokio::io::AsyncReadExt; - let mut query = query.clone(); - async_stream::stream! { - let mut fopen = Mutex::new(None); - let mut fopen_avail = false; - let mut file: Option = None; - let mut file_taken_for_read = false; - let mut reading = None; - let mut i1 = 0; - loop { - { - if !fopen_avail && file.is_none() && !file_taken_for_read { - query.timebin = 18700 + i1; - fopen.lock().unwrap().replace(Fopen1::new(datapath(&query))); - fopen_avail = true; - i1 += 1; - } - } - let blen = query.buffer_size as usize; - if fopen.lock().unwrap().is_some() { - if file.is_some() { - if reading.is_none() { - let mut buf = bytes::BytesMut::with_capacity(blen); - let mut file2 = file.take().unwrap(); - file_taken_for_read = true; - let a = async move { - file2.read_buf(&mut buf).await?; - Ok::<_, Error>((file2, buf)) - }; - let a = Box::pin(a); - reading = Some(a.fuse()); - } - let mut fopen3 = fopen.lock().unwrap().take().unwrap(); - let bufres = select! { - // TODO can I avoid the unwraps via matching already above? - f = fopen3 => { - info!("opened next file while also waiting on data read"); - fopen_avail = false; - // TODO feed out the potential error: - file = Some(f.unwrap()); - None - } - k = reading.as_mut().unwrap() => { - reading = None; - // TODO handle the error somehow here... - let k = k.unwrap(); - file = Some(k.0); - // TODO must be a nicer way to do this: - file_taken_for_read = false; - Some(k.1) - } - }; - if fopen_avail { - fopen.lock().unwrap().replace(fopen3); - } - if let Some(k) = bufres { - yield Ok(k.freeze()); - } - } - else { - info!("----------------- no file open yet, await only opening of the file"); - // TODO try to avoid this duplicated code: - let mut fopen3 = fopen.lock().unwrap().take().unwrap(); - let f = fopen3.await?; - info!("opened next file"); - fopen_avail = false; - file = Some(f); - } - } - else if file.is_some() { - info!("start read file in a loop"); - loop { - let mut buf = bytes::BytesMut::with_capacity(blen); - let mut file2 = file.take().unwrap(); - file_taken_for_read = true; - let n1 = file2.read_buf(&mut buf).await?; - if n1 == 0 { - break; - } - else { - yield Ok(buf.freeze()); - } - } - info!("DONE with file in a loop"); - } - } - } -} - -fn datapath(query: &netpod::AggQuerySingleChannel) -> PathBuf { - let pre = "/data/sf-databuffer/daq_swissfel"; - let path = format!("{}/{}_{}/byTime/{}/{:019}/{:010}/{:019}_00000_Data", pre, query.ksprefix, query.keyspace, query.channel.name(), query.timebin, query.split, query.tbsize); - path.into() -} - pub fn raw_concat_channel_read_stream_timebin(query: &netpod::AggQuerySingleChannel) -> impl Stream> { let query = query.clone(); let pre = "/data/sf-databuffer/daq_swissfel"; @@ -302,6 +328,13 @@ pub fn raw_concat_channel_read_stream_timebin(query: &netpod::AggQuerySingleChan } +fn datapath(query: &netpod::AggQuerySingleChannel) -> PathBuf { + let pre = "/data/sf-databuffer/daq_swissfel"; + let path = format!("{}/{}_{}/byTime/{}/{:019}/{:010}/{:019}_00000_Data", pre, query.ksprefix, query.keyspace, query.channel.name(), query.timebin, query.split, query.tbsize); + path.into() +} + + pub async fn raw_concat_channel_read(query: &netpod::AggQuerySingleChannel) -> Result { let _reader = RawConcatChannelReader { ksprefix: query.ksprefix.clone(),