From dd7c028a412c22f2221ff762ca40f45e06b58591 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Thu, 1 Apr 2021 09:02:04 +0200 Subject: [PATCH] Do simple reads from disk --- disk/src/lib.rs | 25 ++++++++-- err/src/lib.rs | 16 +++++++ netpod/src/lib.rs | 2 +- retrieval/Cargo.toml | 2 + retrieval/src/bin/retrieval.rs | 84 ++++++++++++++++++++++++++-------- retrieval/src/cli.rs | 1 - retrieval/src/lib.rs | 2 - 7 files changed, 104 insertions(+), 28 deletions(-) diff --git a/disk/src/lib.rs b/disk/src/lib.rs index d8f7587..ccfb85a 100644 --- a/disk/src/lib.rs +++ b/disk/src/lib.rs @@ -1,3 +1,5 @@ +#[allow(unused_imports)] +use tracing::{error, warn, info, debug, trace}; use err::Error; use std::task::{Context, Poll}; use std::pin::Pin; @@ -11,28 +13,41 @@ pub async fn read_test_1() -> Result { .read(true) .open(path) .await?; + let meta = fin.metadata().await; + debug!("file meta {:?}", meta); let stream = netpod::BodyStream { - inner: Box::new(FileReader { file: fin }), + inner: Box::new(FileReader { + file: fin, + nreads: 0, + }), }; Ok(stream) } struct FileReader { file: tokio::fs::File, + nreads: u32, } impl futures_core::Stream for FileReader { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let mut buf2 = bytes::BytesMut::with_capacity(13); - if buf2.as_mut().len() != 13 { + if self.nreads >= 10 { + return Poll::Ready(None); + } + let blen = 13; + let mut buf2 = bytes::BytesMut::with_capacity(blen); + buf2.resize(buf2.capacity(), 0); + if buf2.as_mut().len() != blen { panic!("todo prepare slice"); } let mut buf = tokio::io::ReadBuf::new(buf2.as_mut()); - let g = Pin::new(&mut self.file).poll_read(cx, &mut buf); - match g { + match Pin::new(&mut self.file).poll_read(cx, &mut buf) { Poll::Ready(Ok(_)) => { + info!("read from disk: {} nreads {}", buf.filled().len(), self.nreads); + info!("buf2 len: {}", buf2.len()); + self.nreads += 1; Poll::Ready(Some(Ok(buf2.freeze()))) } Poll::Ready(Err(e)) => { diff --git a/err/src/lib.rs b/err/src/lib.rs index 4d83075..3f10e53 100644 --- a/err/src/lib.rs +++ b/err/src/lib.rs @@ -3,6 +3,22 @@ pub struct Error { msg: String, } +impl Error { + pub fn with_msg>(s: S) -> Self { + Self { + msg: s.into(), + } + } +} + +impl From for Error { + fn from(k: String) -> Self { + Self { + msg: k, + } + } +} + impl std::fmt::Display for Error { fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result { write!(fmt, "Error") diff --git a/netpod/src/lib.rs b/netpod/src/lib.rs index 268e543..c66f6d0 100644 --- a/netpod/src/lib.rs +++ b/netpod/src/lib.rs @@ -4,7 +4,7 @@ use err::Error; #[derive(Serialize, Deserialize)] pub struct AggQuerySingleChannel { - channel: String, + pub channel: String, } pub struct BodyStream { diff --git a/retrieval/Cargo.toml b/retrieval/Cargo.toml index 6258e43..bdf8c39 100644 --- a/retrieval/Cargo.toml +++ b/retrieval/Cargo.toml @@ -20,3 +20,5 @@ serde_json = "1.0" chrono = "0.4" clap = "3.0.0-beta.2" err = { path = "../err" } +netpod = { path = "../netpod" } +httpret = { path = "../httpret" } diff --git a/retrieval/src/bin/retrieval.rs b/retrieval/src/bin/retrieval.rs index 7a5edb0..260f32f 100644 --- a/retrieval/src/bin/retrieval.rs +++ b/retrieval/src/bin/retrieval.rs @@ -1,6 +1,22 @@ +#[allow(unused_imports)] +use tracing::{error, warn, info, debug, trace}; use err::Error; +use http::Method; +use hyper::Body; pub fn main() { + match run(go()) { + Ok(k) => { + info!("{:?}", k); + } + Err(k) => { + error!("{:?}", k); + } + } +} + +fn run>>(f: F) -> Result { + tracing_init(); tokio::runtime::Builder::new_multi_thread() .worker_threads(12) .max_blocking_threads(256) @@ -8,36 +24,66 @@ pub fn main() { .build() .unwrap() .block_on(async { - tracing_subscriber::fmt() - .with_env_filter(tracing_subscriber::EnvFilter::new("daqbuffer=trace,tokio_postgres=info")) - .init(); - go().await; + f.await }) } -async fn go() { - match inner().await { - Ok(_) => (), - Err(_) => () - } -} - -async fn inner() -> Result<(), Error> { +async fn go() -> Result<(), Error> { use clap::Clap; - use retrieval::cli::Opts; - let _opts = Opts::parse(); + use retrieval::cli::{Opts, SubCmd}; + let opts = Opts::parse(); + match opts.subcmd { + SubCmd::Retrieval(_subcmd) => { + trace!("testout"); + info!("testout"); + error!("testout"); + } + } Ok(()) } pub fn tracing_init() { tracing_subscriber::fmt() - .with_env_filter(tracing_subscriber::EnvFilter::new("retrieval=trace,tokio_postgres=info")) + //.with_timer(tracing_subscriber::fmt::time::uptime()) + .with_target(true) + .with_thread_names(true) + //.with_max_level(tracing::Level::INFO) + .with_env_filter(tracing_subscriber::EnvFilter::new("info,retrieval=trace,tokio_postgres=info")) .init(); } #[test] -fn t1() { - tracing_init(); - tracing::error!("parsed"); - //panic!(); +fn simple_fetch() { + run(async { + let query = netpod::AggQuerySingleChannel { + channel: "S10CB01-RIQM-STA:DACI".into(), + }; + let query_string = serde_json::to_string(&query).unwrap(); + let host = tokio::spawn(httpret::host(8360)); + let req = hyper::Request::builder() + .method(Method::POST) + .uri("http://localhost:8360/api/1/parsed_raw") + .body(query_string.into()).unwrap(); + let client = hyper::Client::new(); + let res = client.request(req).await?; + info!("client response {:?}", res); + let mut res_body = res.into_body(); + use hyper::body::HttpBody; + loop { + match res_body.data().await { + Some(Ok(k)) => { + info!("packet.. len {}", k.len()); + } + Some(Err(e)) => { + error!("{:?}", e); + } + None => { + info!("response stream exhausted"); + break; + } + } + } + //Err::<(), _>(format!("test error").into()) + Ok(()) + }).unwrap(); } diff --git a/retrieval/src/cli.rs b/retrieval/src/cli.rs index b1b8834..beb0066 100644 --- a/retrieval/src/cli.rs +++ b/retrieval/src/cli.rs @@ -11,7 +11,6 @@ pub struct Opts { #[derive(Debug, Clap)] pub enum SubCmd { - Version, Retrieval(Retrieval), } diff --git a/retrieval/src/lib.rs b/retrieval/src/lib.rs index 8b2a735..4f77372 100644 --- a/retrieval/src/lib.rs +++ b/retrieval/src/lib.rs @@ -1,3 +1 @@ pub mod cli; - -pub fn test() {}