Factor out taskrun

This commit is contained in:
Dominik Werder
2021-04-08 20:39:04 +02:00
parent 3b062b2f5c
commit 20fe590c88
7 changed files with 41 additions and 51 deletions

View File

@@ -7,7 +7,6 @@ edition = "2018"
[dependencies]
tokio = { version = "1.4.0", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] }
tracing = "0.1.25"
tracing-subscriber = "0.2.17"
serde_json = "1.0"
async-channel = "1.6"
bytes = "1.0.1"
@@ -17,5 +16,6 @@ futures-util = "0.3.13"
async-stream = "0.3.0"
hex = "0.4.3"
err = { path = "../err" }
taskrun = { path = "../taskrun" }
netpod = { path = "../netpod" }
bitshuffle = { path = "../bitshuffle" }

View File

@@ -766,7 +766,7 @@ const WEEK: u64 = DAY * 7;
#[test]
fn agg_x_dim_0() {
crate::run(async { agg_x_dim_0_inner().await; Ok(()) }).unwrap();
taskrun::run(async { agg_x_dim_0_inner().await; Ok(()) }).unwrap();
}
async fn agg_x_dim_0_inner() {
@@ -812,7 +812,7 @@ async fn agg_x_dim_0_inner() {
#[test]
fn agg_x_dim_1() {
crate::run(async { agg_x_dim_1_inner().await; Ok(()) }).unwrap();
taskrun::run(async { agg_x_dim_1_inner().await; Ok(()) }).unwrap();
}
async fn agg_x_dim_1_inner() {

View File

@@ -929,26 +929,3 @@ impl futures_core::Stream for RawConcatChannelReader {
}
}
fn run<T, F: std::future::Future<Output=Result<T, Error>>>(f: F) -> Result<T, Error> {
tracing_init();
tokio::runtime::Builder::new_multi_thread()
.worker_threads(12)
.max_blocking_threads(256)
.enable_all()
.build()
.unwrap()
.block_on(async {
f.await
})
}
pub fn tracing_init() {
tracing_subscriber::fmt()
//.with_timer(tracing_subscriber::fmt::time::uptime())
.with_target(true)
.with_thread_names(true)
//.with_max_level(tracing::Level::INFO)
.with_env_filter(tracing_subscriber::EnvFilter::new("info,retrieval=trace,disk=trace,tokio_postgres=info"))
.init();
}

View File

@@ -20,5 +20,6 @@ serde_json = "1.0"
chrono = "0.4"
clap = "3.0.0-beta.2"
err = { path = "../err" }
taskrun = { path = "../taskrun" }
netpod = { path = "../netpod" }
httpret = { path = "../httpret" }

View File

@@ -3,7 +3,7 @@ use tracing::{error, warn, info, debug, trace};
use err::Error;
pub fn main() {
match run(go()) {
match taskrun::run(go()) {
Ok(k) => {
info!("{:?}", k);
}
@@ -13,19 +13,6 @@ pub fn main() {
}
}
fn run<T, F: std::future::Future<Output=Result<T, Error>>>(f: F) -> Result<T, Error> {
tracing_init();
tokio::runtime::Builder::new_multi_thread()
.worker_threads(12)
.max_blocking_threads(256)
.enable_all()
.build()
.unwrap()
.block_on(async {
f.await
})
}
async fn go() -> Result<(), Error> {
use clap::Clap;
use retrieval::cli::{Opts, SubCmd};
@@ -40,19 +27,9 @@ async fn go() -> Result<(), Error> {
Ok(())
}
pub fn tracing_init() {
tracing_subscriber::fmt()
//.with_timer(tracing_subscriber::fmt::time::uptime())
.with_target(true)
.with_thread_names(true)
//.with_max_level(tracing::Level::INFO)
.with_env_filter(tracing_subscriber::EnvFilter::new("info,retrieval=trace,disk=trace,tokio_postgres=info"))
.init();
}
#[test]
fn simple_fetch() {
run(async {
taskrun::run(async {
let t1 = chrono::Utc::now();
let query = netpod::AggQuerySingleChannel {
ksprefix: "daq_swissfel".into(),

11
taskrun/Cargo.toml Normal file
View File

@@ -0,0 +1,11 @@
[package]
name = "taskrun"
version = "0.0.1-a.0"
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
edition = "2018"
[dependencies]
tokio = { version = "1.4.0", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] }
tracing = "0.1.25"
tracing-subscriber = "0.2.17"
err = { path = "../err" }

24
taskrun/src/lib.rs Normal file
View File

@@ -0,0 +1,24 @@
use err::Error;
pub fn run<T, F: std::future::Future<Output=Result<T, Error>>>(f: F) -> Result<T, Error> {
tracing_init();
tokio::runtime::Builder::new_multi_thread()
.worker_threads(12)
.max_blocking_threads(256)
.enable_all()
.build()
.unwrap()
.block_on(async {
f.await
})
}
pub fn tracing_init() {
tracing_subscriber::fmt()
//.with_timer(tracing_subscriber::fmt::time::uptime())
.with_target(true)
.with_thread_names(true)
//.with_max_level(tracing::Level::INFO)
.with_env_filter(tracing_subscriber::EnvFilter::new("info,retrieval=trace,disk=trace,tokio_postgres=info"))
.init();
}