Move case into function
This commit is contained in:
+49
-53
@@ -218,12 +218,54 @@ where
|
|||||||
|
|
||||||
fn poll_open_check_local_file(
|
fn poll_open_check_local_file(
|
||||||
self: &mut Self,
|
self: &mut Self,
|
||||||
_fut: Pin<Box<dyn Future<Output = Result<File, io::Error>> + Send>>,
|
mut fut: Pin<Box<dyn Future<Output = Result<File, io::Error>> + Send>>,
|
||||||
) -> (
|
cx: &mut Context,
|
||||||
Poll<Option<Sitemty<<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output>>>,
|
) -> Poll<Option<Sitemty<<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output>>> {
|
||||||
Pin<Box<dyn Future<Output = Result<File, io::Error>> + Send>>,
|
use Poll::*;
|
||||||
) {
|
match fut.poll_unpin(cx) {
|
||||||
todo!()
|
Ready(item) => {
|
||||||
|
self.open_check_local_file = None;
|
||||||
|
match item {
|
||||||
|
Ok(file) => {
|
||||||
|
self.read_from_cache = true;
|
||||||
|
let fut =
|
||||||
|
<<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output as ReadableFromFile>::read_from_file(file)?;
|
||||||
|
self.read_cache_fut = Some(Box::pin(fut));
|
||||||
|
// Return Ready(None) to signal that nothing is Pending but we need to get polled again.
|
||||||
|
//continue 'outer;
|
||||||
|
Ready(None)
|
||||||
|
}
|
||||||
|
Err(e) => match e.kind() {
|
||||||
|
// TODO other error kinds
|
||||||
|
io::ErrorKind::NotFound => match self.try_setup_fetch_prebinned_higher_res() {
|
||||||
|
Ok(_) => {
|
||||||
|
if self.fut2.is_none() {
|
||||||
|
let e =
|
||||||
|
Err(Error::with_msg(format!("try_setup_fetch_prebinned_higher_res failed")));
|
||||||
|
self.errored = true;
|
||||||
|
Ready(Some(e))
|
||||||
|
} else {
|
||||||
|
//continue 'outer;
|
||||||
|
Ready(None)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
let e =
|
||||||
|
Error::with_msg(format!("try_setup_fetch_prebinned_higher_res error: {:?}", e));
|
||||||
|
self.errored = true;
|
||||||
|
Ready(Some(Err(e)))
|
||||||
|
}
|
||||||
|
},
|
||||||
|
_ => {
|
||||||
|
error!("File I/O error: kind {:?} {:?}\n\n..............", e.kind(), e);
|
||||||
|
self.errored = true;
|
||||||
|
Ready(Some(Err(e.into())))
|
||||||
|
}
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Pending => Pending,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn _check_for_existing_cached_data(&mut self) -> Result<(), Error> {
|
fn _check_for_existing_cached_data(&mut self) -> Result<(), Error> {
|
||||||
@@ -378,54 +420,8 @@ where
|
|||||||
Pending => Pending,
|
Pending => Pending,
|
||||||
}
|
}
|
||||||
} else if let Some(fut) = self.open_check_local_file.take() {
|
} else if let Some(fut) = self.open_check_local_file.take() {
|
||||||
let (res, fut) = Self::poll_open_check_local_file(&mut self, fut);
|
let res = Self::poll_open_check_local_file(&mut self, fut, cx);
|
||||||
self.open_check_local_file = Some(fut);
|
|
||||||
res
|
res
|
||||||
} else if let Some(fut) = self.open_check_local_file.as_mut() {
|
|
||||||
match fut.poll_unpin(cx) {
|
|
||||||
Ready(item) => {
|
|
||||||
self.open_check_local_file = None;
|
|
||||||
match item {
|
|
||||||
Ok(file) => {
|
|
||||||
self.read_from_cache = true;
|
|
||||||
let fut =
|
|
||||||
<<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output as ReadableFromFile>::read_from_file(file)?;
|
|
||||||
self.read_cache_fut = Some(Box::pin(fut));
|
|
||||||
continue 'outer;
|
|
||||||
}
|
|
||||||
Err(e) => match e.kind() {
|
|
||||||
// TODO other error kinds
|
|
||||||
io::ErrorKind::NotFound => match self.try_setup_fetch_prebinned_higher_res() {
|
|
||||||
Ok(_) => {
|
|
||||||
if self.fut2.is_none() {
|
|
||||||
let e = Err(Error::with_msg(format!(
|
|
||||||
"try_setup_fetch_prebinned_higher_res failed"
|
|
||||||
)));
|
|
||||||
self.errored = true;
|
|
||||||
Ready(Some(e))
|
|
||||||
} else {
|
|
||||||
continue 'outer;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
let e = Error::with_msg(format!(
|
|
||||||
"try_setup_fetch_prebinned_higher_res error: {:?}",
|
|
||||||
e
|
|
||||||
));
|
|
||||||
self.errored = true;
|
|
||||||
Ready(Some(Err(e)))
|
|
||||||
}
|
|
||||||
},
|
|
||||||
_ => {
|
|
||||||
error!("File I/O error: kind {:?} {:?}\n\n..............", e.kind(), e);
|
|
||||||
self.errored = true;
|
|
||||||
Ready(Some(Err(e.into())))
|
|
||||||
}
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Pending => Pending,
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
let cfd = CacheFileDesc::new(
|
let cfd = CacheFileDesc::new(
|
||||||
self.query.channel().clone(),
|
self.query.channel().clone(),
|
||||||
|
|||||||
Reference in New Issue
Block a user