WIP on file-lookup for expand-query

This commit is contained in:
Dominik Werder
2021-08-31 16:12:13 +02:00
parent ba568c8850
commit 6581946eaf
17 changed files with 170 additions and 95 deletions
+1
View File
@@ -97,6 +97,7 @@ async fn go() -> Result<(), Error> {
SubCmd::Zmtp(zmtp) => { SubCmd::Zmtp(zmtp) => {
netfetch::zmtp::zmtp_client(&zmtp.addr).await?; netfetch::zmtp::zmtp_client(&zmtp.addr).await?;
} }
SubCmd::Test => (),
} }
Ok(()) Ok(())
} }
+1
View File
@@ -16,6 +16,7 @@ pub enum SubCmd {
Client(Client), Client(Client),
GenerateTestData, GenerateTestData,
Zmtp(Zmtp), Zmtp(Zmtp),
Test,
} }
#[derive(Debug, Clap)] #[derive(Debug, Clap)]
+1 -29
View File
@@ -17,7 +17,7 @@ pub fn require_test_hosts_running() -> Result<Arc<RunningHosts>, Error> {
let mut g = HOSTS_RUNNING.lock().unwrap(); let mut g = HOSTS_RUNNING.lock().unwrap();
match g.as_ref() { match g.as_ref() {
None => { None => {
let cluster = test_cluster(); let cluster = taskrun::test_cluster();
let jhs = spawn_test_hosts(cluster.clone()); let jhs = spawn_test_hosts(cluster.clone());
let ret = RunningHosts { let ret = RunningHosts {
cluster: cluster.clone(), cluster: cluster.clone(),
@@ -30,31 +30,3 @@ pub fn require_test_hosts_running() -> Result<Arc<RunningHosts>, Error> {
Some(gg) => Ok(gg.clone()), Some(gg) => Ok(gg.clone()),
} }
} }
/// Builds the description of the three-node localhost cluster used by the test hosts.
///
/// For node `id` in `0..3` the HTTP port is `8360 + id`, the raw port is
/// `8360 + id + 100`, and `../tmpdata/node{id:02}` serves as both the data and
/// the cache base path. All nodes share the `daqbuffer` database settings.
fn test_cluster() -> Cluster {
    let database = Database {
        name: "daqbuffer".into(),
        host: "localhost".into(),
        user: "daqbuffer".into(),
        pass: "daqbuffer".into(),
    };
    let nodes = (0..3)
        .map(|id| {
            let http_port = 8360 + id as u16;
            // Data and cache live in the same per-node directory.
            let base_dir = format!("../tmpdata/node{:02}", id);
            Node {
                host: "localhost".into(),
                listen: "0.0.0.0".into(),
                port: http_port,
                port_raw: http_port + 100,
                data_base_path: base_dir.clone().into(),
                cache_base_path: base_dir.into(),
                ksprefix: "ks".into(),
                split: id,
                backend: "testbackend".into(),
                bin_grain_kind: 0,
                archiver_appliance: None,
            }
        })
        .collect();
    Cluster { nodes, database }
}
-1
View File
@@ -16,7 +16,6 @@ pub fn make_test_node(id: u32) -> Node {
split: id, split: id,
ksprefix: "ks".into(), ksprefix: "ks".into(),
backend: "testbackend".into(), backend: "testbackend".into(),
bin_grain_kind: 0,
archiver_appliance: None, archiver_appliance: None,
} }
} }
+10 -26
View File
@@ -76,20 +76,12 @@ impl ChannelExecFunction for BinnedBinaryChannelExec {
FrameType + Framable + DeserializeOwned, FrameType + Framable + DeserializeOwned,
{ {
let _ = event_value_shape; let _ = event_value_shape;
let range = BinnedRange::covering_range( let range =
self.query.range().clone(), BinnedRange::covering_range(self.query.range().clone(), self.query.bin_count())?.ok_or(Error::with_msg(
self.query.bin_count(), format!("BinnedBinaryChannelExec BinnedRange::covering_range returned None"),
self.node_config.node.bin_grain_kind, ))?;
)?
.ok_or(Error::with_msg(format!(
"BinnedBinaryChannelExec BinnedRange::covering_range returned None"
)))?;
let perf_opts = PerfOpts { inmem_bufcap: 512 }; let perf_opts = PerfOpts { inmem_bufcap: 512 };
let souter = match PreBinnedPatchRange::covering_range( let souter = match PreBinnedPatchRange::covering_range(self.query.range().clone(), self.query.bin_count()) {
self.query.range().clone(),
self.query.bin_count(),
self.node_config.node.bin_grain_kind,
) {
Ok(Some(pre_range)) => { Ok(Some(pre_range)) => {
info!("BinnedBinaryChannelExec found pre_range: {:?}", pre_range); info!("BinnedBinaryChannelExec found pre_range: {:?}", pre_range);
if range.grid_spec.bin_t_len() < pre_range.grid_spec.bin_t_len() { if range.grid_spec.bin_t_len() < pre_range.grid_spec.bin_t_len() {
@@ -321,21 +313,13 @@ impl ChannelExecFunction for BinnedJsonChannelExec {
FrameType + Framable + DeserializeOwned, FrameType + Framable + DeserializeOwned,
{ {
let _ = event_value_shape; let _ = event_value_shape;
let range = BinnedRange::covering_range( let range =
self.query.range().clone(), BinnedRange::covering_range(self.query.range().clone(), self.query.bin_count())?.ok_or(Error::with_msg(
self.query.bin_count(), format!("BinnedJsonChannelExec BinnedRange::covering_range returned None"),
self.node_config.node.bin_grain_kind, ))?;
)?
.ok_or(Error::with_msg(format!(
"BinnedJsonChannelExec BinnedRange::covering_range returned None"
)))?;
let t_bin_count = range.count as u32; let t_bin_count = range.count as u32;
let perf_opts = PerfOpts { inmem_bufcap: 512 }; let perf_opts = PerfOpts { inmem_bufcap: 512 };
let souter = match PreBinnedPatchRange::covering_range( let souter = match PreBinnedPatchRange::covering_range(self.query.range().clone(), self.query.bin_count()) {
self.query.range().clone(),
self.query.bin_count(),
self.node_config.node.bin_grain_kind,
) {
Ok(Some(pre_range)) => { Ok(Some(pre_range)) => {
info!("BinnedJsonChannelExec found pre_range: {:?}", pre_range); info!("BinnedJsonChannelExec found pre_range: {:?}", pre_range);
if range.grid_spec.bin_t_len() < pre_range.grid_spec.bin_t_len() { if range.grid_spec.bin_t_len() < pre_range.grid_spec.bin_t_len() {
+3 -5
View File
@@ -101,14 +101,12 @@ where
self.res = Some(s2); self.res = Some(s2);
continue 'outer; continue 'outer;
} else { } else {
error!( let msg = format!(
"PreBinnedValueFetchedStream got non-OK result from sub request: {:?}", "PreBinnedValueFetchedStream got non-OK result from sub request: {:?}",
res res
); );
let e = Error::with_msg(format!( error!("{}", msg);
"PreBinnedValueFetchedStream got non-OK result from sub request: {:?}", let e = Error::with_msg_no_trace(msg);
res
));
self.errored = true; self.errored = true;
Ready(Some(Err(e))) Ready(Some(Err(e)))
} }
+2 -6
View File
@@ -129,7 +129,7 @@ where
} }
// TODO do I need to set up more transformations or binning to deliver the requested data? // TODO do I need to set up more transformations or binning to deliver the requested data?
let count = self.query.patch().patch_t_len() / self.query.patch().bin_t_len(); let count = self.query.patch().patch_t_len() / self.query.patch().bin_t_len();
let range = BinnedRange::covering_range(evq.range.clone(), count as u32, self.node_config.node.bin_grain_kind)? let range = BinnedRange::covering_range(evq.range.clone(), count as u32)?
.ok_or(Error::with_msg("covering_range returns None"))?; .ok_or(Error::with_msg("covering_range returns None"))?;
let perf_opts = PerfOpts { inmem_bufcap: 512 }; let perf_opts = PerfOpts { inmem_bufcap: 512 };
let s = MergedFromRemotes::<ENP>::new(evq, perf_opts, self.node_config.node_config.cluster.clone()); let s = MergedFromRemotes::<ENP>::new(evq, perf_opts, self.node_config.node_config.cluster.clone());
@@ -209,11 +209,7 @@ where
fn try_setup_fetch_prebinned_higher_res(&mut self) -> Result<(), Error> { fn try_setup_fetch_prebinned_higher_res(&mut self) -> Result<(), Error> {
let range = self.query.patch().patch_range(); let range = self.query.patch().patch_range();
match PreBinnedPatchRange::covering_range( match PreBinnedPatchRange::covering_range(range, self.query.patch().bin_count() + 1) {
range,
self.query.patch().bin_count() + 1,
self.node_config.node.bin_grain_kind,
) {
Ok(Some(range)) => { Ok(Some(range)) => {
self.fut2 = Some(self.setup_from_higher_res_prebinned(range)?); self.fut2 = Some(self.setup_from_higher_res_prebinned(range)?);
} }
+58
View File
@@ -3,6 +3,7 @@ use bytes::BytesMut;
use err::Error; use err::Error;
use futures_util::StreamExt; use futures_util::StreamExt;
use netpod::log::*; use netpod::log::*;
use netpod::timeunits::DAY;
use netpod::{ChannelConfig, NanoRange, Nanos, Node}; use netpod::{ChannelConfig, NanoRange, Nanos, Node};
use std::path::PathBuf; use std::path::PathBuf;
use std::time::Instant; use std::time::Instant;
@@ -162,3 +163,60 @@ async fn open_files_inner(
// TODO keep track of number of running // TODO keep track of number of running
Ok(()) Ok(())
} }
/// Try to find and position the file with the youngest event before the requested range.
///
/// NOTE: work in progress. The lookup itself is not implemented yet: nothing is
/// sent on `chtx` and the function returns `Ok(())` right away.
///
/// * `chtx` - channel on which opened files (or errors) are meant to be delivered.
/// * `range` - the requested time range.
/// * `channel_config` - configuration of the channel to look up.
/// * `node` - the node whose files should be searched.
async fn single_file_before_range(
    chtx: async_channel::Sender<Result<OpenedFile, Error>>,
    range: NanoRange,
    channel_config: ChannelConfig,
    node: Node,
) -> Result<(), Error> {
    // TODO implement the file lookup. Until then, mark the parameters as
    // intentionally unused (by reference, so nothing is moved or dropped early).
    let _ = (&chtx, &range, &channel_config, &node);
    Ok(())
}
/// Runs `single_file_before_range` for the `scalar-i32-be` channel against the first
/// node of the test cluster and logs every item received, until either an error
/// item arrives or the channel is closed.
#[test]
fn single_file() {
    let cfg = ChannelConfig {
        channel: netpod::Channel {
            backend: "testbackend".into(),
            name: "scalar-i32-be".into(),
        },
        keyspace: 2,
        time_bin_size: Nanos { ns: DAY },
        scalar_type: netpod::ScalarType::I32,
        compression: false,
        shape: netpod::Shape::Scalar,
        array: false,
        byte_order: netpod::ByteOrder::big_endian(),
    };
    let range = NanoRange { beg: 0, end: 0 };
    let node = taskrun::test_cluster().nodes[0].clone();
    let task = async move {
        let (tx, rx) = async_channel::bounded(2);
        let jh = taskrun::spawn(single_file_before_range(tx, range, cfg, node));
        loop {
            match rx.recv().await {
                Ok(Ok(file)) => {
                    info!("opened file: {:?}", file.path);
                }
                Ok(Err(e)) => {
                    error!("error while trying to open {:?}", e);
                    break;
                }
                // A receive error means the channel was closed.
                Err(_) => {
                    info!("channel closed");
                    break;
                }
            }
        }
        // First `?` is for the join result, second for the task's own result.
        jh.await??;
        Ok(())
    };
    taskrun::run(task).unwrap();
}
-1
View File
@@ -81,7 +81,6 @@ pub async fn gen_test_data() -> Result<(), Error> {
cache_base_path: data_base_path.join(format!("node{:02}", i1)), cache_base_path: data_base_path.join(format!("node{:02}", i1)),
ksprefix: ksprefix.clone(), ksprefix: ksprefix.clone(),
backend: "testbackend".into(), backend: "testbackend".into(),
bin_grain_kind: 0,
archiver_appliance: None, archiver_appliance: None,
}; };
ensemble.nodes.push(node); ensemble.nodes.push(node);
+1 -1
View File
@@ -308,7 +308,7 @@ pub struct FileChunkRead {
} }
pub fn file_content_stream( pub fn file_content_stream(
mut file: tokio::fs::File, mut file: File,
buffer_size: usize, buffer_size: usize,
) -> impl Stream<Item = Result<FileChunkRead, Error>> + Send { ) -> impl Stream<Item = Result<FileChunkRead, Error>> + Send {
async_stream::stream! { async_stream::stream! {
+4 -3
View File
@@ -12,6 +12,7 @@ use hyper::service::{make_service_fn, service_fn};
use hyper::{server::Server, Body, Request, Response}; use hyper::{server::Server, Body, Request, Response};
use net::SocketAddr; use net::SocketAddr;
use netpod::log::*; use netpod::log::*;
use netpod::timeunits::SEC;
use netpod::{ use netpod::{
channel_from_pairs, get_url_query_pairs, AggKind, ChannelConfigQuery, FromUrl, NodeConfigCached, APP_JSON, channel_from_pairs, get_url_query_pairs, AggKind, ChannelConfigQuery, FromUrl, NodeConfigCached, APP_JSON,
APP_JSON_LINES, APP_OCTET, APP_JSON_LINES, APP_OCTET,
@@ -398,7 +399,7 @@ async fn binned_json(query: BinnedQuery, node_config: &NodeConfigCached) -> Resu
async fn prebinned(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> { async fn prebinned(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
let (head, _body) = req.into_parts(); let (head, _body) = req.into_parts();
let query = PreBinnedQuery::from_request(&head)?; let query = PreBinnedQuery::from_request(&head)?;
let desc = format!("pre-b-{}", query.patch().bin_t_len() / 1000000000); let desc = format!("pre-b-{}", query.patch().bin_t_len() / SEC);
let span1 = span!(Level::INFO, "httpret::prebinned_DISABLED", desc = &desc.as_str()); let span1 = span!(Level::INFO, "httpret::prebinned_DISABLED", desc = &desc.as_str());
//span1.in_scope(|| {}); //span1.in_scope(|| {});
let fut = pre_binned_bytes_for_http(node_config, &query).instrument(span1); let fut = pre_binned_bytes_for_http(node_config, &query).instrument(span1);
@@ -407,8 +408,8 @@ async fn prebinned(req: Request<Body>, node_config: &NodeConfigCached) -> Result
s, s,
format!( format!(
"pre-b-{}-p-{}", "pre-b-{}-p-{}",
query.patch().bin_t_len() / 1000000000, query.patch().bin_t_len() / SEC,
query.patch().patch_beg() / 1000000000, query.patch().patch_beg() / SEC,
), ),
))?, ))?,
Err(e) => { Err(e) => {
+12 -2
View File
@@ -68,6 +68,8 @@ async fn proxy_http_service_try(req: Request<Body>, proxy_config: &ProxyConfig)
Err(Error::with_msg("todo")) Err(Error::with_msg("todo"))
} else if path == "/api/1/stats/" { } else if path == "/api/1/stats/" {
Err(Error::with_msg("todo")) Err(Error::with_msg("todo"))
} else if path == "/api/1/query" {
Ok(proxy_api1_single_backend_query(req, proxy_config).await?)
} else if path.starts_with("/api/1/gather/") { } else if path.starts_with("/api/1/gather/") {
Ok(gather_json_2_v1(req, "/api/1/gather/", proxy_config).await?) Ok(gather_json_2_v1(req, "/api/1/gather/", proxy_config).await?)
} else if path == "/api/4/backends" { } else if path == "/api/4/backends" {
@@ -225,12 +227,13 @@ pub async fn channel_search(req: Request<Body>, proxy_config: &ProxyConfig) -> R
backend: String, backend: String,
#[serde(rename = "type")] #[serde(rename = "type")]
ty: String, ty: String,
}; }
#[derive(Deserialize)] #[derive(Deserialize)]
struct ResContApi0 { struct ResContApi0 {
#[allow(dead_code)]
backend: String, backend: String,
channels: Vec<ResItemApi0>, channels: Vec<ResItemApi0>,
}; }
match serde_json::from_slice::<Vec<ResContApi0>>(&body) { match serde_json::from_slice::<Vec<ResContApi0>>(&body) {
Ok(k) => { Ok(k) => {
let mut a = vec![]; let mut a = vec![];
@@ -295,6 +298,13 @@ pub async fn channel_search(req: Request<Body>, proxy_config: &ProxyConfig) -> R
} }
} }
pub async fn proxy_api1_single_backend_query(
req: Request<Body>,
proxy_config: &ProxyConfig,
) -> Result<Response<Body>, Error> {
panic!()
}
pub async fn proxy_single_backend_query<QT>( pub async fn proxy_single_backend_query<QT>(
req: Request<Body>, req: Request<Body>,
proxy_config: &ProxyConfig, proxy_config: &ProxyConfig,
-1
View File
@@ -12,7 +12,6 @@ fn ca_connect_1() {
let node_config = NodeConfigCached { let node_config = NodeConfigCached {
node: Node { node: Node {
host: "".into(), host: "".into(),
bin_grain_kind: 0,
port: 123, port: 123,
port_raw: 123, port_raw: 123,
backend: "".into(), backend: "".into(),
+22
View File
@@ -0,0 +1,22 @@
use serde::Deserialize;
/// Range section of a query, deserialized via serde.
///
/// All three values are kept as the raw strings found in the input; no parsing
/// or validation happens in this type.
#[derive(Debug, Deserialize)]
pub struct Range {
    /// Read from the input key `type`.
    #[serde(rename = "type")]
    ty: String,
    /// Read from the input key `startDate` (presumably a date/time string; format not established here).
    #[serde(rename = "startDate")]
    beg: String,
    /// Read from the input key `endDate` (presumably a date/time string; format not established here).
    #[serde(rename = "endDate")]
    end: String,
}
// TODO implement Deserialize by hand so that the different possible input formats are recognized.
// When serializing, it is probably fine to always use the fully qualified format.
/// Channel list of a query. Currently an empty placeholder without any fields.
#[derive(Debug, Deserialize)]
pub struct ChannelList {}
/// A query consisting of a range and a channel list, deserialized via serde.
#[derive(Debug, Deserialize)]
pub struct Query {
    /// Read from the input key `range`.
    range: Range,
    /// Read from the input key `channels`.
    channels: ChannelList,
}
+25 -15
View File
@@ -1,3 +1,8 @@
pub mod api1;
pub mod query;
pub mod status;
pub mod streamext;
use std::collections::BTreeMap; use std::collections::BTreeMap;
use std::fmt::{self, Debug, Display, Formatter}; use std::fmt::{self, Debug, Display, Formatter};
use std::iter::FromIterator; use std::iter::FromIterator;
@@ -18,10 +23,6 @@ use url::Url;
use err::Error; use err::Error;
use timeunits::*; use timeunits::*;
pub mod query;
pub mod status;
pub mod streamext;
pub const APP_JSON: &'static str = "application/json"; pub const APP_JSON: &'static str = "application/json";
pub const APP_JSON_LINES: &'static str = "application/jsonlines"; pub const APP_JSON_LINES: &'static str = "application/jsonlines";
pub const APP_OCTET: &'static str = "application/octet-stream"; pub const APP_OCTET: &'static str = "application/octet-stream";
@@ -132,11 +133,26 @@ pub struct Node {
pub cache_base_path: PathBuf, pub cache_base_path: PathBuf,
pub ksprefix: String, pub ksprefix: String,
pub backend: String, pub backend: String,
#[serde(default)]
pub bin_grain_kind: u32,
pub archiver_appliance: Option<ArchiverAppliance>, pub archiver_appliance: Option<ArchiverAppliance>,
} }
impl Node {
pub fn dummy() -> Self {
Self {
host: "dummy".into(),
listen: "dummy".into(),
port: 4444,
port_raw: 4444,
split: 0,
data_base_path: PathBuf::new(),
cache_base_path: PathBuf::new(),
ksprefix: "daqlocal".into(),
backend: "dummybackend".into(),
archiver_appliance: None,
}
}
}
#[derive(Clone, Debug, Serialize, Deserialize)] #[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Database { pub struct Database {
pub name: String, pub name: String,
@@ -351,8 +367,6 @@ pub mod timeunits {
const BIN_T_LEN_OPTIONS_0: [u64; 4] = [SEC, MIN * 10, HOUR * 2, DAY]; const BIN_T_LEN_OPTIONS_0: [u64; 4] = [SEC, MIN * 10, HOUR * 2, DAY];
const BIN_T_LEN_OPTIONS_1: [u64; 4] = [SEC, MIN * 10, HOUR * 2, DAY];
const PATCH_T_LEN_KEY: [u64; 4] = [SEC, MIN * 10, HOUR * 2, DAY]; const PATCH_T_LEN_KEY: [u64; 4] = [SEC, MIN * 10, HOUR * 2, DAY];
const PATCH_T_LEN_OPTIONS_SCALAR: [u64; 4] = [MIN * 60, HOUR * 4, DAY * 4, DAY * 32]; const PATCH_T_LEN_OPTIONS_SCALAR: [u64; 4] = [MIN * 60, HOUR * 4, DAY * 4, DAY * 32];
@@ -469,12 +483,8 @@ fn get_patch_t_len(bin_t_len: u64) -> u64 {
impl PreBinnedPatchRange { impl PreBinnedPatchRange {
/// Cover at least the given range with at least as many as the requested number of bins. /// Cover at least the given range with at least as many as the requested number of bins.
pub fn covering_range(range: NanoRange, min_bin_count: u32, bin_grain_kind: u32) -> Result<Option<Self>, Error> { pub fn covering_range(range: NanoRange, min_bin_count: u32) -> Result<Option<Self>, Error> {
let bin_t_len_options = if bin_grain_kind == 1 { let bin_t_len_options = &BIN_T_LEN_OPTIONS_0;
&BIN_T_LEN_OPTIONS_1
} else {
&BIN_T_LEN_OPTIONS_0
};
if min_bin_count < 1 { if min_bin_count < 1 {
Err(Error::with_msg("min_bin_count < 1"))?; Err(Error::with_msg("min_bin_count < 1"))?;
} }
@@ -646,7 +656,7 @@ pub struct BinnedRange {
} }
impl BinnedRange { impl BinnedRange {
pub fn covering_range(range: NanoRange, min_bin_count: u32, _bin_grain_kind: u32) -> Result<Option<Self>, Error> { pub fn covering_range(range: NanoRange, min_bin_count: u32) -> Result<Option<Self>, Error> {
let thresholds = &BIN_THRESHOLDS; let thresholds = &BIN_THRESHOLDS;
if min_bin_count < 1 { if min_bin_count < 1 {
Err(Error::with_msg("min_bin_count < 1"))?; Err(Error::with_msg("min_bin_count < 1"))?;
+1
View File
@@ -11,3 +11,4 @@ tracing-subscriber = "0.2.17"
backtrace = "0.3.56" backtrace = "0.3.56"
lazy_static = "1.4.0" lazy_static = "1.4.0"
err = { path = "../err" } err = { path = "../err" }
netpod = { path = "../netpod" }
+29 -5
View File
@@ -1,13 +1,10 @@
use crate::log::*;
use err::Error;
use std::future::Future; use std::future::Future;
use std::panic; use std::panic;
use std::sync::Mutex; use std::sync::Mutex;
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
use err::Error;
use crate::log::*;
pub mod log { pub mod log {
#[allow(unused_imports)] #[allow(unused_imports)]
pub use tracing::{debug, error, info, trace, warn}; pub use tracing::{debug, error, info, trace, warn};
@@ -84,3 +81,30 @@ where
{ {
tokio::spawn(task) tokio::spawn(task)
} }
/// Builds the description of the three-node localhost cluster used in tests.
///
/// For node `id` in `0..3` the HTTP port is `8360 + id`, the raw port is
/// `8360 + id + 100`, and `../tmpdata/node{id:02}` serves as both the data and
/// the cache base path. All nodes share the `daqbuffer` database settings.
pub fn test_cluster() -> netpod::Cluster {
    let database = netpod::Database {
        name: "daqbuffer".into(),
        host: "localhost".into(),
        user: "daqbuffer".into(),
        pass: "daqbuffer".into(),
    };
    let nodes = (0..3)
        .map(|id| {
            let http_port = 8360 + id as u16;
            // Data and cache live in the same per-node directory.
            let base_dir = format!("../tmpdata/node{:02}", id);
            netpod::Node {
                host: "localhost".into(),
                listen: "0.0.0.0".into(),
                port: http_port,
                port_raw: http_port + 100,
                data_base_path: base_dir.clone().into(),
                cache_base_path: base_dir.into(),
                ksprefix: "ks".into(),
                split: id,
                backend: "testbackend".into(),
                archiver_appliance: None,
            }
        })
        .collect();
    netpod::Cluster { nodes, database }
}