From d776ea01e1346149a1ecab49a3a855cd4dd582f5 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Thu, 1 Apr 2021 22:58:01 +0200 Subject: [PATCH] WIP on read --- disk/src/lib.rs | 149 +++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 121 insertions(+), 28 deletions(-) diff --git a/disk/src/lib.rs b/disk/src/lib.rs index d4f7ab1..7ac0f73 100644 --- a/disk/src/lib.rs +++ b/disk/src/lib.rs @@ -4,6 +4,7 @@ use err::Error; use std::task::{Context, Poll}; use std::pin::Pin; use tokio::io::AsyncRead; +use tokio::fs::File; use std::future::Future; use futures_core::Stream; use futures_util::future::FusedFuture; @@ -87,12 +88,12 @@ impl Ftmp { } } pub fn open(&mut self, path: PathBuf) { - /*{ + /*let a1 = async { let ff = tokio::fs::File::open(path.clone()).fuse(); futures_util::pin_mut!(ff); let u: Box> + Send + Unpin> = Box::new(ff); self.file2.replace(u); - }*/ + };*/ //let z = tokio::fs::OpenOptions::new().read(true).open(path); //let y = Box::new(z); use futures_util::FutureExt; @@ -104,6 +105,48 @@ impl Ftmp { } +struct Fopen1 { + opts: tokio::fs::OpenOptions, + fut: Box>>, +} + +impl Fopen1 { + + pub fn new(path: PathBuf) -> Self { + let fut: Box>> = Box::new(async { + let mut o1 = tokio::fs::OpenOptions::new(); + let o2 = o1.read(true); + let res = o2.open(path); + //() == res; + //todo!() + res.await + }) as Box>>; + let fut2: Box> = Box::new(async { + 123 + }); + Self { + opts: tokio::fs::OpenOptions::new(), + fut, + } + } + +} + +impl Future for Fopen1 { + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + todo!() + } +} + +impl FusedFuture for Fopen1 { + fn is_terminated(&self) -> bool { + todo!() + } +} + + pub fn raw_concat_channel_read_stream(query: &netpod::AggQuerySingleChannel) -> impl Stream> { use futures_util::{StreamExt, FutureExt, pin_mut, select}; let mut query = query.clone(); @@ -128,37 +171,87 @@ pub fn raw_concat_channel_read_stream(query: &netpod::AggQuerySingleChannel) -> pub fn raw_concat_channel_read_stream_try_open_in_background(query: &netpod::AggQuerySingleChannel) -> impl Stream> { use futures_util::{StreamExt, FutureExt, pin_mut, select}; + use tokio::io::AsyncReadExt; let mut query = query.clone(); - let mut ftmp1 = Ftmp::new(); - let mut ftmp2 = Ftmp::new(); + let mut fopen = None; + let mut file: Option = None; + let mut file_taken_for_read = false; async_stream::stream! { + let mut reading = None; let mut i1 = 0; loop { - if ftmp1.is_empty() { - let p2 = datapath(&query); - ftmp1.open(p2); - query.timebin += 1; + { + if fopen.is_none() && file.is_none() && !file_taken_for_read { + query.timebin = 18700 + i1; + fopen = Some(Fopen1::new(datapath(&query))); + i1 += 1; + } + } + let blen = query.buffer_size as usize; + if fopen.is_some() { + if file.is_some() { + if reading.is_none() { + let mut buf = bytes::BytesMut::with_capacity(blen); + let mut file2 = file.take().unwrap(); + file_taken_for_read = true; + let a = async move { + file2.read_buf(&mut buf).await?; + Ok::<_, Error>((file2, buf)) + }; + let a = Box::pin(a); + reading = Some(a.fuse()); + } + let bufres = select! { + // TODO can I avoid the unwraps via matching already above? + f = fopen.as_mut().unwrap() => { + fopen = None; + file = Some(f.unwrap()); + info!("opened next file while also waiting on data read"); + None + } + k = reading.as_mut().unwrap() => { + //() == k; + reading = None; + // TODO handle the error somehow here... + let k = k.unwrap(); + file = Some(k.0); + // TODO must be a nicer way to do this: + file_taken_for_read = false; + Some(k.1) + } + }; + if let Some(k) = bufres { + yield Ok(k.freeze()); + } + } + else { + // TODO try to avoid this duplicated code: + select! { + // TODO can I avoid the unwraps via matching already above? + f = fopen.as_mut().unwrap() => { + fopen = None; + file = Some(f.unwrap()); + info!("opened next file"); + } + }; + } + } + else if file.is_some() { + info!("start read file in a loop"); + loop { + let mut buf = bytes::BytesMut::with_capacity(blen); + let mut file2 = file.take().unwrap(); + file_taken_for_read = true; + let n1 = file2.read_buf(&mut buf).await?; + if n1 == 0 { + break; + } + else { + yield Ok(buf.freeze()); + } + } + info!("DONE with file in a loop"); } - let timebin = 18700 + i1; - let b = Box::new(""); - fn lala(h: H) {} - lala(b); - //query.timebin = timebin; - //let s2 = raw_concat_channel_read_stream_timebin(&query); - //pin_mut!(s2); - //while let Some(item) = s2.next().await { - // yield item; - //} - //let s2f = s2.next().fuse(); - //pin_mut!(s2f); - //pin_mut!(f2); - let ff2 = ftmp1.file.take().unwrap(); - pin_mut!(ff2); - //let i: Box> + Send + Unpin> = ff2; - let ff3 = Box::pin(ff2); - pin_mut!(ff3); - //let z = select! { _ = ff3 => () }; - yield Ok(Bytes::new()); } } }