This commit is contained in:
Dominik Werder
2021-04-03 10:28:34 +02:00
parent 66f3e2d2c7
commit be617258b2
6 changed files with 72 additions and 8 deletions

View File

@@ -2,7 +2,7 @@
members = ["retrieval", "httpret", "err", "disk"]
[profile.release]
opt-level = 0
overflow-checks = true
debug = 2
debug-assertions = true
#opt-level = 0
#overflow-checks = true
#debug = 2
#debug-assertions = true

View File

@@ -8,7 +8,7 @@ use tokio::fs::File;
use std::future::Future;
use futures_core::Stream;
use futures_util::future::FusedFuture;
use bytes::Bytes;
use bytes::{Bytes, BytesMut};
use std::path::PathBuf;
pub async fn read_test_1(query: &netpod::AggQuerySingleChannel) -> Result<netpod::BodyStream, Error> {
@@ -262,6 +262,61 @@ pub fn raw_concat_channel_read_stream_try_open_in_background(query: &netpod::Agg
// TODO implement another variant with a dedicated task to feed the opened file queue.
pub fn raw_concat_channel_read_stream_file_pipe(query: &netpod::AggQuerySingleChannel) -> impl Stream<Item=Result<Bytes, Error>> + Send {
let query = query.clone();
async_stream::stream! {
let chrx = open_files(&query);
while let Ok(file) = chrx.recv().await {
let mut file = match file {
Ok(k) => k,
Err(_) => break
};
loop {
let mut buf = BytesMut::with_capacity(query.buffer_size as usize);
use tokio::io::AsyncReadExt;
let n1 = file.read_buf(&mut buf).await?;
if n1 == 0 {
info!("file EOF");
break;
}
else {
yield Ok(buf.freeze());
}
}
}
}
}
fn open_files(query: &netpod::AggQuerySingleChannel) -> async_channel::Receiver<Result<tokio::fs::File, Error>> {
let (chtx, chrx) = async_channel::bounded(2);
let mut query = query.clone();
tokio::spawn(async move {
let tb0 = query.timebin;
for i1 in 0..16 {
query.timebin = tb0 + i1;
let path = datapath(&query);
let fileres = tokio::fs::OpenOptions::new()
.read(true)
.open(path)
.await;
match fileres {
Ok(k) => {
match chtx.send(Ok(k)).await {
Ok(_) => (),
Err(_) => break
}
}
Err(e) => {
match chtx.send(Err(e.into())).await {
Ok(_) => (),
Err(_) => break
}
}
}
}
});
chrx
}
pub fn raw_concat_channel_read_stream(query: &netpod::AggQuerySingleChannel) -> impl Stream<Item=Result<Bytes, Error>> + Send {

View File

@@ -8,3 +8,4 @@ edition = "2018"
hyper = { version = "0.14", features = ["http1", "http2", "client", "server", "tcp"] }
http = "0.2"
serde_json = "1.0"
async-channel = "1.6"

View File

@@ -59,3 +59,11 @@ impl From<serde_json::Error> for Error {
}
}
}
impl From<async_channel::RecvError> for Error {
fn from (k: async_channel::RecvError) -> Self {
Self {
msg: k.to_string(),
}
}
}

View File

@@ -65,7 +65,7 @@ async fn parsed_raw(req: Request<Body>) -> Result<Response<Body>, Error> {
let query: AggQuerySingleChannel = serde_json::from_slice(&bodyslice)?;
//let q = disk::read_test_1(&query).await?;
//let s = q.inner;
let s = disk::raw_concat_channel_read_stream_try_open_in_background(&query);
let s = disk::raw_concat_channel_read_stream_file_pipe(&query);
let res = response(StatusCode::OK)
.body(Body::wrap_stream(s))?;
/*

View File

@@ -61,10 +61,10 @@ fn simple_fetch() {
name: "S10BC01-DBAM070:EOM1_T1".into(),
backend: "sf-databuffer".into(),
},
timebin: 18714,
timebin: 18700,
split: 12,
tbsize: 1000 * 60 * 60 * 24,
buffer_size: 1024 * 4,
buffer_size: 1024 * 16,
};
let query_string = serde_json::to_string(&query).unwrap();
let _host = tokio::spawn(httpret::host(8360));