WIP on simple disk serve
This commit is contained in:
3
.gitignore
vendored
Normal file
3
.gitignore
vendored
Normal file
@@ -0,0 +1,3 @@
|
||||
/Cargo.lock
|
||||
/target
|
||||
/.idea
|
||||
2
Cargo.toml
Normal file
2
Cargo.toml
Normal file
@@ -0,0 +1,2 @@
|
||||
[workspace]
|
||||
members = ["retrieval", "httpret", "err", "disk"]
|
||||
15
disk/Cargo.toml
Normal file
15
disk/Cargo.toml
Normal file
@@ -0,0 +1,15 @@
|
||||
[package]
|
||||
name = "disk"
|
||||
version = "0.0.1-a.0"
|
||||
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
|
||||
edition = "2018"
|
||||
|
||||
[dependencies]
|
||||
tokio = { version = "1.4.0", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] }
|
||||
tracing = "0.1.25"
|
||||
serde_json = "1.0"
|
||||
async-channel = "1.6"
|
||||
bytes = "1.0.1"
|
||||
futures-core = "0.3.12"
|
||||
err = { path = "../err" }
|
||||
netpod = { path = "../netpod" }
|
||||
45
disk/src/lib.rs
Normal file
45
disk/src/lib.rs
Normal file
@@ -0,0 +1,45 @@
|
||||
use err::Error;
|
||||
use std::task::{Context, Poll};
|
||||
use std::pin::Pin;
|
||||
use tokio::io::AsyncRead;
|
||||
//use std::future::Future;
|
||||
//use futures_core::Stream;
|
||||
|
||||
pub async fn read_test_1() -> Result<netpod::BodyStream, Error> {
|
||||
let path = "/data/sf-databuffer/daq_swissfel/daq_swissfel_3/byTime/S10CB01-RIQM-DCP10:FOR-AMPLT/0000000000000018714/0000000012/0000000000086400000_00000_Data";
|
||||
let fin = tokio::fs::OpenOptions::new()
|
||||
.read(true)
|
||||
.open(path)
|
||||
.await?;
|
||||
let stream = netpod::BodyStream {
|
||||
inner: Box::new(FileReader { file: fin }),
|
||||
};
|
||||
Ok(stream)
|
||||
}
|
||||
|
||||
struct FileReader {
|
||||
file: tokio::fs::File,
|
||||
}
|
||||
|
||||
impl futures_core::Stream for FileReader {
|
||||
type Item = Result<bytes::Bytes, Error>;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
let mut buf2 = bytes::BytesMut::with_capacity(13);
|
||||
if buf2.as_mut().len() != 13 {
|
||||
panic!("todo prepare slice");
|
||||
}
|
||||
let mut buf = tokio::io::ReadBuf::new(buf2.as_mut());
|
||||
let g = Pin::new(&mut self.file).poll_read(cx, &mut buf);
|
||||
match g {
|
||||
Poll::Ready(Ok(_)) => {
|
||||
Poll::Ready(Some(Ok(buf2.freeze())))
|
||||
}
|
||||
Poll::Ready(Err(e)) => {
|
||||
Poll::Ready(Some(Err(Error::from(e))))
|
||||
}
|
||||
Poll::Pending => Poll::Pending
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
10
err/Cargo.toml
Normal file
10
err/Cargo.toml
Normal file
@@ -0,0 +1,10 @@
|
||||
[package]
|
||||
name = "err"
|
||||
version = "0.0.1-a.0"
|
||||
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
|
||||
edition = "2018"
|
||||
|
||||
[dependencies]
|
||||
hyper = { version = "0.14", features = ["http1", "http2", "client", "server", "tcp"] }
|
||||
http = "0.2"
|
||||
serde_json = "1.0"
|
||||
45
err/src/lib.rs
Normal file
45
err/src/lib.rs
Normal file
@@ -0,0 +1,45 @@
|
||||
#[derive(Debug)]
|
||||
pub struct Error {
|
||||
msg: String,
|
||||
}
|
||||
|
||||
impl std::fmt::Display for Error {
|
||||
fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||
write!(fmt, "Error")
|
||||
}
|
||||
}
|
||||
|
||||
impl std::error::Error for Error {
|
||||
}
|
||||
|
||||
impl From<std::io::Error> for Error {
|
||||
fn from (k: std::io::Error) -> Self {
|
||||
Self {
|
||||
msg: k.to_string(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<http::Error> for Error {
|
||||
fn from (k: http::Error) -> Self {
|
||||
Self {
|
||||
msg: k.to_string(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<hyper::Error> for Error {
|
||||
fn from (k: hyper::Error) -> Self {
|
||||
Self {
|
||||
msg: k.to_string(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<serde_json::Error> for Error {
|
||||
fn from (k: serde_json::Error) -> Self {
|
||||
Self {
|
||||
msg: k.to_string(),
|
||||
}
|
||||
}
|
||||
}
|
||||
17
httpret/Cargo.toml
Normal file
17
httpret/Cargo.toml
Normal file
@@ -0,0 +1,17 @@
|
||||
[package]
|
||||
name = "httpret"
|
||||
version = "0.0.1-a.0"
|
||||
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
|
||||
edition = "2018"
|
||||
|
||||
[dependencies]
|
||||
hyper = { version = "0.14", features = ["http1", "http2", "client", "server", "tcp", "stream"] }
|
||||
http = "0.2"
|
||||
bytes = "1.0.1"
|
||||
futures-core = "0.3.12"
|
||||
tracing = "0.1.25"
|
||||
serde_json = "1.0"
|
||||
async-channel = "1.6"
|
||||
err = { path = "../err" }
|
||||
netpod = { path = "../netpod" }
|
||||
disk = { path = "../disk" }
|
||||
108
httpret/src/lib.rs
Normal file
108
httpret/src/lib.rs
Normal file
@@ -0,0 +1,108 @@
|
||||
#[allow(unused_imports)]
|
||||
use tracing::{error, warn, info, debug, trace};
|
||||
use err::Error;
|
||||
use std::net::SocketAddr;
|
||||
use http::{Method, StatusCode, HeaderMap};
|
||||
use hyper::{Body, Request, Response};
|
||||
use hyper::server::Server;
|
||||
use hyper::service::{make_service_fn, service_fn};
|
||||
use std::task::{Context, Poll};
|
||||
use std::pin::Pin;
|
||||
//use std::pin::Pin;
|
||||
//use std::future::Future;
|
||||
//use serde_derive::{Serialize, Deserialize};
|
||||
//use serde_json::{Value as SerdeValue, Value as JsonValue};
|
||||
|
||||
pub async fn host(port: u16) -> Result<(), Error> {
|
||||
let addr = SocketAddr::from(([0, 0, 0, 0], port));
|
||||
let make_service = make_service_fn(|_conn| async {
|
||||
Ok::<_, Error>(service_fn(data_api_proxy))
|
||||
});
|
||||
Server::bind(&addr).serve(make_service).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn data_api_proxy(req: Request<Body>) -> Result<Response<Body>, Error> {
|
||||
match data_api_proxy_try(req).await {
|
||||
Ok(k) => { Ok(k) }
|
||||
Err(e) => {
|
||||
error!("{:?}", e);
|
||||
Err(e)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn data_api_proxy_try(req: Request<Body>) -> Result<Response<Body>, Error> {
|
||||
let uri = req.uri().clone();
|
||||
let path = uri.path();
|
||||
if path == "/api/1/parsed_raw" {
|
||||
if req.method() == Method::POST {
|
||||
Ok(parsed_raw(req).await?)
|
||||
}
|
||||
else {
|
||||
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
|
||||
}
|
||||
}
|
||||
else {
|
||||
Ok(response(StatusCode::NOT_FOUND).body(Body::empty())?)
|
||||
}
|
||||
}
|
||||
|
||||
fn response<T>(status: T) -> http::response::Builder
|
||||
where
|
||||
http::StatusCode: std::convert::TryFrom<T>,
|
||||
<http::StatusCode as std::convert::TryFrom<T>>::Error: Into<http::Error>,
|
||||
{
|
||||
Response::builder().status(status)
|
||||
.header("access-control-allow-origin", "*")
|
||||
.header("access-control-allow-headers", "*")
|
||||
}
|
||||
|
||||
async fn parsed_raw(req: Request<Body>) -> Result<Response<Body>, Error> {
|
||||
use netpod::AggQuerySingleChannel;
|
||||
let reqbody = req.into_body();
|
||||
let bodyslice = hyper::body::to_bytes(reqbody).await?;
|
||||
let _query: AggQuerySingleChannel = serde_json::from_slice(&bodyslice)?;
|
||||
let q = disk::read_test_1().await?;
|
||||
let s = q.inner;
|
||||
let res = response(StatusCode::OK)
|
||||
.body(Body::wrap_stream(s))?;
|
||||
/*
|
||||
let res = match q {
|
||||
Ok(k) => {
|
||||
response(StatusCode::OK)
|
||||
.body(Body::wrap_stream(k.inner))?
|
||||
}
|
||||
Err(e) => {
|
||||
response(StatusCode::INTERNAL_SERVER_ERROR)
|
||||
.body(Body::empty())?
|
||||
}
|
||||
};
|
||||
*/
|
||||
Ok(res)
|
||||
}
|
||||
|
||||
|
||||
struct BodyStreamWrap(netpod::BodyStream);
|
||||
|
||||
impl hyper::body::HttpBody for BodyStreamWrap {
|
||||
type Data = bytes::Bytes;
|
||||
type Error = Error;
|
||||
|
||||
fn poll_data(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Result<Self::Data, Self::Error>>> {
|
||||
/*
|
||||
use futures_core::stream::Stream;
|
||||
let z: &mut async_channel::Receiver<Result<Self::Data, Self::Error>> = &mut self.0.receiver;
|
||||
match Pin::new(z).poll_next(cx) {
|
||||
Poll::Pending => Poll::Pending,
|
||||
Poll::Ready(k) => Poll::Ready(k),
|
||||
}
|
||||
*/
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn poll_trailers(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<Option<HeaderMap>, Self::Error>> {
|
||||
Poll::Ready(Ok(None))
|
||||
}
|
||||
|
||||
}
|
||||
13
netpod/Cargo.toml
Normal file
13
netpod/Cargo.toml
Normal file
@@ -0,0 +1,13 @@
|
||||
[package]
|
||||
name = "netpod"
|
||||
version = "0.0.1-a.0"
|
||||
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
|
||||
edition = "2018"
|
||||
|
||||
[dependencies]
|
||||
serde = "1.0"
|
||||
serde_derive = "1.0"
|
||||
async-channel = "1.6"
|
||||
bytes = "1.0.1"
|
||||
futures-core = "0.3.12"
|
||||
err = { path = "../err" }
|
||||
13
netpod/src/lib.rs
Normal file
13
netpod/src/lib.rs
Normal file
@@ -0,0 +1,13 @@
|
||||
use serde_derive::{Serialize, Deserialize};
|
||||
//use std::pin::Pin;
|
||||
use err::Error;
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct AggQuerySingleChannel {
|
||||
channel: String,
|
||||
}
|
||||
|
||||
pub struct BodyStream {
|
||||
//pub receiver: async_channel::Receiver<Result<bytes::Bytes, Error>>,
|
||||
pub inner: Box<dyn futures_core::Stream<Item=Result<bytes::Bytes, Error>> + Send + Unpin>,
|
||||
}
|
||||
22
retrieval/Cargo.toml
Normal file
22
retrieval/Cargo.toml
Normal file
@@ -0,0 +1,22 @@
|
||||
[package]
|
||||
name = "retrieval"
|
||||
version = "0.0.1-a.0"
|
||||
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
|
||||
edition = "2018"
|
||||
|
||||
[dependencies]
|
||||
tokio = { version = "1.4.0", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] }
|
||||
hyper = "0.14"
|
||||
http = "0.2"
|
||||
tracing = "0.1.25"
|
||||
tracing-subscriber = "0.2.17"
|
||||
bytes = "1.0.1"
|
||||
#async-channel = "1"
|
||||
#dashmap = "3"
|
||||
tokio-postgres = "0.7"
|
||||
serde = "1.0"
|
||||
serde_derive = "1.0"
|
||||
serde_json = "1.0"
|
||||
chrono = "0.4"
|
||||
clap = "3.0.0-beta.2"
|
||||
err = { path = "../err" }
|
||||
43
retrieval/src/bin/retrieval.rs
Normal file
43
retrieval/src/bin/retrieval.rs
Normal file
@@ -0,0 +1,43 @@
|
||||
use err::Error;
|
||||
|
||||
pub fn main() {
|
||||
tokio::runtime::Builder::new_multi_thread()
|
||||
.worker_threads(12)
|
||||
.max_blocking_threads(256)
|
||||
.enable_all()
|
||||
.build()
|
||||
.unwrap()
|
||||
.block_on(async {
|
||||
tracing_subscriber::fmt()
|
||||
.with_env_filter(tracing_subscriber::EnvFilter::new("daqbuffer=trace,tokio_postgres=info"))
|
||||
.init();
|
||||
go().await;
|
||||
})
|
||||
}
|
||||
|
||||
async fn go() {
|
||||
match inner().await {
|
||||
Ok(_) => (),
|
||||
Err(_) => ()
|
||||
}
|
||||
}
|
||||
|
||||
async fn inner() -> Result<(), Error> {
|
||||
use clap::Clap;
|
||||
use retrieval::cli::Opts;
|
||||
let _opts = Opts::parse();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn tracing_init() {
|
||||
tracing_subscriber::fmt()
|
||||
.with_env_filter(tracing_subscriber::EnvFilter::new("retrieval=trace,tokio_postgres=info"))
|
||||
.init();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn t1() {
|
||||
tracing_init();
|
||||
tracing::error!("parsed");
|
||||
//panic!();
|
||||
}
|
||||
20
retrieval/src/cli.rs
Normal file
20
retrieval/src/cli.rs
Normal file
@@ -0,0 +1,20 @@
|
||||
use clap::{Clap, crate_version};
|
||||
|
||||
#[derive(Debug, Clap)]
|
||||
#[clap(name="retrieval", author="Dominik Werder <dominik.werder@gmail.com>", version=crate_version!())]
|
||||
pub struct Opts {
|
||||
#[clap(short, long, parse(from_occurrences))]
|
||||
pub verbose: i32,
|
||||
#[clap(subcommand)]
|
||||
pub subcmd: SubCmd,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clap)]
|
||||
pub enum SubCmd {
|
||||
Version,
|
||||
Retrieval(Retrieval),
|
||||
}
|
||||
|
||||
#[derive(Debug, Clap)]
|
||||
pub struct Retrieval {
|
||||
}
|
||||
3
retrieval/src/lib.rs
Normal file
3
retrieval/src/lib.rs
Normal file
@@ -0,0 +1,3 @@
|
||||
pub mod cli;
|
||||
|
||||
pub fn test() {}
|
||||
Reference in New Issue
Block a user