commit 6dbc7cb605eb761c0359d51f352fb29f78a8837e Author: Dominik Werder Date: Wed Mar 31 22:17:54 2021 +0200 WIP on simple disk serve diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..3e0782b --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +/Cargo.lock +/target +/.idea diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..bc74d0d --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,2 @@ +[workspace] +members = ["retrieval", "httpret", "err", "disk"] diff --git a/disk/Cargo.toml b/disk/Cargo.toml new file mode 100644 index 0000000..2cbd940 --- /dev/null +++ b/disk/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "disk" +version = "0.0.1-a.0" +authors = ["Dominik Werder "] +edition = "2018" + +[dependencies] +tokio = { version = "1.4.0", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] } +tracing = "0.1.25" +serde_json = "1.0" +async-channel = "1.6" +bytes = "1.0.1" +futures-core = "0.3.12" +err = { path = "../err" } +netpod = { path = "../netpod" } diff --git a/disk/src/lib.rs b/disk/src/lib.rs new file mode 100644 index 0000000..d8f7587 --- /dev/null +++ b/disk/src/lib.rs @@ -0,0 +1,45 @@ +use err::Error; +use std::task::{Context, Poll}; +use std::pin::Pin; +use tokio::io::AsyncRead; +//use std::future::Future; +//use futures_core::Stream; + +pub async fn read_test_1() -> Result { + let path = "/data/sf-databuffer/daq_swissfel/daq_swissfel_3/byTime/S10CB01-RIQM-DCP10:FOR-AMPLT/0000000000000018714/0000000012/0000000000086400000_00000_Data"; + let fin = tokio::fs::OpenOptions::new() + .read(true) + .open(path) + .await?; + let stream = netpod::BodyStream { + inner: Box::new(FileReader { file: fin }), + }; + Ok(stream) +} + +struct FileReader { + file: tokio::fs::File, +} + +impl futures_core::Stream for FileReader { + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut buf2 = bytes::BytesMut::with_capacity(13); + if buf2.as_mut().len() != 13 { + panic!("todo prepare slice"); + } + let mut buf = tokio::io::ReadBuf::new(buf2.as_mut()); + let g = Pin::new(&mut self.file).poll_read(cx, &mut buf); + match g { + Poll::Ready(Ok(_)) => { + Poll::Ready(Some(Ok(buf2.freeze()))) + } + Poll::Ready(Err(e)) => { + Poll::Ready(Some(Err(Error::from(e)))) + } + Poll::Pending => Poll::Pending + } + } + +} diff --git a/err/Cargo.toml b/err/Cargo.toml new file mode 100644 index 0000000..0dad62d --- /dev/null +++ b/err/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "err" +version = "0.0.1-a.0" +authors = ["Dominik Werder "] +edition = "2018" + +[dependencies] +hyper = { version = "0.14", features = ["http1", "http2", "client", "server", "tcp"] } +http = "0.2" +serde_json = "1.0" diff --git a/err/src/lib.rs b/err/src/lib.rs new file mode 100644 index 0000000..4d83075 --- /dev/null +++ b/err/src/lib.rs @@ -0,0 +1,45 @@ +#[derive(Debug)] +pub struct Error { + msg: String, +} + +impl std::fmt::Display for Error { + fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(fmt, "Error") + } +} + +impl std::error::Error for Error { +} + +impl From for Error { + fn from (k: std::io::Error) -> Self { + Self { + msg: k.to_string(), + } + } +} + +impl From for Error { + fn from (k: http::Error) -> Self { + Self { + msg: k.to_string(), + } + } +} + +impl From for Error { + fn from (k: hyper::Error) -> Self { + Self { + msg: k.to_string(), + } + } +} + +impl From for Error { + fn from (k: serde_json::Error) -> Self { + Self { + msg: k.to_string(), + } + } +} diff --git a/httpret/Cargo.toml b/httpret/Cargo.toml new file mode 100644 index 0000000..0dfc504 --- /dev/null +++ b/httpret/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "httpret" +version = "0.0.1-a.0" +authors = ["Dominik Werder "] +edition = "2018" + +[dependencies] +hyper = { version = "0.14", features = ["http1", "http2", "client", "server", "tcp", "stream"] } +http = "0.2" +bytes = "1.0.1" +futures-core = "0.3.12" +tracing = "0.1.25" +serde_json = "1.0" +async-channel = "1.6" +err = { path = "../err" } +netpod = { path = "../netpod" } +disk = { path = "../disk" } diff --git a/httpret/src/lib.rs b/httpret/src/lib.rs new file mode 100644 index 0000000..28e0a58 --- /dev/null +++ b/httpret/src/lib.rs @@ -0,0 +1,108 @@ +#[allow(unused_imports)] +use tracing::{error, warn, info, debug, trace}; +use err::Error; +use std::net::SocketAddr; +use http::{Method, StatusCode, HeaderMap}; +use hyper::{Body, Request, Response}; +use hyper::server::Server; +use hyper::service::{make_service_fn, service_fn}; +use std::task::{Context, Poll}; +use std::pin::Pin; +//use std::pin::Pin; +//use std::future::Future; +//use serde_derive::{Serialize, Deserialize}; +//use serde_json::{Value as SerdeValue, Value as JsonValue}; + +pub async fn host(port: u16) -> Result<(), Error> { + let addr = SocketAddr::from(([0, 0, 0, 0], port)); + let make_service = make_service_fn(|_conn| async { + Ok::<_, Error>(service_fn(data_api_proxy)) + }); + Server::bind(&addr).serve(make_service).await?; + Ok(()) +} + +async fn data_api_proxy(req: Request) -> Result, Error> { + match data_api_proxy_try(req).await { + Ok(k) => { Ok(k) } + Err(e) => { + error!("{:?}", e); + Err(e) + } + } +} + +async fn data_api_proxy_try(req: Request) -> Result, Error> { + let uri = req.uri().clone(); + let path = uri.path(); + if path == "/api/1/parsed_raw" { + if req.method() == Method::POST { + Ok(parsed_raw(req).await?) + } + else { + Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) + } + } + else { + Ok(response(StatusCode::NOT_FOUND).body(Body::empty())?) + } +} + +fn response(status: T) -> http::response::Builder + where + http::StatusCode: std::convert::TryFrom, + >::Error: Into, +{ + Response::builder().status(status) + .header("access-control-allow-origin", "*") + .header("access-control-allow-headers", "*") +} + +async fn parsed_raw(req: Request) -> Result, Error> { + use netpod::AggQuerySingleChannel; + let reqbody = req.into_body(); + let bodyslice = hyper::body::to_bytes(reqbody).await?; + let _query: AggQuerySingleChannel = serde_json::from_slice(&bodyslice)?; + let q = disk::read_test_1().await?; + let s = q.inner; + let res = response(StatusCode::OK) + .body(Body::wrap_stream(s))?; + /* + let res = match q { + Ok(k) => { + response(StatusCode::OK) + .body(Body::wrap_stream(k.inner))? + } + Err(e) => { + response(StatusCode::INTERNAL_SERVER_ERROR) + .body(Body::empty())? + } + }; + */ + Ok(res) +} + + +struct BodyStreamWrap(netpod::BodyStream); + +impl hyper::body::HttpBody for BodyStreamWrap { + type Data = bytes::Bytes; + type Error = Error; + + fn poll_data(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll>> { + /* + use futures_core::stream::Stream; + let z: &mut async_channel::Receiver> = &mut self.0.receiver; + match Pin::new(z).poll_next(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(k) => Poll::Ready(k), + } + */ + todo!() + } + + fn poll_trailers(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll, Self::Error>> { + Poll::Ready(Ok(None)) + } + +} diff --git a/netpod/Cargo.toml b/netpod/Cargo.toml new file mode 100644 index 0000000..95d13ba --- /dev/null +++ b/netpod/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "netpod" +version = "0.0.1-a.0" +authors = ["Dominik Werder "] +edition = "2018" + +[dependencies] +serde = "1.0" +serde_derive = "1.0" +async-channel = "1.6" +bytes = "1.0.1" +futures-core = "0.3.12" +err = { path = "../err" } diff --git a/netpod/src/lib.rs b/netpod/src/lib.rs new file mode 100644 index 0000000..268e543 --- /dev/null +++ b/netpod/src/lib.rs @@ -0,0 +1,13 @@ +use serde_derive::{Serialize, Deserialize}; +//use std::pin::Pin; +use err::Error; + +#[derive(Serialize, Deserialize)] +pub struct AggQuerySingleChannel { + channel: String, +} + +pub struct BodyStream { + //pub receiver: async_channel::Receiver>, + pub inner: Box> + Send + Unpin>, +} diff --git a/retrieval/Cargo.toml b/retrieval/Cargo.toml new file mode 100644 index 0000000..6258e43 --- /dev/null +++ b/retrieval/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "retrieval" +version = "0.0.1-a.0" +authors = ["Dominik Werder "] +edition = "2018" + +[dependencies] +tokio = { version = "1.4.0", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] } +hyper = "0.14" +http = "0.2" +tracing = "0.1.25" +tracing-subscriber = "0.2.17" +bytes = "1.0.1" +#async-channel = "1" +#dashmap = "3" +tokio-postgres = "0.7" +serde = "1.0" +serde_derive = "1.0" +serde_json = "1.0" +chrono = "0.4" +clap = "3.0.0-beta.2" +err = { path = "../err" } diff --git a/retrieval/src/bin/retrieval.rs b/retrieval/src/bin/retrieval.rs new file mode 100644 index 0000000..7a5edb0 --- /dev/null +++ b/retrieval/src/bin/retrieval.rs @@ -0,0 +1,43 @@ +use err::Error; + +pub fn main() { + tokio::runtime::Builder::new_multi_thread() + .worker_threads(12) + .max_blocking_threads(256) + .enable_all() + .build() + .unwrap() + .block_on(async { + tracing_subscriber::fmt() + .with_env_filter(tracing_subscriber::EnvFilter::new("daqbuffer=trace,tokio_postgres=info")) + .init(); + go().await; + }) +} + +async fn go() { + match inner().await { + Ok(_) => (), + Err(_) => () + } +} + +async fn inner() -> Result<(), Error> { + use clap::Clap; + use retrieval::cli::Opts; + let _opts = Opts::parse(); + Ok(()) +} + +pub fn tracing_init() { + tracing_subscriber::fmt() + .with_env_filter(tracing_subscriber::EnvFilter::new("retrieval=trace,tokio_postgres=info")) + .init(); +} + +#[test] +fn t1() { + tracing_init(); + tracing::error!("parsed"); + //panic!(); +} diff --git a/retrieval/src/cli.rs b/retrieval/src/cli.rs new file mode 100644 index 0000000..b1b8834 --- /dev/null +++ b/retrieval/src/cli.rs @@ -0,0 +1,20 @@ +use clap::{Clap, crate_version}; + +#[derive(Debug, Clap)] +#[clap(name="retrieval", author="Dominik Werder ", version=crate_version!())] +pub struct Opts { + #[clap(short, long, parse(from_occurrences))] + pub verbose: i32, + #[clap(subcommand)] + pub subcmd: SubCmd, +} + +#[derive(Debug, Clap)] +pub enum SubCmd { + Version, + Retrieval(Retrieval), +} + +#[derive(Debug, Clap)] +pub struct Retrieval { +} diff --git a/retrieval/src/lib.rs b/retrieval/src/lib.rs new file mode 100644 index 0000000..8b2a735 --- /dev/null +++ b/retrieval/src/lib.rs @@ -0,0 +1,3 @@ +pub mod cli; + +pub fn test() {}