From 40ae0bbe4697db4f35fa735ee13e87d86add60c5 Mon Sep 17 00:00:00 2001
From: Dominik Werder
Date: Thu, 1 Apr 2021 18:29:31 +0200
Subject: [PATCH] Can read, but not open next file in background yet

---
 Cargo.toml                     |   6 +
 disk/Cargo.toml                |   2 +
 disk/src/lib.rs                | 216 +++++++++++++++++++++++++++++++--
 httpret/src/lib.rs             |   7 +-
 netpod/Cargo.toml              |   4 +-
 netpod/src/lib.rs              |  34 +++++-
 retrieval/src/bin/retrieval.rs |  28 +++--
 7 files changed, 268 insertions(+), 29 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index bc74d0d..ef35ea0 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,2 +1,8 @@
 [workspace]
 members = ["retrieval", "httpret", "err", "disk"]
+
+[profile.release]
+opt-level = 0
+overflow-checks = true
+debug = 2
+debug-assertions = true
diff --git a/disk/Cargo.toml b/disk/Cargo.toml
index 2cbd940..058b2e8 100644
--- a/disk/Cargo.toml
+++ b/disk/Cargo.toml
@@ -11,5 +11,7 @@ serde_json = "1.0"
 async-channel = "1.6"
 bytes = "1.0.1"
 futures-core = "0.3.12"
+futures-util = "0.3.13"
+async-stream = "0.3.0"
 err = { path = "../err" }
 netpod = { path = "../netpod" }
diff --git a/disk/src/lib.rs b/disk/src/lib.rs
index ccfb85a..cee1277 100644
--- a/disk/src/lib.rs
+++ b/disk/src/lib.rs
@@ -4,11 +4,16 @@ use err::Error;
 use std::task::{Context, Poll};
 use std::pin::Pin;
 use tokio::io::AsyncRead;
-//use std::future::Future;
-//use futures_core::Stream;
+use std::future::Future;
+use futures_core::Stream;
+use futures_util::future::FusedFuture;
+use bytes::Bytes;
+use std::path::PathBuf;
-pub async fn read_test_1() -> Result<netpod::BodyStream, Error> {
-    let path = "/data/sf-databuffer/daq_swissfel/daq_swissfel_3/byTime/S10CB01-RIQM-DCP10:FOR-AMPLT/0000000000000018714/0000000012/0000000000086400000_00000_Data";
+pub async fn read_test_1(query: &netpod::AggQuerySingleChannel) -> Result<netpod::BodyStream, Error> {
+    let pre = "/data/sf-databuffer/daq_swissfel";
+    let path = format!("{}/{}_{}/byTime/{}/{:019}/{:010}/{:019}_00000_Data", pre, query.ksprefix, query.keyspace, query.channel.name(), query.timebin, query.split, query.tbsize);
+    debug!("try path: {}", path);
     let fin = tokio::fs::OpenOptions::new()
         .read(true)
         .open(path)
         .await
@@ -19,6 +24,7 @@
         inner: Box::new(FileReader {
             file: fin,
             nreads: 0,
+            buffer_size: query.buffer_size,
         }),
     };
     Ok(stream)
@@ -27,28 +33,36 @@
 struct FileReader {
     file: tokio::fs::File,
     nreads: u32,
+    buffer_size: u32,
 }
 
 impl futures_core::Stream for FileReader {
     type Item = Result<bytes::Bytes, Error>;
 
     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
-        if self.nreads >= 10 {
-            return Poll::Ready(None);
-        }
-        let blen = 13;
+        let blen = self.buffer_size as usize;
         let mut buf2 = bytes::BytesMut::with_capacity(blen);
         buf2.resize(buf2.capacity(), 0);
         if buf2.as_mut().len() != blen {
-            panic!("todo prepare slice");
+            panic!("logic");
         }
         let mut buf = tokio::io::ReadBuf::new(buf2.as_mut());
+        if buf.filled().len() != 0 {
+            panic!("logic");
+        }
         match Pin::new(&mut self.file).poll_read(cx, &mut buf) {
             Poll::Ready(Ok(_)) => {
-                info!("read from disk: {} nreads {}", buf.filled().len(), self.nreads);
-                info!("buf2 len: {}", buf2.len());
-                self.nreads += 1;
-                Poll::Ready(Some(Ok(buf2.freeze())))
+                let rlen = buf.filled().len();
+                if rlen == 0 {
+                    Poll::Ready(None)
+                }
+                else {
+                    if rlen != blen {
+                        info!("short read {} of {}", buf.filled().len(), blen);
+                    }
+                    self.nreads += 1;
+                    Poll::Ready(Some(Ok(buf2.freeze())))
+                }
             }
             Poll::Ready(Err(e)) => {
                 Poll::Ready(Some(Err(Error::from(e))))
@@ -58,3 +72,179 @@
         }
     }
 }
+
+
+struct Ftmp {
+    file: Option<Box<dyn FusedFuture<Output = Result<tokio::fs::File, std::io::Error>> + Send>>,
+}
+
+impl Ftmp {
+    pub fn new() -> Self {
+        Ftmp {
+            file: None,
+        }
+    }
+    pub fn open(&mut self, path: PathBuf) {
+        //let z = tokio::fs::OpenOptions::new().read(true).open(path);
+        //let y = Box::new(z);
+        use futures_util::FutureExt;
+        self.file.replace(Box::new(tokio::fs::File::open(path).fuse()));
+    }
+    pub fn is_empty(&self) -> bool {
+        todo!()
+    }
+}
+
+
+pub fn raw_concat_channel_read_stream(query: &netpod::AggQuerySingleChannel) -> impl Stream<Item = Result<Bytes, Error>> {
+    use futures_util::{StreamExt, FutureExt, pin_mut, select};
+    let mut query = query.clone();
+    async_stream::stream! {
+        let mut i1 = 0;
+        loop {
+            let timebin = 18700 + i1;
+            query.timebin = timebin;
+            let s2 = raw_concat_channel_read_stream_timebin(&query);
+            pin_mut!(s2);
+            while let Some(item) = s2.next().await {
+                yield item;
+            }
+            i1 += 1;
+            if i1 > 15 {
+                break;
+            }
+        }
+    }
+}
+
+
+pub fn raw_concat_channel_read_stream_try_open_in_background(query: &netpod::AggQuerySingleChannel) -> impl Stream<Item = Result<Bytes, Error>> {
+    use futures_util::{StreamExt, FutureExt, pin_mut, select};
+    let mut query = query.clone();
+    let mut ftmp1 = Ftmp::new();
+    let mut ftmp2 = Ftmp::new();
+    async_stream::stream! {
+        let mut i1 = 0;
+        loop {
+            if ftmp1.is_empty() {
+                let p2 = datapath(&query);
+                ftmp1.open(p2);
+                query.timebin += 1;
+            }
+            let timebin = 18700 + i1;
+            //query.timebin = timebin;
+            //let s2 = raw_concat_channel_read_stream_timebin(&query);
+            //pin_mut!(s2);
+            //while let Some(item) = s2.next().await {
+            //    yield item;
+            //}
+            //let s2f = s2.next().fuse();
+            //pin_mut!(s2f);
+            //pin_mut!(f2);
+            //let ff2 = ftmp1.file.unwrap();
+            //() == ff2;
+            //pin_mut!(ff2);
+            //let z = select! {
+            //    _ = ff2 => (),
+            //};
+            yield Ok(Bytes::new());
+        }
+    }
+}
+
+fn datapath(query: &netpod::AggQuerySingleChannel) -> PathBuf {
+    let pre = "/data/sf-databuffer/daq_swissfel";
+    let path = format!("{}/{}_{}/byTime/{}/{:019}/{:010}/{:019}_00000_Data", pre, query.ksprefix, query.keyspace, query.channel.name(), query.timebin, query.split, query.tbsize);
+    path.into()
+}
+
+pub fn raw_concat_channel_read_stream_timebin(query: &netpod::AggQuerySingleChannel) -> impl Stream<Item = Result<Bytes, Error>> {
+    let query = query.clone();
+    let pre = "/data/sf-databuffer/daq_swissfel";
+    let path = format!("{}/{}_{}/byTime/{}/{:019}/{:010}/{:019}_00000_Data", pre, query.ksprefix, query.keyspace, query.channel.name(), query.timebin, query.split, query.tbsize);
+    async_stream::stream! {
+        debug!("try path: {}", path);
+        let mut fin = tokio::fs::OpenOptions::new()
+            .read(true)
+            .open(path)
+            .await?;
+        let meta = fin.metadata().await?;
+        debug!("file meta {:?}", meta);
+        let blen = query.buffer_size as usize;
+        use tokio::io::AsyncReadExt;
+        loop {
+            let mut buf = bytes::BytesMut::with_capacity(blen);
+            assert!(buf.is_empty());
+            if false {
+                buf.resize(buf.capacity(), 0);
+                if buf.as_mut().len() != blen {
+                    panic!("logic");
+                }
+            }
+            let n1 = fin.read_buf(&mut buf).await?;
+            if n1 == 0 {
+                break;
+            }
+            yield Ok(buf.freeze());
+        }
+    }
+}
+
+
+pub async fn raw_concat_channel_read(query: &netpod::AggQuerySingleChannel) -> Result<netpod::BodyStream, Error> {
+    let _reader = RawConcatChannelReader {
+        ksprefix: query.ksprefix.clone(),
+        keyspace: query.keyspace,
+        channel: query.channel.clone(),
+        split: query.split,
+        tbsize: query.tbsize,
+        buffer_size: query.buffer_size,
+        tb: 18714,
+        //file_reader: None,
+        fopen: None,
+    };
+    todo!()
+}
+
+/**
+Read all events from all timebins for the given channel and split.
+*/
+#[allow(dead_code)]
+pub struct RawConcatChannelReader {
+    ksprefix: String,
+    keyspace: u32,
+    channel: netpod::Channel,
+    split: u32,
+    tbsize: u32,
+    buffer_size: u32,
+    tb: u32,
+    //file_reader: Option<FileReader>,
+
+    // TODO
+    // Not enough to store a simple future here.
+    // That will only resolve to a single output.
+    // • How can I transition between Stream and async world?
+    // • I guess I must not poll a completed Future which comes from some async fn again after it completed.
+    // • relevant crates: async-stream, tokio-stream
+    fopen: Option<Box<dyn Future<Output = Option<Result<Bytes, Error>>> + Send>>,
+}
+
+impl RawConcatChannelReader {
+
+    pub fn read(self) -> Result<netpod::BodyStream, Error> {
+        let res = netpod::BodyStream {
+            inner: Box::new(self),
+        };
+        Ok(res)
+    }
+
+}
+
+impl futures_core::Stream for RawConcatChannelReader {
+    type Item = Result<Bytes, Error>;
+
+    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        todo!()
+    }
+
+}
diff --git a/httpret/src/lib.rs b/httpret/src/lib.rs
index 28e0a58..08ab382 100644
--- a/httpret/src/lib.rs
+++ b/httpret/src/lib.rs
@@ -62,9 +62,10 @@ async fn parsed_raw(req: Request<Body>) -> Result<Response<Body>, Error> {
     use netpod::AggQuerySingleChannel;
     let reqbody = req.into_body();
     let bodyslice = hyper::body::to_bytes(reqbody).await?;
-    let _query: AggQuerySingleChannel = serde_json::from_slice(&bodyslice)?;
-    let q = disk::read_test_1().await?;
-    let s = q.inner;
+    let query: AggQuerySingleChannel = serde_json::from_slice(&bodyslice)?;
+    //let q = disk::read_test_1(&query).await?;
+    //let s = q.inner;
+    let s = disk::raw_concat_channel_read_stream(&query);
     let res = response(StatusCode::OK)
         .body(Body::wrap_stream(s))?;
     /*
diff --git a/netpod/Cargo.toml b/netpod/Cargo.toml
index 95d13ba..cc4b130 100644
--- a/netpod/Cargo.toml
+++ b/netpod/Cargo.toml
@@ -5,8 +5,8 @@ authors = ["Dominik Werder"]
 edition = "2018"
 
 [dependencies]
-serde = "1.0"
-serde_derive = "1.0"
+serde = { version = "1.0", features = ["derive"] }
+#serde_derive = "1.0"
 async-channel = "1.6"
 bytes = "1.0.1"
 futures-core = "0.3.12"
diff --git a/netpod/src/lib.rs b/netpod/src/lib.rs
index c66f6d0..58f5700 100644
--- a/netpod/src/lib.rs
+++ b/netpod/src/lib.rs
@@ -1,10 +1,36 @@
-use serde_derive::{Serialize, Deserialize};
-//use std::pin::Pin;
+use serde::{Serialize, Deserialize};
 use err::Error;
+//use std::pin::Pin;
 
-#[derive(Serialize, Deserialize)]
+#[derive(Clone, Debug, Serialize, Deserialize)]
+pub struct Channel {
+    pub backend: String,
+    pub name: String,
+}
+
+impl Channel {
+    pub fn name(&self) -> &str {
+        &self.name
+    }
+}
+
+#[test]
+fn serde_channel() {
+    let _ex = "{\"name\":\"thechannel\",\"backend\":\"thebackend\"}";
+}
+
+
+
+
+#[derive(Clone, Debug, Serialize, Deserialize)]
 pub struct AggQuerySingleChannel {
-    pub channel: String,
+    pub ksprefix: String,
+    pub keyspace: u32,
+    pub channel: Channel,
+    pub timebin: u32,
+    pub split: u32,
+    pub tbsize: u32,
+    pub buffer_size: u32,
 }
 
 pub struct BodyStream {
diff --git a/retrieval/src/bin/retrieval.rs b/retrieval/src/bin/retrieval.rs
index 260f32f..191ed69 100644
--- a/retrieval/src/bin/retrieval.rs
+++ b/retrieval/src/bin/retrieval.rs
@@ -1,8 +1,6 @@
 #[allow(unused_imports)]
 use tracing::{error, warn, info, debug, trace};
 use err::Error;
-use http::Method;
-use hyper::Body;
 
 pub fn main() {
     match run(go()) {
@@ -48,20 +46,30 @@ pub fn tracing_init() {
         .with_target(true)
         .with_thread_names(true)
         //.with_max_level(tracing::Level::INFO)
-        .with_env_filter(tracing_subscriber::EnvFilter::new("info,retrieval=trace,tokio_postgres=info"))
+        .with_env_filter(tracing_subscriber::EnvFilter::new("info,retrieval=trace,disk=trace,tokio_postgres=info"))
         .init();
 }
 
 #[test]
 fn simple_fetch() {
     run(async {
+        let t1 = chrono::Utc::now();
         let query = netpod::AggQuerySingleChannel {
-            channel: "S10CB01-RIQM-STA:DACI".into(),
+            ksprefix: "daq_swissfel".into(),
+            keyspace: 2,
+            channel: netpod::Channel {
+                name: "S10BC01-DBAM070:EOM1_T1".into(),
+                backend: "sf-databuffer".into(),
+            },
+            timebin: 18714,
+            split: 12,
+            tbsize: 1000 * 60 * 60 * 24,
+            buffer_size: 1024 * 4,
         };
         let query_string = serde_json::to_string(&query).unwrap();
-        let host = tokio::spawn(httpret::host(8360));
+        let _host = tokio::spawn(httpret::host(8360));
         let req = hyper::Request::builder()
-            .method(Method::POST)
+            .method(http::Method::POST)
             .uri("http://localhost:8360/api/1/parsed_raw")
             .body(query_string.into()).unwrap();
         let client = hyper::Client::new();
@@ -69,10 +77,12 @@
         info!("client response {:?}", res);
         let mut res_body = res.into_body();
         use hyper::body::HttpBody;
+        let mut ntot = 0 as u64;
         loop {
            match res_body.data().await {
                Some(Ok(k)) => {
-                    info!("packet.. len {}", k.len());
+                    //info!("packet.. len {}", k.len());
+                    ntot += k.len() as u64;
                }
                Some(Err(e)) => {
                    error!("{:?}", e);
@@ -83,6 +93,10 @@
            }
        }
+        let t2 = chrono::Utc::now();
+        let ms = t2.signed_duration_since(t1).num_milliseconds() as u64;
+        let throughput = ntot / 1024 * 1000 / ms;
+        info!("total download bytes {} throughput {:5} kB/s", ntot, throughput);
        //Err::<(), _>(format!("test error").into())
        Ok(())
    }).unwrap();
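
--

Two sketches follow as notes; neither is part of the patch above.

On the TODO in RawConcatChannelReader ("How can I transition between Stream
and async world?"): one common answer is an explicit state machine inside
the Stream impl. The stream owns either the pending open-future or the
already opened file, and poll_next() polls whichever is current, swapping
the state once it resolves, so a completed future is dropped and can never
be polled again. A minimal, self-contained sketch of that pattern follows;
ConcatReader and all of its names are hypothetical, and only std, tokio,
bytes and futures-core are assumed, as already used by the disk crate.

    use std::collections::VecDeque;
    use std::future::Future;
    use std::path::PathBuf;
    use std::pin::Pin;
    use std::task::{Context, Poll};

    use bytes::{Bytes, BytesMut};
    use futures_core::Stream;
    use tokio::io::{AsyncRead, ReadBuf};

    // Boxed future for a pending file open.
    type OpenFut = Pin<Box<dyn Future<Output = std::io::Result<tokio::fs::File>> + Send>>;

    enum State {
        Opening(OpenFut),
        Reading(tokio::fs::File),
        Done,
    }

    pub struct ConcatReader {
        paths: VecDeque<PathBuf>,
        state: State,
        buffer_size: usize,
    }

    impl ConcatReader {
        pub fn new(paths: Vec<PathBuf>, buffer_size: usize) -> Self {
            let mut paths: VecDeque<_> = paths.into();
            let state = match paths.pop_front() {
                Some(p) => State::Opening(Box::pin(tokio::fs::File::open(p))),
                None => State::Done,
            };
            Self { paths, state, buffer_size }
        }
    }

    impl Stream for ConcatReader {
        type Item = Result<Bytes, std::io::Error>;

        fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
            // Every field is Unpin, so plain &mut access is fine.
            let this = self.get_mut();
            loop {
                // Move the current state out; a resolved future is replaced
                // by the next state and therefore never polled twice.
                match std::mem::replace(&mut this.state, State::Done) {
                    State::Opening(mut fut) => match fut.as_mut().poll(cx) {
                        Poll::Ready(Ok(file)) => this.state = State::Reading(file),
                        Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(e))),
                        Poll::Pending => {
                            this.state = State::Opening(fut);
                            return Poll::Pending;
                        }
                    },
                    State::Reading(mut file) => {
                        let mut buf = BytesMut::with_capacity(this.buffer_size);
                        buf.resize(this.buffer_size, 0);
                        let mut rb = ReadBuf::new(&mut buf[..]);
                        match Pin::new(&mut file).poll_read(cx, &mut rb) {
                            Poll::Ready(Ok(())) => {
                                let n = rb.filled().len();
                                if n == 0 {
                                    // This file is exhausted: open the next one or end.
                                    match this.paths.pop_front() {
                                        Some(p) => this.state = State::Opening(Box::pin(tokio::fs::File::open(p))),
                                        None => return Poll::Ready(None),
                                    }
                                } else {
                                    this.state = State::Reading(file);
                                    buf.truncate(n);
                                    return Poll::Ready(Some(Ok(buf.freeze())));
                                }
                            }
                            Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(e))),
                            Poll::Pending => {
                                this.state = State::Reading(file);
                                return Poll::Pending;
                            }
                        }
                    }
                    State::Done => return Poll::Ready(None),
                }
            }
        }
    }

Since netpod::BodyStream boxes a stream of Result<Bytes, Error>, a reader
like this could slot in there once its io::Error items are mapped to
err::Error.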
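A second sketch, aimed at the goal in the subject line ("open next file in
background"): instead of select!-ing on a stored FusedFuture the way the
commented-out code in raw_concat_channel_read_stream_try_open_in_background
gropes toward, the open of the following file can be handed to the runtime
with tokio::spawn. The spawned open then makes progress while the current
file is still being read, and the stream only awaits the JoinHandle at the
moment it actually needs the next file. This is a hypothetical helper, not
part of the patch; it assumes err::Error implements From<std::io::Error>,
which the patch already relies on elsewhere, and it avoids the ? operator
inside stream! since only try_stream! documents support for it.

    use std::path::PathBuf;

    use bytes::{Bytes, BytesMut};
    use err::Error;
    use futures_core::Stream;
    use tokio::io::AsyncReadExt;

    pub fn concat_read_stream_bg(
        paths: Vec<PathBuf>,
        buffer_size: usize,
    ) -> impl Stream<Item = Result<Bytes, Error>> {
        async_stream::stream! {
            let mut paths = paths.into_iter();
            // Kick off the first open; the spawned task runs on the runtime
            // even while this stream is not being polled.
            let mut pending = paths.next().map(|p| tokio::spawn(tokio::fs::File::open(p)));
            'outer: while let Some(handle) = pending.take() {
                let mut file = match handle.await {
                    Ok(Ok(file)) => file,
                    Ok(Err(e)) => {
                        yield Err(Error::from(e));
                        break 'outer;
                    }
                    Err(e) => {
                        // The open task panicked or was cancelled; surface it as io::Error.
                        yield Err(Error::from(std::io::Error::new(std::io::ErrorKind::Other, e)));
                        break 'outer;
                    }
                };
                // Immediately start opening the next file in the background
                // while the current one is read below.
                pending = paths.next().map(|p| tokio::spawn(tokio::fs::File::open(p)));
                loop {
                    let mut buf = BytesMut::with_capacity(buffer_size);
                    match file.read_buf(&mut buf).await {
                        Ok(0) => break,
                        Ok(_) => {
                            yield Ok(buf.freeze());
                        }
                        Err(e) => {
                            yield Err(Error::from(e));
                            break 'outer;
                        }
                    }
                }
            }
        }
    }

The datapath() helper above could generate the per-timebin paths for this.
Trade-off versus the select! route: tokio::spawn requires the open future to
be 'static and costs a task (one still in flight is simply detached on early
exit), but it keeps the stream body free of manual pinning and fused-future
bookkeeping.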