Init tracing one time

This commit is contained in:
Dominik Werder
2021-05-27 20:43:48 +02:00
parent 5f1b8d6a36
commit 39615eaf5c
6 changed files with 64 additions and 44 deletions
+2 -8
View File
@@ -537,13 +537,8 @@ impl TBinnedBins for MinMaxAvgScalarBinBatch {
} }
} }
pub trait BinnedStreamKind: Clone + Unpin + Send + Sync + 'static pub trait BinnedStreamKind: Clone + Unpin + Send + Sync + 'static {
// TODO would it be better to express it here? type TBinnedStreamType: Stream<Item = Result<StreamItem<RangeCompletableItem<Self::TBinnedBins>>, Error>> + Send;
//where Result<StreamItem<RangeCompletableItem<Self::XBinnedEvents>>, Error>: FrameType,
{
type TBinnedStreamType: Stream<Item = Result<StreamItem<RangeCompletableItem<Self::TBinnedBins>>, Error>>
+ Send
+ 'static;
type XBinnedEvents: XBinnedEvents<Self>; type XBinnedEvents: XBinnedEvents<Self>;
type TBinnedBins: TBinnedBins; type TBinnedBins: TBinnedBins;
type XBinnedToTBinnedAggregator; type XBinnedToTBinnedAggregator;
@@ -633,7 +628,6 @@ impl BinnedStreamKind for BinnedStreamKindScalar {
) -> Result<Self::TBinnedStreamType, Error> { ) -> Result<Self::TBinnedStreamType, Error> {
let s = MergedFromRemotes::new(evq, perf_opts, node_config.node_config.cluster.clone(), self.clone()); let s = MergedFromRemotes::new(evq, perf_opts, node_config.node_config.cluster.clone(), self.clone());
let s = Self::xbinned_to_tbinned(s, range); let s = Self::xbinned_to_tbinned(s, range);
//let s = crate::agg::binnedt::IntoBinnedT::<Self>::into_binned_t(s, range);
Ok(BoxedStream::new(Box::pin(s))?) Ok(BoxedStream::new(Box::pin(s))?)
} }
+19 -14
View File
@@ -12,23 +12,23 @@ use std::future::ready;
use std::pin::Pin; use std::pin::Pin;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
pub struct BinnedScalarStreamFromPreBinnedPatches<BK> pub struct BinnedScalarStreamFromPreBinnedPatches<SK>
where where
BK: BinnedStreamKind, SK: BinnedStreamKind,
{ {
inp: Pin< inp: Pin<
Box< Box<
dyn Stream<Item = Result<StreamItem<RangeCompletableItem<<BK as BinnedStreamKind>::TBinnedBins>>, Error>> dyn Stream<Item = Result<StreamItem<RangeCompletableItem<<SK as BinnedStreamKind>::TBinnedBins>>, Error>>
+ Send, + Send,
>, >,
>, >,
_stream_kind: BK, _stream_kind: SK,
} }
impl<BK> BinnedScalarStreamFromPreBinnedPatches<BK> impl<SK> BinnedScalarStreamFromPreBinnedPatches<SK>
where where
BK: BinnedStreamKind, SK: BinnedStreamKind,
Result<StreamItem<RangeCompletableItem<<BK as BinnedStreamKind>::TBinnedBins>>, Error>: FrameType, Result<StreamItem<RangeCompletableItem<<SK as BinnedStreamKind>::TBinnedBins>>, Error>: FrameType,
{ {
pub fn new( pub fn new(
patch_it: PreBinnedPatchIterator, patch_it: PreBinnedPatchIterator,
@@ -39,7 +39,7 @@ where
node_config: &NodeConfigCached, node_config: &NodeConfigCached,
disk_stats_every: ByteSize, disk_stats_every: ByteSize,
report_error: bool, report_error: bool,
stream_kind: BK, stream_kind: SK,
) -> Result<Self, Error> { ) -> Result<Self, Error> {
let patches: Vec<_> = patch_it.collect(); let patches: Vec<_> = patch_it.collect();
let mut sp = String::new(); let mut sp = String::new();
@@ -51,11 +51,12 @@ where
} }
info!("Using these pre-binned patches:\n{}", sp); info!("Using these pre-binned patches:\n{}", sp);
} }
let inp = futures_util::stream::iter(patches.into_iter()) let pmax = patches.len();
let inp = futures_util::stream::iter(patches.into_iter().enumerate())
.map({ .map({
let node_config = node_config.clone(); let node_config = node_config.clone();
let stream_kind = stream_kind.clone(); let stream_kind = stream_kind.clone();
move |patch| { move |(pix, patch)| {
let query = PreBinnedQuery::new( let query = PreBinnedQuery::new(
patch, patch,
channel.clone(), channel.clone(),
@@ -66,10 +67,10 @@ where
); );
let ret: Pin<Box<dyn Stream<Item = _> + Send>> = let ret: Pin<Box<dyn Stream<Item = _> + Send>> =
match PreBinnedScalarValueFetchedStream::new(&query, &node_config, &stream_kind) { match PreBinnedScalarValueFetchedStream::new(&query, &node_config, &stream_kind) {
Ok(k) => Box::pin(k), Ok(stream) => Box::pin(stream.map(move |q| (pix, q))),
Err(e) => { Err(e) => {
error!("error from PreBinnedValueFetchedStream::new {:?}", e); error!("error from PreBinnedValueFetchedStream::new {:?}", e);
Box::pin(futures_util::stream::iter(vec![Err(e)])) Box::pin(futures_util::stream::iter(vec![(pix, Err(e))]))
} }
}; };
ret ret
@@ -78,7 +79,7 @@ where
.flatten() .flatten()
.filter_map({ .filter_map({
let range = range.clone(); let range = range.clone();
move |k| { move |(pix, k)| {
let fit_range = range.full_range(); let fit_range = range.full_range();
let g = match k { let g = match k {
Ok(item) => match item { Ok(item) => match item {
@@ -86,7 +87,11 @@ where
StreamItem::Stats(item) => Some(Ok(StreamItem::Stats(item))), StreamItem::Stats(item) => Some(Ok(StreamItem::Stats(item))),
StreamItem::DataItem(item) => match item { StreamItem::DataItem(item) => match item {
RangeCompletableItem::RangeComplete => { RangeCompletableItem::RangeComplete => {
Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))) if pix + 1 == pmax {
Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)))
} else {
None
}
} }
RangeCompletableItem::Data(item) => { RangeCompletableItem::Data(item) => {
match crate::binned::FilterFittingInside::filter_fitting_inside(item, fit_range) { match crate::binned::FilterFittingInside::filter_fitting_inside(item, fit_range) {
+6 -6
View File
@@ -12,23 +12,23 @@ use netpod::{NodeConfigCached, PerfOpts};
use std::pin::Pin; use std::pin::Pin;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
pub struct PreBinnedScalarValueFetchedStream<BK> pub struct PreBinnedScalarValueFetchedStream<SK>
where where
BK: BinnedStreamKind, SK: BinnedStreamKind,
{ {
uri: http::Uri, uri: http::Uri,
resfut: Option<hyper::client::ResponseFuture>, resfut: Option<hyper::client::ResponseFuture>,
res: Option<InMemoryFrameAsyncReadStream<HttpBodyAsAsyncRead>>, res: Option<InMemoryFrameAsyncReadStream<HttpBodyAsAsyncRead>>,
errored: bool, errored: bool,
completed: bool, completed: bool,
_stream_kind: BK, _stream_kind: SK,
} }
impl<BK> PreBinnedScalarValueFetchedStream<BK> impl<SK> PreBinnedScalarValueFetchedStream<SK>
where where
BK: BinnedStreamKind, SK: BinnedStreamKind,
{ {
pub fn new(query: &PreBinnedQuery, node_config: &NodeConfigCached, stream_kind: &BK) -> Result<Self, Error> { pub fn new(query: &PreBinnedQuery, node_config: &NodeConfigCached, stream_kind: &SK) -> Result<Self, Error> {
let nodeix = node_ix_for_patch(&query.patch, &query.channel, &node_config.node_config.cluster); let nodeix = node_ix_for_patch(&query.patch, &query.channel, &node_config.node_config.cluster);
let node = &node_config.node_config.cluster.nodes[nodeix as usize]; let node = &node_config.node_config.cluster.nodes[nodeix as usize];
let uri: hyper::Uri = format!( let uri: hyper::Uri = format!(
+18 -7
View File
@@ -52,23 +52,27 @@ fn get_binned_binary() {
async fn get_binned_binary_inner() -> Result<(), Error> { async fn get_binned_binary_inner() -> Result<(), Error> {
let cluster = test_cluster(); let cluster = test_cluster();
let _hosts = spawn_test_hosts(cluster.clone()); let _hosts = spawn_test_hosts(cluster.clone());
if false { if true {
get_binned_channel( get_binned_channel(
"wave-f64-be-n21", "wave-f64-be-n21",
"1970-01-01T00:20:10.000Z", "1970-01-01T00:20:10.000Z",
"1970-01-01T00:20:30.000Z", "1970-01-01T00:20:30.000Z",
2, 2,
&cluster, &cluster,
true,
2,
) )
.await?; .await?;
} }
if false { if true {
get_binned_channel( get_binned_channel(
"wave-u16-le-n77", "wave-u16-le-n77",
"1970-01-01T01:11:00.000Z", "1970-01-01T01:11:00.000Z",
"1970-01-01T01:40:00.000Z", "1970-01-01T01:35:00.000Z",
7, 7,
&cluster, &cluster,
true,
24,
) )
.await?; .await?;
} }
@@ -79,6 +83,8 @@ async fn get_binned_binary_inner() -> Result<(), Error> {
"1970-01-01T03:55:00.000Z", "1970-01-01T03:55:00.000Z",
2, 2,
&cluster, &cluster,
true,
3,
) )
.await?; .await?;
} }
@@ -91,7 +97,9 @@ async fn get_binned_channel<S>(
end_date: S, end_date: S,
bin_count: u32, bin_count: u32,
cluster: &Cluster, cluster: &Cluster,
) -> Result<(), Error> expect_range_complete: bool,
expect_bin_count: u64,
) -> Result<BinnedResponse, Error>
where where
S: AsRef<str>, S: AsRef<str>,
{ {
@@ -128,10 +136,13 @@ where
let ms = t2.signed_duration_since(t1).num_milliseconds() as u64; let ms = t2.signed_duration_since(t1).num_milliseconds() as u64;
info!("get_cached_0 DONE bin_count {} time {} ms", res.bin_count, ms); info!("get_cached_0 DONE bin_count {} time {} ms", res.bin_count, ms);
if !res.is_valid() { if !res.is_valid() {
error!("result is not valid:\n{:?}", res); Err(Error::with_msg(format!("invalid response: {:?}", res)))
Err(Error::with_msg("result not valid")) } else if res.range_complete_count == 0 && expect_range_complete {
Err(Error::with_msg(format!("expect range complete: {:?}", res)))
} else if res.bin_count != expect_bin_count {
Err(Error::with_msg(format!("bin count mismatch: {:?}", res)))
} else { } else {
Ok(()) Ok(res)
} }
} }
+1
View File
@@ -9,4 +9,5 @@ tokio = { version = "1.5.0", features = ["rt-multi-thread", "io-util", "net", "t
tracing = "0.1.25" tracing = "0.1.25"
tracing-subscriber = "0.2.17" tracing-subscriber = "0.2.17"
backtrace = "0.3.56" backtrace = "0.3.56"
lazy_static = "1.4.0"
err = { path = "../err" } err = { path = "../err" }
+18 -9
View File
@@ -1,6 +1,7 @@
use err::Error; use err::Error;
use std::future::Future; use std::future::Future;
use std::panic; use std::panic;
use std::sync::Mutex;
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
#[allow(unused_imports)] #[allow(unused_imports)]
use tracing::{debug, error, info, trace, warn}; use tracing::{debug, error, info, trace, warn};
@@ -49,16 +50,24 @@ pub fn run<T, F: std::future::Future<Output = Result<T, Error>>>(f: F) -> Result
} }
} }
lazy_static::lazy_static! {
pub static ref INITMX: Mutex<u32> = Mutex::new(0);
}
pub fn tracing_init() { pub fn tracing_init() {
tracing_subscriber::fmt() let mut g = INITMX.lock().unwrap();
//.with_timer(tracing_subscriber::fmt::time::uptime()) if *g == 0 {
.with_target(true) tracing_subscriber::fmt()
.with_thread_names(true) //.with_timer(tracing_subscriber::fmt::time::uptime())
//.with_max_level(tracing::Level::INFO) .with_target(true)
.with_env_filter(tracing_subscriber::EnvFilter::new( .with_thread_names(true)
"info,retrieval=trace,retrieval::test=trace,disk::raw::conn=info", //.with_max_level(tracing::Level::INFO)
)) .with_env_filter(tracing_subscriber::EnvFilter::new(
.init(); "info,retrieval=trace,retrieval::test=trace,disk::raw::conn=info",
))
.init();
*g = 1;
}
} }
pub fn spawn<T>(task: T) -> JoinHandle<T::Output> pub fn spawn<T>(task: T) -> JoinHandle<T::Output>