Runs on remote
This commit is contained in:
@@ -40,6 +40,17 @@ pub enum CacheUsage {
|
|||||||
Recreate,
|
Recreate,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl CacheUsage {
|
||||||
|
pub fn query_param_value(&self) -> String {
|
||||||
|
match self {
|
||||||
|
CacheUsage::Use => "use",
|
||||||
|
CacheUsage::Ignore => "ignore",
|
||||||
|
CacheUsage::Recreate => "recreate",
|
||||||
|
}
|
||||||
|
.into()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
pub struct BinnedQuery {
|
pub struct BinnedQuery {
|
||||||
range: NanoRange,
|
range: NanoRange,
|
||||||
|
|||||||
Vendored
+1
-1
@@ -252,7 +252,7 @@ impl Stream for PreBinnedValueStream {
|
|||||||
match self.query.cache_usage {
|
match self.query.cache_usage {
|
||||||
super::CacheUsage::Use | super::CacheUsage::Recreate => {
|
super::CacheUsage::Use | super::CacheUsage::Recreate => {
|
||||||
let msg = format!(
|
let msg = format!(
|
||||||
"Write cache file\n{:?}\nN: {}\n\n\n",
|
"Write cache file query: {:?} bin count: {}",
|
||||||
self.query.patch,
|
self.query.patch,
|
||||||
self.values.ts1s.len()
|
self.values.ts1s.len()
|
||||||
);
|
);
|
||||||
|
|||||||
@@ -59,7 +59,7 @@ impl EventChunker {
|
|||||||
data_emit_complete: false,
|
data_emit_complete: false,
|
||||||
final_stats_sent: false,
|
final_stats_sent: false,
|
||||||
data_since_last_stats: 0,
|
data_since_last_stats: 0,
|
||||||
stats_emit_interval: 64,
|
stats_emit_interval: 256,
|
||||||
parsed_bytes: 0,
|
parsed_bytes: 0,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -84,7 +84,7 @@ impl EventChunker {
|
|||||||
data_emit_complete: false,
|
data_emit_complete: false,
|
||||||
final_stats_sent: false,
|
final_stats_sent: false,
|
||||||
data_since_last_stats: 0,
|
data_since_last_stats: 0,
|
||||||
stats_emit_interval: 64,
|
stats_emit_interval: 256,
|
||||||
parsed_bytes: 0,
|
parsed_bytes: 0,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,3 +1,4 @@
|
|||||||
|
use disk::cache::CacheUsage;
|
||||||
use err::Error;
|
use err::Error;
|
||||||
use netpod::{NodeConfig, NodeConfigCached};
|
use netpod::{NodeConfig, NodeConfigCached};
|
||||||
use tokio::io::AsyncReadExt;
|
use tokio::io::AsyncReadExt;
|
||||||
@@ -36,8 +37,24 @@ async fn go() -> Result<(), Error> {
|
|||||||
ClientType::Binned(opts) => {
|
ClientType::Binned(opts) => {
|
||||||
let beg = opts.beg.parse()?;
|
let beg = opts.beg.parse()?;
|
||||||
let end = opts.end.parse()?;
|
let end = opts.end.parse()?;
|
||||||
retrieval::client::get_binned(opts.host, opts.port, opts.backend, opts.channel, beg, end, opts.bins)
|
let cache_usage = if opts.ignore_cache {
|
||||||
.await?;
|
CacheUsage::Ignore
|
||||||
|
} else if opts.recreate_cache {
|
||||||
|
CacheUsage::Recreate
|
||||||
|
} else {
|
||||||
|
CacheUsage::Use
|
||||||
|
};
|
||||||
|
retrieval::client::get_binned(
|
||||||
|
opts.host,
|
||||||
|
opts.port,
|
||||||
|
opts.backend,
|
||||||
|
opts.channel,
|
||||||
|
beg,
|
||||||
|
end,
|
||||||
|
opts.bins,
|
||||||
|
cache_usage,
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
SubCmd::GenerateTestData => {
|
SubCmd::GenerateTestData => {
|
||||||
|
|||||||
@@ -49,4 +49,8 @@ pub struct BinnedClient {
|
|||||||
pub end: String,
|
pub end: String,
|
||||||
#[clap(long)]
|
#[clap(long)]
|
||||||
pub bins: u32,
|
pub bins: u32,
|
||||||
|
#[clap(long)]
|
||||||
|
pub ignore_cache: bool,
|
||||||
|
#[clap(long)]
|
||||||
|
pub recreate_cache: bool,
|
||||||
}
|
}
|
||||||
|
|||||||
+18
-3
@@ -1,5 +1,8 @@
|
|||||||
use chrono::{DateTime, Utc};
|
use chrono::{DateTime, Utc};
|
||||||
|
use disk::agg::scalarbinbatch::MinMaxAvgScalarBinBatchStreamItem;
|
||||||
|
use disk::cache::CacheUsage;
|
||||||
use disk::frame::inmem::InMemoryFrameAsyncReadStream;
|
use disk::frame::inmem::InMemoryFrameAsyncReadStream;
|
||||||
|
use disk::streamlog::Streamlog;
|
||||||
use err::Error;
|
use err::Error;
|
||||||
use futures_util::TryStreamExt;
|
use futures_util::TryStreamExt;
|
||||||
use http::StatusCode;
|
use http::StatusCode;
|
||||||
@@ -15,12 +18,13 @@ pub async fn get_binned(
|
|||||||
beg_date: DateTime<Utc>,
|
beg_date: DateTime<Utc>,
|
||||||
end_date: DateTime<Utc>,
|
end_date: DateTime<Utc>,
|
||||||
bin_count: u32,
|
bin_count: u32,
|
||||||
|
cache_usage: CacheUsage,
|
||||||
) -> Result<(), Error> {
|
) -> Result<(), Error> {
|
||||||
info!("------- get_binned client");
|
info!("------- get_binned client");
|
||||||
let t1 = Utc::now();
|
let t1 = Utc::now();
|
||||||
let date_fmt = "%Y-%m-%dT%H:%M:%S.%3fZ";
|
let date_fmt = "%Y-%m-%dT%H:%M:%S.%3fZ";
|
||||||
let uri = format!(
|
let uri = format!(
|
||||||
"http://{}:{}/api/1/binned?channel_backend={}&channel_name={}&beg_date={}&end_date={}&bin_count={}",
|
"http://{}:{}/api/1/binned?channel_backend={}&channel_name={}&beg_date={}&end_date={}&bin_count={}&cache_usage={}",
|
||||||
host,
|
host,
|
||||||
port,
|
port,
|
||||||
channel_backend,
|
channel_backend,
|
||||||
@@ -28,6 +32,7 @@ pub async fn get_binned(
|
|||||||
beg_date.format(date_fmt),
|
beg_date.format(date_fmt),
|
||||||
end_date.format(date_fmt),
|
end_date.format(date_fmt),
|
||||||
bin_count,
|
bin_count,
|
||||||
|
cache_usage.query_param_value(),
|
||||||
);
|
);
|
||||||
info!("get_binned uri {:?}", uri);
|
info!("get_binned uri {:?}", uri);
|
||||||
let req = hyper::Request::builder()
|
let req = hyper::Request::builder()
|
||||||
@@ -56,8 +61,18 @@ pub async fn get_binned(
|
|||||||
match bincode::deserialize::<ExpectedType>(frame.buf()) {
|
match bincode::deserialize::<ExpectedType>(frame.buf()) {
|
||||||
Ok(item) => match item {
|
Ok(item) => match item {
|
||||||
Ok(item) => {
|
Ok(item) => {
|
||||||
info!("len {} item {:?}", n1, item);
|
match &item {
|
||||||
bin_count += 1;
|
MinMaxAvgScalarBinBatchStreamItem::Log(item) => {
|
||||||
|
Streamlog::emit(item);
|
||||||
|
}
|
||||||
|
MinMaxAvgScalarBinBatchStreamItem::Values(item) => {
|
||||||
|
bin_count += 1;
|
||||||
|
info!("len {} values {:?}", n1, item);
|
||||||
|
}
|
||||||
|
item => {
|
||||||
|
info!("len {} item {:?}", n1, item);
|
||||||
|
}
|
||||||
|
}
|
||||||
Some(Ok(item))
|
Some(Ok(item))
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
|
|||||||
Reference in New Issue
Block a user