From be617258b293cb038bcdc506e290ed515a07133e Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Sat, 3 Apr 2021 10:28:34 +0200 Subject: [PATCH] up --- Cargo.toml | 8 ++--- disk/src/lib.rs | 57 +++++++++++++++++++++++++++++++++- err/Cargo.toml | 1 + err/src/lib.rs | 8 +++++ httpret/src/lib.rs | 2 +- retrieval/src/bin/retrieval.rs | 4 +-- 6 files changed, 72 insertions(+), 8 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index ef35ea0..34bc19e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,7 +2,7 @@ members = ["retrieval", "httpret", "err", "disk"] [profile.release] -opt-level = 0 -overflow-checks = true -debug = 2 -debug-assertions = true +#opt-level = 0 +#overflow-checks = true +#debug = 2 +#debug-assertions = true diff --git a/disk/src/lib.rs b/disk/src/lib.rs index aabbb9c..6c19158 100644 --- a/disk/src/lib.rs +++ b/disk/src/lib.rs @@ -8,7 +8,7 @@ use tokio::fs::File; use std::future::Future; use futures_core::Stream; use futures_util::future::FusedFuture; -use bytes::Bytes; +use bytes::{Bytes, BytesMut}; use std::path::PathBuf; pub async fn read_test_1(query: &netpod::AggQuerySingleChannel) -> Result { @@ -262,6 +262,61 @@ pub fn raw_concat_channel_read_stream_try_open_in_background(query: &netpod::Agg // TODO implement another variant with a dedicated task to feed the opened file queue. +pub fn raw_concat_channel_read_stream_file_pipe(query: &netpod::AggQuerySingleChannel) -> impl Stream> + Send { + let query = query.clone(); + async_stream::stream! { + let chrx = open_files(&query); + while let Ok(file) = chrx.recv().await { + let mut file = match file { + Ok(k) => k, + Err(_) => break + }; + loop { + let mut buf = BytesMut::with_capacity(query.buffer_size as usize); + use tokio::io::AsyncReadExt; + let n1 = file.read_buf(&mut buf).await?; + if n1 == 0 { + info!("file EOF"); + break; + } + else { + yield Ok(buf.freeze()); + } + } + } + } +} + +fn open_files(query: &netpod::AggQuerySingleChannel) -> async_channel::Receiver> { + let (chtx, chrx) = async_channel::bounded(2); + let mut query = query.clone(); + tokio::spawn(async move { + let tb0 = query.timebin; + for i1 in 0..16 { + query.timebin = tb0 + i1; + let path = datapath(&query); + let fileres = tokio::fs::OpenOptions::new() + .read(true) + .open(path) + .await; + match fileres { + Ok(k) => { + match chtx.send(Ok(k)).await { + Ok(_) => (), + Err(_) => break + } + } + Err(e) => { + match chtx.send(Err(e.into())).await { + Ok(_) => (), + Err(_) => break + } + } + } + } + }); + chrx +} pub fn raw_concat_channel_read_stream(query: &netpod::AggQuerySingleChannel) -> impl Stream> + Send { diff --git a/err/Cargo.toml b/err/Cargo.toml index 0dad62d..80c9c6e 100644 --- a/err/Cargo.toml +++ b/err/Cargo.toml @@ -8,3 +8,4 @@ edition = "2018" hyper = { version = "0.14", features = ["http1", "http2", "client", "server", "tcp"] } http = "0.2" serde_json = "1.0" +async-channel = "1.6" diff --git a/err/src/lib.rs b/err/src/lib.rs index 3f10e53..d4056f9 100644 --- a/err/src/lib.rs +++ b/err/src/lib.rs @@ -59,3 +59,11 @@ impl From for Error { } } } + +impl From for Error { + fn from (k: async_channel::RecvError) -> Self { + Self { + msg: k.to_string(), + } + } +} diff --git a/httpret/src/lib.rs b/httpret/src/lib.rs index 6d7f77f..732a9c6 100644 --- a/httpret/src/lib.rs +++ b/httpret/src/lib.rs @@ -65,7 +65,7 @@ async fn parsed_raw(req: Request) -> Result, Error> { let query: AggQuerySingleChannel = serde_json::from_slice(&bodyslice)?; //let q = disk::read_test_1(&query).await?; //let s = q.inner; - let s = disk::raw_concat_channel_read_stream_try_open_in_background(&query); + let s = disk::raw_concat_channel_read_stream_file_pipe(&query); let res = response(StatusCode::OK) .body(Body::wrap_stream(s))?; /* diff --git a/retrieval/src/bin/retrieval.rs b/retrieval/src/bin/retrieval.rs index 191ed69..23faeff 100644 --- a/retrieval/src/bin/retrieval.rs +++ b/retrieval/src/bin/retrieval.rs @@ -61,10 +61,10 @@ fn simple_fetch() { name: "S10BC01-DBAM070:EOM1_T1".into(), backend: "sf-databuffer".into(), }, - timebin: 18714, + timebin: 18700, split: 12, tbsize: 1000 * 60 * 60 * 24, - buffer_size: 1024 * 4, + buffer_size: 1024 * 16, }; let query_string = serde_json::to_string(&query).unwrap(); let _host = tokio::spawn(httpret::host(8360));