WIP on read
This commit is contained in:
+121
-28
@@ -4,6 +4,7 @@ use err::Error;
|
|||||||
use std::task::{Context, Poll};
|
use std::task::{Context, Poll};
|
||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
use tokio::io::AsyncRead;
|
use tokio::io::AsyncRead;
|
||||||
|
use tokio::fs::File;
|
||||||
use std::future::Future;
|
use std::future::Future;
|
||||||
use futures_core::Stream;
|
use futures_core::Stream;
|
||||||
use futures_util::future::FusedFuture;
|
use futures_util::future::FusedFuture;
|
||||||
@@ -87,12 +88,12 @@ impl Ftmp {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
pub fn open(&mut self, path: PathBuf) {
|
pub fn open(&mut self, path: PathBuf) {
|
||||||
/*{
|
/*let a1 = async {
|
||||||
let ff = tokio::fs::File::open(path.clone()).fuse();
|
let ff = tokio::fs::File::open(path.clone()).fuse();
|
||||||
futures_util::pin_mut!(ff);
|
futures_util::pin_mut!(ff);
|
||||||
let u: Box<dyn FusedFuture<Output=Result<tokio::fs::File, std::io::Error>> + Send + Unpin> = Box::new(ff);
|
let u: Box<dyn FusedFuture<Output=Result<tokio::fs::File, std::io::Error>> + Send + Unpin> = Box::new(ff);
|
||||||
self.file2.replace(u);
|
self.file2.replace(u);
|
||||||
}*/
|
};*/
|
||||||
//let z = tokio::fs::OpenOptions::new().read(true).open(path);
|
//let z = tokio::fs::OpenOptions::new().read(true).open(path);
|
||||||
//let y = Box::new(z);
|
//let y = Box::new(z);
|
||||||
use futures_util::FutureExt;
|
use futures_util::FutureExt;
|
||||||
@@ -104,6 +105,48 @@ impl Ftmp {
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
struct Fopen1 {
|
||||||
|
opts: tokio::fs::OpenOptions,
|
||||||
|
fut: Box<dyn Future<Output=Result<tokio::fs::File, std::io::Error>>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Fopen1 {
|
||||||
|
|
||||||
|
pub fn new(path: PathBuf) -> Self {
|
||||||
|
let fut: Box<dyn Future<Output=std::io::Result<tokio::fs::File>>> = Box::new(async {
|
||||||
|
let mut o1 = tokio::fs::OpenOptions::new();
|
||||||
|
let o2 = o1.read(true);
|
||||||
|
let res = o2.open(path);
|
||||||
|
//() == res;
|
||||||
|
//todo!()
|
||||||
|
res.await
|
||||||
|
}) as Box<dyn Future<Output=std::io::Result<tokio::fs::File>>>;
|
||||||
|
let fut2: Box<dyn Future<Output=u32>> = Box::new(async {
|
||||||
|
123
|
||||||
|
});
|
||||||
|
Self {
|
||||||
|
opts: tokio::fs::OpenOptions::new(),
|
||||||
|
fut,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Future for Fopen1 {
|
||||||
|
type Output = Result<tokio::fs::File, Error>;
|
||||||
|
|
||||||
|
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||||
|
todo!()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl FusedFuture for Fopen1 {
|
||||||
|
fn is_terminated(&self) -> bool {
|
||||||
|
todo!()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
pub fn raw_concat_channel_read_stream(query: &netpod::AggQuerySingleChannel) -> impl Stream<Item=Result<Bytes, Error>> {
|
pub fn raw_concat_channel_read_stream(query: &netpod::AggQuerySingleChannel) -> impl Stream<Item=Result<Bytes, Error>> {
|
||||||
use futures_util::{StreamExt, FutureExt, pin_mut, select};
|
use futures_util::{StreamExt, FutureExt, pin_mut, select};
|
||||||
let mut query = query.clone();
|
let mut query = query.clone();
|
||||||
@@ -128,37 +171,87 @@ pub fn raw_concat_channel_read_stream(query: &netpod::AggQuerySingleChannel) ->
|
|||||||
|
|
||||||
pub fn raw_concat_channel_read_stream_try_open_in_background(query: &netpod::AggQuerySingleChannel) -> impl Stream<Item=Result<Bytes, Error>> {
|
pub fn raw_concat_channel_read_stream_try_open_in_background(query: &netpod::AggQuerySingleChannel) -> impl Stream<Item=Result<Bytes, Error>> {
|
||||||
use futures_util::{StreamExt, FutureExt, pin_mut, select};
|
use futures_util::{StreamExt, FutureExt, pin_mut, select};
|
||||||
|
use tokio::io::AsyncReadExt;
|
||||||
let mut query = query.clone();
|
let mut query = query.clone();
|
||||||
let mut ftmp1 = Ftmp::new();
|
let mut fopen = None;
|
||||||
let mut ftmp2 = Ftmp::new();
|
let mut file: Option<File> = None;
|
||||||
|
let mut file_taken_for_read = false;
|
||||||
async_stream::stream! {
|
async_stream::stream! {
|
||||||
|
let mut reading = None;
|
||||||
let mut i1 = 0;
|
let mut i1 = 0;
|
||||||
loop {
|
loop {
|
||||||
if ftmp1.is_empty() {
|
{
|
||||||
let p2 = datapath(&query);
|
if fopen.is_none() && file.is_none() && !file_taken_for_read {
|
||||||
ftmp1.open(p2);
|
query.timebin = 18700 + i1;
|
||||||
query.timebin += 1;
|
fopen = Some(Fopen1::new(datapath(&query)));
|
||||||
|
i1 += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
let blen = query.buffer_size as usize;
|
||||||
|
if fopen.is_some() {
|
||||||
|
if file.is_some() {
|
||||||
|
if reading.is_none() {
|
||||||
|
let mut buf = bytes::BytesMut::with_capacity(blen);
|
||||||
|
let mut file2 = file.take().unwrap();
|
||||||
|
file_taken_for_read = true;
|
||||||
|
let a = async move {
|
||||||
|
file2.read_buf(&mut buf).await?;
|
||||||
|
Ok::<_, Error>((file2, buf))
|
||||||
|
};
|
||||||
|
let a = Box::pin(a);
|
||||||
|
reading = Some(a.fuse());
|
||||||
|
}
|
||||||
|
let bufres = select! {
|
||||||
|
// TODO can I avoid the unwraps via matching already above?
|
||||||
|
f = fopen.as_mut().unwrap() => {
|
||||||
|
fopen = None;
|
||||||
|
file = Some(f.unwrap());
|
||||||
|
info!("opened next file while also waiting on data read");
|
||||||
|
None
|
||||||
|
}
|
||||||
|
k = reading.as_mut().unwrap() => {
|
||||||
|
//() == k;
|
||||||
|
reading = None;
|
||||||
|
// TODO handle the error somehow here...
|
||||||
|
let k = k.unwrap();
|
||||||
|
file = Some(k.0);
|
||||||
|
// TODO must be a nicer way to do this:
|
||||||
|
file_taken_for_read = false;
|
||||||
|
Some(k.1)
|
||||||
|
}
|
||||||
|
};
|
||||||
|
if let Some(k) = bufres {
|
||||||
|
yield Ok(k.freeze());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
// TODO try to avoid this duplicated code:
|
||||||
|
select! {
|
||||||
|
// TODO can I avoid the unwraps via matching already above?
|
||||||
|
f = fopen.as_mut().unwrap() => {
|
||||||
|
fopen = None;
|
||||||
|
file = Some(f.unwrap());
|
||||||
|
info!("opened next file");
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else if file.is_some() {
|
||||||
|
info!("start read file in a loop");
|
||||||
|
loop {
|
||||||
|
let mut buf = bytes::BytesMut::with_capacity(blen);
|
||||||
|
let mut file2 = file.take().unwrap();
|
||||||
|
file_taken_for_read = true;
|
||||||
|
let n1 = file2.read_buf(&mut buf).await?;
|
||||||
|
if n1 == 0 {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
yield Ok(buf.freeze());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
info!("DONE with file in a loop");
|
||||||
}
|
}
|
||||||
let timebin = 18700 + i1;
|
|
||||||
let b = Box::new("");
|
|
||||||
fn lala<H: Unpin>(h: H) {}
|
|
||||||
lala(b);
|
|
||||||
//query.timebin = timebin;
|
|
||||||
//let s2 = raw_concat_channel_read_stream_timebin(&query);
|
|
||||||
//pin_mut!(s2);
|
|
||||||
//while let Some(item) = s2.next().await {
|
|
||||||
// yield item;
|
|
||||||
//}
|
|
||||||
//let s2f = s2.next().fuse();
|
|
||||||
//pin_mut!(s2f);
|
|
||||||
//pin_mut!(f2);
|
|
||||||
let ff2 = ftmp1.file.take().unwrap();
|
|
||||||
pin_mut!(ff2);
|
|
||||||
//let i: Box<dyn FusedFuture<Output=Result<tokio::fs::File, std::io::Error>> + Send + Unpin> = ff2;
|
|
||||||
let ff3 = Box::pin(ff2);
|
|
||||||
pin_mut!(ff3);
|
|
||||||
//let z = select! { _ = ff3 => () };
|
|
||||||
yield Ok(Bytes::new());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user