Can read, but not open next file in background yet
This commit is contained in:
@@ -1,2 +1,8 @@
|
|||||||
[workspace]
|
[workspace]
|
||||||
members = ["retrieval", "httpret", "err", "disk"]
|
members = ["retrieval", "httpret", "err", "disk"]
|
||||||
|
|
||||||
|
[profile.release]
|
||||||
|
opt-level = 0
|
||||||
|
overflow-checks = true
|
||||||
|
debug = 2
|
||||||
|
debug-assertions = true
|
||||||
|
|||||||
@@ -11,5 +11,7 @@ serde_json = "1.0"
|
|||||||
async-channel = "1.6"
|
async-channel = "1.6"
|
||||||
bytes = "1.0.1"
|
bytes = "1.0.1"
|
||||||
futures-core = "0.3.12"
|
futures-core = "0.3.12"
|
||||||
|
futures-util = "0.3.13"
|
||||||
|
async-stream = "0.3.0"
|
||||||
err = { path = "../err" }
|
err = { path = "../err" }
|
||||||
netpod = { path = "../netpod" }
|
netpod = { path = "../netpod" }
|
||||||
|
|||||||
216
disk/src/lib.rs
216
disk/src/lib.rs
@@ -4,11 +4,16 @@ use err::Error;
|
|||||||
use std::task::{Context, Poll};
|
use std::task::{Context, Poll};
|
||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
use tokio::io::AsyncRead;
|
use tokio::io::AsyncRead;
|
||||||
//use std::future::Future;
|
use std::future::Future;
|
||||||
//use futures_core::Stream;
|
use futures_core::Stream;
|
||||||
|
use futures_util::future::FusedFuture;
|
||||||
|
use bytes::Bytes;
|
||||||
|
use std::path::PathBuf;
|
||||||
|
|
||||||
pub async fn read_test_1() -> Result<netpod::BodyStream, Error> {
|
pub async fn read_test_1(query: &netpod::AggQuerySingleChannel) -> Result<netpod::BodyStream, Error> {
|
||||||
let path = "/data/sf-databuffer/daq_swissfel/daq_swissfel_3/byTime/S10CB01-RIQM-DCP10:FOR-AMPLT/0000000000000018714/0000000012/0000000000086400000_00000_Data";
|
let pre = "/data/sf-databuffer/daq_swissfel";
|
||||||
|
let path = format!("{}/{}_{}/byTime/{}/{:019}/{:010}/{:019}_00000_Data", pre, query.ksprefix, query.keyspace, query.channel.name(), query.timebin, query.split, query.tbsize);
|
||||||
|
debug!("try path: {}", path);
|
||||||
let fin = tokio::fs::OpenOptions::new()
|
let fin = tokio::fs::OpenOptions::new()
|
||||||
.read(true)
|
.read(true)
|
||||||
.open(path)
|
.open(path)
|
||||||
@@ -19,6 +24,7 @@ pub async fn read_test_1() -> Result<netpod::BodyStream, Error> {
|
|||||||
inner: Box::new(FileReader {
|
inner: Box::new(FileReader {
|
||||||
file: fin,
|
file: fin,
|
||||||
nreads: 0,
|
nreads: 0,
|
||||||
|
buffer_size: query.buffer_size,
|
||||||
}),
|
}),
|
||||||
};
|
};
|
||||||
Ok(stream)
|
Ok(stream)
|
||||||
@@ -27,28 +33,36 @@ pub async fn read_test_1() -> Result<netpod::BodyStream, Error> {
|
|||||||
struct FileReader {
|
struct FileReader {
|
||||||
file: tokio::fs::File,
|
file: tokio::fs::File,
|
||||||
nreads: u32,
|
nreads: u32,
|
||||||
|
buffer_size: u32,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl futures_core::Stream for FileReader {
|
impl futures_core::Stream for FileReader {
|
||||||
type Item = Result<bytes::Bytes, Error>;
|
type Item = Result<bytes::Bytes, Error>;
|
||||||
|
|
||||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||||
if self.nreads >= 10 {
|
let blen = self.buffer_size as usize;
|
||||||
return Poll::Ready(None);
|
|
||||||
}
|
|
||||||
let blen = 13;
|
|
||||||
let mut buf2 = bytes::BytesMut::with_capacity(blen);
|
let mut buf2 = bytes::BytesMut::with_capacity(blen);
|
||||||
buf2.resize(buf2.capacity(), 0);
|
buf2.resize(buf2.capacity(), 0);
|
||||||
if buf2.as_mut().len() != blen {
|
if buf2.as_mut().len() != blen {
|
||||||
panic!("todo prepare slice");
|
panic!("logic");
|
||||||
}
|
}
|
||||||
let mut buf = tokio::io::ReadBuf::new(buf2.as_mut());
|
let mut buf = tokio::io::ReadBuf::new(buf2.as_mut());
|
||||||
|
if buf.filled().len() != 0 {
|
||||||
|
panic!("logic");
|
||||||
|
}
|
||||||
match Pin::new(&mut self.file).poll_read(cx, &mut buf) {
|
match Pin::new(&mut self.file).poll_read(cx, &mut buf) {
|
||||||
Poll::Ready(Ok(_)) => {
|
Poll::Ready(Ok(_)) => {
|
||||||
info!("read from disk: {} nreads {}", buf.filled().len(), self.nreads);
|
let rlen = buf.filled().len();
|
||||||
info!("buf2 len: {}", buf2.len());
|
if rlen == 0 {
|
||||||
self.nreads += 1;
|
Poll::Ready(None)
|
||||||
Poll::Ready(Some(Ok(buf2.freeze())))
|
}
|
||||||
|
else {
|
||||||
|
if rlen != blen {
|
||||||
|
info!("short read {} of {}", buf.filled().len(), blen);
|
||||||
|
}
|
||||||
|
self.nreads += 1;
|
||||||
|
Poll::Ready(Some(Ok(buf2.freeze())))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
Poll::Ready(Err(e)) => {
|
Poll::Ready(Err(e)) => {
|
||||||
Poll::Ready(Some(Err(Error::from(e))))
|
Poll::Ready(Some(Err(Error::from(e))))
|
||||||
@@ -58,3 +72,179 @@ impl futures_core::Stream for FileReader {
|
|||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
struct Ftmp {
|
||||||
|
file: Option<Box<dyn FusedFuture<Output=Result<tokio::fs::File, std::io::Error>> + Send>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Ftmp {
|
||||||
|
pub fn new() -> Self {
|
||||||
|
Ftmp {
|
||||||
|
file: None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pub fn open(&mut self, path: PathBuf) {
|
||||||
|
//let z = tokio::fs::OpenOptions::new().read(true).open(path);
|
||||||
|
//let y = Box::new(z);
|
||||||
|
use futures_util::FutureExt;
|
||||||
|
self.file.replace(Box::new(tokio::fs::File::open(path).fuse()));
|
||||||
|
}
|
||||||
|
pub fn is_empty(&self) -> bool {
|
||||||
|
todo!()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
pub fn raw_concat_channel_read_stream(query: &netpod::AggQuerySingleChannel) -> impl Stream<Item=Result<Bytes, Error>> {
|
||||||
|
use futures_util::{StreamExt, FutureExt, pin_mut, select};
|
||||||
|
let mut query = query.clone();
|
||||||
|
async_stream::stream! {
|
||||||
|
let mut i1 = 0;
|
||||||
|
loop {
|
||||||
|
let timebin = 18700 + i1;
|
||||||
|
query.timebin = timebin;
|
||||||
|
let s2 = raw_concat_channel_read_stream_timebin(&query);
|
||||||
|
pin_mut!(s2);
|
||||||
|
while let Some(item) = s2.next().await {
|
||||||
|
yield item;
|
||||||
|
}
|
||||||
|
i1 += 1;
|
||||||
|
if i1 > 15 {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
pub fn raw_concat_channel_read_stream_try_open_in_background(query: &netpod::AggQuerySingleChannel) -> impl Stream<Item=Result<Bytes, Error>> {
|
||||||
|
use futures_util::{StreamExt, FutureExt, pin_mut, select};
|
||||||
|
let mut query = query.clone();
|
||||||
|
let mut ftmp1 = Ftmp::new();
|
||||||
|
let mut ftmp2 = Ftmp::new();
|
||||||
|
async_stream::stream! {
|
||||||
|
let mut i1 = 0;
|
||||||
|
loop {
|
||||||
|
if ftmp1.is_empty() {
|
||||||
|
let p2 = datapath(&query);
|
||||||
|
ftmp1.open(p2);
|
||||||
|
query.timebin += 1;
|
||||||
|
}
|
||||||
|
let timebin = 18700 + i1;
|
||||||
|
//query.timebin = timebin;
|
||||||
|
//let s2 = raw_concat_channel_read_stream_timebin(&query);
|
||||||
|
//pin_mut!(s2);
|
||||||
|
//while let Some(item) = s2.next().await {
|
||||||
|
// yield item;
|
||||||
|
//}
|
||||||
|
//let s2f = s2.next().fuse();
|
||||||
|
//pin_mut!(s2f);
|
||||||
|
//pin_mut!(f2);
|
||||||
|
//let ff2 = ftmp1.file.unwrap();
|
||||||
|
//() == ff2;
|
||||||
|
//pin_mut!(ff2);
|
||||||
|
//let z = select! {
|
||||||
|
// _ = ff2 => (),
|
||||||
|
//};
|
||||||
|
yield Ok(Bytes::new());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn datapath(query: &netpod::AggQuerySingleChannel) -> PathBuf {
|
||||||
|
let pre = "/data/sf-databuffer/daq_swissfel";
|
||||||
|
let path = format!("{}/{}_{}/byTime/{}/{:019}/{:010}/{:019}_00000_Data", pre, query.ksprefix, query.keyspace, query.channel.name(), query.timebin, query.split, query.tbsize);
|
||||||
|
path.into()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn raw_concat_channel_read_stream_timebin(query: &netpod::AggQuerySingleChannel) -> impl Stream<Item=Result<Bytes, Error>> {
|
||||||
|
let query = query.clone();
|
||||||
|
let pre = "/data/sf-databuffer/daq_swissfel";
|
||||||
|
let path = format!("{}/{}_{}/byTime/{}/{:019}/{:010}/{:019}_00000_Data", pre, query.ksprefix, query.keyspace, query.channel.name(), query.timebin, query.split, query.tbsize);
|
||||||
|
async_stream::stream! {
|
||||||
|
debug!("try path: {}", path);
|
||||||
|
let mut fin = tokio::fs::OpenOptions::new()
|
||||||
|
.read(true)
|
||||||
|
.open(path)
|
||||||
|
.await?;
|
||||||
|
let meta = fin.metadata().await?;
|
||||||
|
debug!("file meta {:?}", meta);
|
||||||
|
let blen = query.buffer_size as usize;
|
||||||
|
use tokio::io::AsyncReadExt;
|
||||||
|
loop {
|
||||||
|
let mut buf = bytes::BytesMut::with_capacity(blen);
|
||||||
|
assert!(buf.is_empty());
|
||||||
|
if false {
|
||||||
|
buf.resize(buf.capacity(), 0);
|
||||||
|
if buf.as_mut().len() != blen {
|
||||||
|
panic!("logic");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
let n1 = fin.read_buf(&mut buf).await?;
|
||||||
|
if n1 == 0 {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
yield Ok(buf.freeze());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
pub async fn raw_concat_channel_read(query: &netpod::AggQuerySingleChannel) -> Result<netpod::BodyStream, Error> {
|
||||||
|
let _reader = RawConcatChannelReader {
|
||||||
|
ksprefix: query.ksprefix.clone(),
|
||||||
|
keyspace: query.keyspace,
|
||||||
|
channel: query.channel.clone(),
|
||||||
|
split: query.split,
|
||||||
|
tbsize: query.tbsize,
|
||||||
|
buffer_size: query.buffer_size,
|
||||||
|
tb: 18714,
|
||||||
|
//file_reader: None,
|
||||||
|
fopen: None,
|
||||||
|
};
|
||||||
|
todo!()
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
Read all events from all timebins for the given channel and split.
|
||||||
|
*/
|
||||||
|
#[allow(dead_code)]
|
||||||
|
pub struct RawConcatChannelReader {
|
||||||
|
ksprefix: String,
|
||||||
|
keyspace: u32,
|
||||||
|
channel: netpod::Channel,
|
||||||
|
split: u32,
|
||||||
|
tbsize: u32,
|
||||||
|
buffer_size: u32,
|
||||||
|
tb: u32,
|
||||||
|
//file_reader: Option<FileReader>,
|
||||||
|
|
||||||
|
// TODO
|
||||||
|
// Not enough to store a simple future here.
|
||||||
|
// That will only resolve to a single output.
|
||||||
|
// • How can I transition between Stream and async world?
|
||||||
|
// • I guess I must not poll a completed Future which comes from some async fn again after it completed.
|
||||||
|
// • relevant crates: async-stream, tokio-stream
|
||||||
|
fopen: Option<Box<dyn Future<Output=Option<Result<Bytes, Error>>> + Send>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl RawConcatChannelReader {
|
||||||
|
|
||||||
|
pub fn read(self) -> Result<netpod::BodyStream, Error> {
|
||||||
|
let res = netpod::BodyStream {
|
||||||
|
inner: Box::new(self),
|
||||||
|
};
|
||||||
|
Ok(res)
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
impl futures_core::Stream for RawConcatChannelReader {
|
||||||
|
type Item = Result<Bytes, Error>;
|
||||||
|
|
||||||
|
fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||||
|
todo!()
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|||||||
@@ -62,9 +62,10 @@ async fn parsed_raw(req: Request<Body>) -> Result<Response<Body>, Error> {
|
|||||||
use netpod::AggQuerySingleChannel;
|
use netpod::AggQuerySingleChannel;
|
||||||
let reqbody = req.into_body();
|
let reqbody = req.into_body();
|
||||||
let bodyslice = hyper::body::to_bytes(reqbody).await?;
|
let bodyslice = hyper::body::to_bytes(reqbody).await?;
|
||||||
let _query: AggQuerySingleChannel = serde_json::from_slice(&bodyslice)?;
|
let query: AggQuerySingleChannel = serde_json::from_slice(&bodyslice)?;
|
||||||
let q = disk::read_test_1().await?;
|
//let q = disk::read_test_1(&query).await?;
|
||||||
let s = q.inner;
|
//let s = q.inner;
|
||||||
|
let s = disk::raw_concat_channel_read_stream(&query);
|
||||||
let res = response(StatusCode::OK)
|
let res = response(StatusCode::OK)
|
||||||
.body(Body::wrap_stream(s))?;
|
.body(Body::wrap_stream(s))?;
|
||||||
/*
|
/*
|
||||||
|
|||||||
@@ -5,8 +5,8 @@ authors = ["Dominik Werder <dominik.werder@gmail.com>"]
|
|||||||
edition = "2018"
|
edition = "2018"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
serde = "1.0"
|
serde = { version = "1.0", features = ["derive"] }
|
||||||
serde_derive = "1.0"
|
#serde_derive = "1.0"
|
||||||
async-channel = "1.6"
|
async-channel = "1.6"
|
||||||
bytes = "1.0.1"
|
bytes = "1.0.1"
|
||||||
futures-core = "0.3.12"
|
futures-core = "0.3.12"
|
||||||
|
|||||||
@@ -1,10 +1,36 @@
|
|||||||
use serde_derive::{Serialize, Deserialize};
|
use serde::{Serialize, Deserialize};
|
||||||
//use std::pin::Pin;
|
|
||||||
use err::Error;
|
use err::Error;
|
||||||
|
//use std::pin::Pin;
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize)]
|
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||||
|
pub struct Channel {
|
||||||
|
pub backend: String,
|
||||||
|
pub name: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Channel {
|
||||||
|
pub fn name(&self) -> &str {
|
||||||
|
&self.name
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn serde_channel() {
|
||||||
|
let _ex = "{\"name\":\"thechannel\",\"backend\":\"thebackend\"}";
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||||
pub struct AggQuerySingleChannel {
|
pub struct AggQuerySingleChannel {
|
||||||
pub channel: String,
|
pub ksprefix: String,
|
||||||
|
pub keyspace: u32,
|
||||||
|
pub channel: Channel,
|
||||||
|
pub timebin: u32,
|
||||||
|
pub split: u32,
|
||||||
|
pub tbsize: u32,
|
||||||
|
pub buffer_size: u32,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct BodyStream {
|
pub struct BodyStream {
|
||||||
|
|||||||
@@ -1,8 +1,6 @@
|
|||||||
#[allow(unused_imports)]
|
#[allow(unused_imports)]
|
||||||
use tracing::{error, warn, info, debug, trace};
|
use tracing::{error, warn, info, debug, trace};
|
||||||
use err::Error;
|
use err::Error;
|
||||||
use http::Method;
|
|
||||||
use hyper::Body;
|
|
||||||
|
|
||||||
pub fn main() {
|
pub fn main() {
|
||||||
match run(go()) {
|
match run(go()) {
|
||||||
@@ -48,20 +46,30 @@ pub fn tracing_init() {
|
|||||||
.with_target(true)
|
.with_target(true)
|
||||||
.with_thread_names(true)
|
.with_thread_names(true)
|
||||||
//.with_max_level(tracing::Level::INFO)
|
//.with_max_level(tracing::Level::INFO)
|
||||||
.with_env_filter(tracing_subscriber::EnvFilter::new("info,retrieval=trace,tokio_postgres=info"))
|
.with_env_filter(tracing_subscriber::EnvFilter::new("info,retrieval=trace,disk=trace,tokio_postgres=info"))
|
||||||
.init();
|
.init();
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn simple_fetch() {
|
fn simple_fetch() {
|
||||||
run(async {
|
run(async {
|
||||||
|
let t1 = chrono::Utc::now();
|
||||||
let query = netpod::AggQuerySingleChannel {
|
let query = netpod::AggQuerySingleChannel {
|
||||||
channel: "S10CB01-RIQM-STA:DACI".into(),
|
ksprefix: "daq_swissfel".into(),
|
||||||
|
keyspace: 2,
|
||||||
|
channel: netpod::Channel {
|
||||||
|
name: "S10BC01-DBAM070:EOM1_T1".into(),
|
||||||
|
backend: "sf-databuffer".into(),
|
||||||
|
},
|
||||||
|
timebin: 18714,
|
||||||
|
split: 12,
|
||||||
|
tbsize: 1000 * 60 * 60 * 24,
|
||||||
|
buffer_size: 1024 * 4,
|
||||||
};
|
};
|
||||||
let query_string = serde_json::to_string(&query).unwrap();
|
let query_string = serde_json::to_string(&query).unwrap();
|
||||||
let host = tokio::spawn(httpret::host(8360));
|
let _host = tokio::spawn(httpret::host(8360));
|
||||||
let req = hyper::Request::builder()
|
let req = hyper::Request::builder()
|
||||||
.method(Method::POST)
|
.method(http::Method::POST)
|
||||||
.uri("http://localhost:8360/api/1/parsed_raw")
|
.uri("http://localhost:8360/api/1/parsed_raw")
|
||||||
.body(query_string.into()).unwrap();
|
.body(query_string.into()).unwrap();
|
||||||
let client = hyper::Client::new();
|
let client = hyper::Client::new();
|
||||||
@@ -69,10 +77,12 @@ fn simple_fetch() {
|
|||||||
info!("client response {:?}", res);
|
info!("client response {:?}", res);
|
||||||
let mut res_body = res.into_body();
|
let mut res_body = res.into_body();
|
||||||
use hyper::body::HttpBody;
|
use hyper::body::HttpBody;
|
||||||
|
let mut ntot = 0 as u64;
|
||||||
loop {
|
loop {
|
||||||
match res_body.data().await {
|
match res_body.data().await {
|
||||||
Some(Ok(k)) => {
|
Some(Ok(k)) => {
|
||||||
info!("packet.. len {}", k.len());
|
//info!("packet.. len {}", k.len());
|
||||||
|
ntot += k.len() as u64;
|
||||||
}
|
}
|
||||||
Some(Err(e)) => {
|
Some(Err(e)) => {
|
||||||
error!("{:?}", e);
|
error!("{:?}", e);
|
||||||
@@ -83,6 +93,10 @@ fn simple_fetch() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
let t2 = chrono::Utc::now();
|
||||||
|
let ms = t2.signed_duration_since(t1).num_milliseconds() as u64;
|
||||||
|
let throughput = ntot / 1024 * 1000 / ms;
|
||||||
|
info!("total download bytes {} throughput {:5} kB/s", ntot, throughput);
|
||||||
//Err::<(), _>(format!("test error").into())
|
//Err::<(), _>(format!("test error").into())
|
||||||
Ok(())
|
Ok(())
|
||||||
}).unwrap();
|
}).unwrap();
|
||||||
|
|||||||
Reference in New Issue
Block a user