From 6581946eaf2fa66e448720d1fe14b1fc80e9f7b5 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Tue, 31 Aug 2021 16:12:13 +0200 Subject: [PATCH] WIP on file-lookup for expand-query --- daqbuffer/src/bin/daqbuffer.rs | 1 + daqbuffer/src/cli.rs | 1 + daqbufp2/src/nodes.rs | 30 +---------------- disk/src/aggtest.rs | 1 - disk/src/binned.rs | 36 ++++++-------------- disk/src/binned/binnedfrompbv.rs | 8 ++--- disk/src/binned/pbv.rs | 8 ++--- disk/src/dataopen.rs | 58 ++++++++++++++++++++++++++++++++ disk/src/gen.rs | 1 - disk/src/lib.rs | 2 +- httpret/src/lib.rs | 7 ++-- httpret/src/proxy.rs | 14 ++++++-- netfetch/src/test.rs | 1 - netpod/src/api1.rs | 22 ++++++++++++ netpod/src/lib.rs | 40 +++++++++++++--------- taskrun/Cargo.toml | 1 + taskrun/src/lib.rs | 34 ++++++++++++++++--- 17 files changed, 170 insertions(+), 95 deletions(-) create mode 100644 netpod/src/api1.rs diff --git a/daqbuffer/src/bin/daqbuffer.rs b/daqbuffer/src/bin/daqbuffer.rs index 9ecc623..5cf673b 100644 --- a/daqbuffer/src/bin/daqbuffer.rs +++ b/daqbuffer/src/bin/daqbuffer.rs @@ -97,6 +97,7 @@ async fn go() -> Result<(), Error> { SubCmd::Zmtp(zmtp) => { netfetch::zmtp::zmtp_client(&zmtp.addr).await?; } + SubCmd::Test => (), } Ok(()) } diff --git a/daqbuffer/src/cli.rs b/daqbuffer/src/cli.rs index dd1699f..31e5573 100644 --- a/daqbuffer/src/cli.rs +++ b/daqbuffer/src/cli.rs @@ -16,6 +16,7 @@ pub enum SubCmd { Client(Client), GenerateTestData, Zmtp(Zmtp), + Test, } #[derive(Debug, Clap)] diff --git a/daqbufp2/src/nodes.rs b/daqbufp2/src/nodes.rs index 4ac29aa..b011cd6 100644 --- a/daqbufp2/src/nodes.rs +++ b/daqbufp2/src/nodes.rs @@ -17,7 +17,7 @@ pub fn require_test_hosts_running() -> Result, Error> { let mut g = HOSTS_RUNNING.lock().unwrap(); match g.as_ref() { None => { - let cluster = test_cluster(); + let cluster = taskrun::test_cluster(); let jhs = spawn_test_hosts(cluster.clone()); let ret = RunningHosts { cluster: cluster.clone(), @@ -30,31 +30,3 @@ pub fn require_test_hosts_running() -> Result, Error> { Some(gg) => Ok(gg.clone()), } } - -fn test_cluster() -> Cluster { - let nodes = (0..3) - .into_iter() - .map(|id| Node { - host: "localhost".into(), - listen: "0.0.0.0".into(), - port: 8360 + id as u16, - port_raw: 8360 + id as u16 + 100, - data_base_path: format!("../tmpdata/node{:02}", id).into(), - cache_base_path: format!("../tmpdata/node{:02}", id).into(), - ksprefix: "ks".into(), - split: id, - backend: "testbackend".into(), - bin_grain_kind: 0, - archiver_appliance: None, - }) - .collect(); - Cluster { - nodes: nodes, - database: Database { - name: "daqbuffer".into(), - host: "localhost".into(), - user: "daqbuffer".into(), - pass: "daqbuffer".into(), - }, - } -} diff --git a/disk/src/aggtest.rs b/disk/src/aggtest.rs index 46645e3..5ea710d 100644 --- a/disk/src/aggtest.rs +++ b/disk/src/aggtest.rs @@ -16,7 +16,6 @@ pub fn make_test_node(id: u32) -> Node { split: id, ksprefix: "ks".into(), backend: "testbackend".into(), - bin_grain_kind: 0, archiver_appliance: None, } } diff --git a/disk/src/binned.rs b/disk/src/binned.rs index 8bef4b3..1ce34f5 100644 --- a/disk/src/binned.rs +++ b/disk/src/binned.rs @@ -76,20 +76,12 @@ impl ChannelExecFunction for BinnedBinaryChannelExec { FrameType + Framable + DeserializeOwned, { let _ = event_value_shape; - let range = BinnedRange::covering_range( - self.query.range().clone(), - self.query.bin_count(), - self.node_config.node.bin_grain_kind, - )? - .ok_or(Error::with_msg(format!( - "BinnedBinaryChannelExec BinnedRange::covering_range returned None" - )))?; + let range = + BinnedRange::covering_range(self.query.range().clone(), self.query.bin_count())?.ok_or(Error::with_msg( + format!("BinnedBinaryChannelExec BinnedRange::covering_range returned None"), + ))?; let perf_opts = PerfOpts { inmem_bufcap: 512 }; - let souter = match PreBinnedPatchRange::covering_range( - self.query.range().clone(), - self.query.bin_count(), - self.node_config.node.bin_grain_kind, - ) { + let souter = match PreBinnedPatchRange::covering_range(self.query.range().clone(), self.query.bin_count()) { Ok(Some(pre_range)) => { info!("BinnedBinaryChannelExec found pre_range: {:?}", pre_range); if range.grid_spec.bin_t_len() < pre_range.grid_spec.bin_t_len() { @@ -321,21 +313,13 @@ impl ChannelExecFunction for BinnedJsonChannelExec { FrameType + Framable + DeserializeOwned, { let _ = event_value_shape; - let range = BinnedRange::covering_range( - self.query.range().clone(), - self.query.bin_count(), - self.node_config.node.bin_grain_kind, - )? - .ok_or(Error::with_msg(format!( - "BinnedJsonChannelExec BinnedRange::covering_range returned None" - )))?; + let range = + BinnedRange::covering_range(self.query.range().clone(), self.query.bin_count())?.ok_or(Error::with_msg( + format!("BinnedJsonChannelExec BinnedRange::covering_range returned None"), + ))?; let t_bin_count = range.count as u32; let perf_opts = PerfOpts { inmem_bufcap: 512 }; - let souter = match PreBinnedPatchRange::covering_range( - self.query.range().clone(), - self.query.bin_count(), - self.node_config.node.bin_grain_kind, - ) { + let souter = match PreBinnedPatchRange::covering_range(self.query.range().clone(), self.query.bin_count()) { Ok(Some(pre_range)) => { info!("BinnedJsonChannelExec found pre_range: {:?}", pre_range); if range.grid_spec.bin_t_len() < pre_range.grid_spec.bin_t_len() { diff --git a/disk/src/binned/binnedfrompbv.rs b/disk/src/binned/binnedfrompbv.rs index 96ce74c..fe328db 100644 --- a/disk/src/binned/binnedfrompbv.rs +++ b/disk/src/binned/binnedfrompbv.rs @@ -101,14 +101,12 @@ where self.res = Some(s2); continue 'outer; } else { - error!( + let msg = format!( "PreBinnedValueFetchedStream got non-OK result from sub request: {:?}", res ); - let e = Error::with_msg(format!( - "PreBinnedValueFetchedStream got non-OK result from sub request: {:?}", - res - )); + error!("{}", msg); + let e = Error::with_msg_no_trace(msg); self.errored = true; Ready(Some(Err(e))) } diff --git a/disk/src/binned/pbv.rs b/disk/src/binned/pbv.rs index cd3477c..1924605 100644 --- a/disk/src/binned/pbv.rs +++ b/disk/src/binned/pbv.rs @@ -129,7 +129,7 @@ where } // TODO do I need to set up more transformations or binning to deliver the requested data? let count = self.query.patch().patch_t_len() / self.query.patch().bin_t_len(); - let range = BinnedRange::covering_range(evq.range.clone(), count as u32, self.node_config.node.bin_grain_kind)? + let range = BinnedRange::covering_range(evq.range.clone(), count as u32)? .ok_or(Error::with_msg("covering_range returns None"))?; let perf_opts = PerfOpts { inmem_bufcap: 512 }; let s = MergedFromRemotes::::new(evq, perf_opts, self.node_config.node_config.cluster.clone()); @@ -209,11 +209,7 @@ where fn try_setup_fetch_prebinned_higher_res(&mut self) -> Result<(), Error> { let range = self.query.patch().patch_range(); - match PreBinnedPatchRange::covering_range( - range, - self.query.patch().bin_count() + 1, - self.node_config.node.bin_grain_kind, - ) { + match PreBinnedPatchRange::covering_range(range, self.query.patch().bin_count() + 1) { Ok(Some(range)) => { self.fut2 = Some(self.setup_from_higher_res_prebinned(range)?); } diff --git a/disk/src/dataopen.rs b/disk/src/dataopen.rs index d00a308..d93d0d3 100644 --- a/disk/src/dataopen.rs +++ b/disk/src/dataopen.rs @@ -3,6 +3,7 @@ use bytes::BytesMut; use err::Error; use futures_util::StreamExt; use netpod::log::*; +use netpod::timeunits::DAY; use netpod::{ChannelConfig, NanoRange, Nanos, Node}; use std::path::PathBuf; use std::time::Instant; @@ -162,3 +163,60 @@ async fn open_files_inner( // TODO keep track of number of running Ok(()) } + +/** +Try to find and position the file with the youngest event before the requested range. +*/ +async fn single_file_before_range( + chtx: async_channel::Sender>, + range: NanoRange, + channel_config: ChannelConfig, + node: Node, +) -> Result<(), Error> { + Ok(()) +} + +#[test] +fn single_file() { + let range = netpod::NanoRange { beg: 0, end: 0 }; + let chn = netpod::Channel { + backend: "testbackend".into(), + name: "scalar-i32-be".into(), + }; + let cfg = ChannelConfig { + channel: chn, + keyspace: 2, + time_bin_size: Nanos { ns: DAY }, + scalar_type: netpod::ScalarType::I32, + compression: false, + shape: netpod::Shape::Scalar, + array: false, + byte_order: netpod::ByteOrder::big_endian(), + }; + let cluster = taskrun::test_cluster(); + let task = async move { + let (tx, rx) = async_channel::bounded(2); + let jh = taskrun::spawn(single_file_before_range(tx, range, cfg, cluster.nodes[0].clone())); + loop { + match rx.recv().await { + Ok(k) => match k { + Ok(k) => { + info!("opened file: {:?}", k.path); + } + Err(e) => { + error!("error while trying to open {:?}", e); + break; + } + }, + Err(e) => { + // channel closed. + info!("channel closed"); + break; + } + } + } + jh.await??; + Ok(()) + }; + taskrun::run(task).unwrap(); +} diff --git a/disk/src/gen.rs b/disk/src/gen.rs index 311762e..e4ab172 100644 --- a/disk/src/gen.rs +++ b/disk/src/gen.rs @@ -81,7 +81,6 @@ pub async fn gen_test_data() -> Result<(), Error> { cache_base_path: data_base_path.join(format!("node{:02}", i1)), ksprefix: ksprefix.clone(), backend: "testbackend".into(), - bin_grain_kind: 0, archiver_appliance: None, }; ensemble.nodes.push(node); diff --git a/disk/src/lib.rs b/disk/src/lib.rs index 4b9b762..be85b37 100644 --- a/disk/src/lib.rs +++ b/disk/src/lib.rs @@ -308,7 +308,7 @@ pub struct FileChunkRead { } pub fn file_content_stream( - mut file: tokio::fs::File, + mut file: File, buffer_size: usize, ) -> impl Stream> + Send { async_stream::stream! { diff --git a/httpret/src/lib.rs b/httpret/src/lib.rs index 2288c95..6a126fa 100644 --- a/httpret/src/lib.rs +++ b/httpret/src/lib.rs @@ -12,6 +12,7 @@ use hyper::service::{make_service_fn, service_fn}; use hyper::{server::Server, Body, Request, Response}; use net::SocketAddr; use netpod::log::*; +use netpod::timeunits::SEC; use netpod::{ channel_from_pairs, get_url_query_pairs, AggKind, ChannelConfigQuery, FromUrl, NodeConfigCached, APP_JSON, APP_JSON_LINES, APP_OCTET, @@ -398,7 +399,7 @@ async fn binned_json(query: BinnedQuery, node_config: &NodeConfigCached) -> Resu async fn prebinned(req: Request, node_config: &NodeConfigCached) -> Result, Error> { let (head, _body) = req.into_parts(); let query = PreBinnedQuery::from_request(&head)?; - let desc = format!("pre-b-{}", query.patch().bin_t_len() / 1000000000); + let desc = format!("pre-b-{}", query.patch().bin_t_len() / SEC); let span1 = span!(Level::INFO, "httpret::prebinned_DISABLED", desc = &desc.as_str()); //span1.in_scope(|| {}); let fut = pre_binned_bytes_for_http(node_config, &query).instrument(span1); @@ -407,8 +408,8 @@ async fn prebinned(req: Request, node_config: &NodeConfigCached) -> Result s, format!( "pre-b-{}-p-{}", - query.patch().bin_t_len() / 1000000000, - query.patch().patch_beg() / 1000000000, + query.patch().bin_t_len() / SEC, + query.patch().patch_beg() / SEC, ), ))?, Err(e) => { diff --git a/httpret/src/proxy.rs b/httpret/src/proxy.rs index b6c862d..d59a0b2 100644 --- a/httpret/src/proxy.rs +++ b/httpret/src/proxy.rs @@ -68,6 +68,8 @@ async fn proxy_http_service_try(req: Request, proxy_config: &ProxyConfig) Err(Error::with_msg("todo")) } else if path == "/api/1/stats/" { Err(Error::with_msg("todo")) + } else if path == "/api/1/query" { + Ok(proxy_api1_single_backend_query(req, proxy_config).await?) } else if path.starts_with("/api/1/gather/") { Ok(gather_json_2_v1(req, "/api/1/gather/", proxy_config).await?) } else if path == "/api/4/backends" { @@ -225,12 +227,13 @@ pub async fn channel_search(req: Request, proxy_config: &ProxyConfig) -> R backend: String, #[serde(rename = "type")] ty: String, - }; + } #[derive(Deserialize)] struct ResContApi0 { + #[allow(dead_code)] backend: String, channels: Vec, - }; + } match serde_json::from_slice::>(&body) { Ok(k) => { let mut a = vec![]; @@ -295,6 +298,13 @@ pub async fn channel_search(req: Request, proxy_config: &ProxyConfig) -> R } } +pub async fn proxy_api1_single_backend_query( + req: Request, + proxy_config: &ProxyConfig, +) -> Result, Error> { + panic!() +} + pub async fn proxy_single_backend_query( req: Request, proxy_config: &ProxyConfig, diff --git a/netfetch/src/test.rs b/netfetch/src/test.rs index bde3c7b..fc75048 100644 --- a/netfetch/src/test.rs +++ b/netfetch/src/test.rs @@ -12,7 +12,6 @@ fn ca_connect_1() { let node_config = NodeConfigCached { node: Node { host: "".into(), - bin_grain_kind: 0, port: 123, port_raw: 123, backend: "".into(), diff --git a/netpod/src/api1.rs b/netpod/src/api1.rs new file mode 100644 index 0000000..931064f --- /dev/null +++ b/netpod/src/api1.rs @@ -0,0 +1,22 @@ +use serde::Deserialize; + +#[derive(Debug, Deserialize)] +pub struct Range { + #[serde(rename = "type")] + ty: String, + #[serde(rename = "startDate")] + beg: String, + #[serde(rename = "endDate")] + end: String, +} + +// TODO implement Deserialize such that I recognize the different possible formats... +// I guess, when serializing, it's ok to use the fully qualified format throughout. +#[derive(Debug, Deserialize)] +pub struct ChannelList {} + +#[derive(Debug, Deserialize)] +pub struct Query { + range: Range, + channels: ChannelList, +} diff --git a/netpod/src/lib.rs b/netpod/src/lib.rs index 253658c..4a6a18a 100644 --- a/netpod/src/lib.rs +++ b/netpod/src/lib.rs @@ -1,3 +1,8 @@ +pub mod api1; +pub mod query; +pub mod status; +pub mod streamext; + use std::collections::BTreeMap; use std::fmt::{self, Debug, Display, Formatter}; use std::iter::FromIterator; @@ -18,10 +23,6 @@ use url::Url; use err::Error; use timeunits::*; -pub mod query; -pub mod status; -pub mod streamext; - pub const APP_JSON: &'static str = "application/json"; pub const APP_JSON_LINES: &'static str = "application/jsonlines"; pub const APP_OCTET: &'static str = "application/octet-stream"; @@ -132,11 +133,26 @@ pub struct Node { pub cache_base_path: PathBuf, pub ksprefix: String, pub backend: String, - #[serde(default)] - pub bin_grain_kind: u32, pub archiver_appliance: Option, } +impl Node { + pub fn dummy() -> Self { + Self { + host: "dummy".into(), + listen: "dummy".into(), + port: 4444, + port_raw: 4444, + split: 0, + data_base_path: PathBuf::new(), + cache_base_path: PathBuf::new(), + ksprefix: "daqlocal".into(), + backend: "dummybackend".into(), + archiver_appliance: None, + } + } +} + #[derive(Clone, Debug, Serialize, Deserialize)] pub struct Database { pub name: String, @@ -351,8 +367,6 @@ pub mod timeunits { const BIN_T_LEN_OPTIONS_0: [u64; 4] = [SEC, MIN * 10, HOUR * 2, DAY]; -const BIN_T_LEN_OPTIONS_1: [u64; 4] = [SEC, MIN * 10, HOUR * 2, DAY]; - const PATCH_T_LEN_KEY: [u64; 4] = [SEC, MIN * 10, HOUR * 2, DAY]; const PATCH_T_LEN_OPTIONS_SCALAR: [u64; 4] = [MIN * 60, HOUR * 4, DAY * 4, DAY * 32]; @@ -469,12 +483,8 @@ fn get_patch_t_len(bin_t_len: u64) -> u64 { impl PreBinnedPatchRange { /// Cover at least the given range with at least as many as the requested number of bins. - pub fn covering_range(range: NanoRange, min_bin_count: u32, bin_grain_kind: u32) -> Result, Error> { - let bin_t_len_options = if bin_grain_kind == 1 { - &BIN_T_LEN_OPTIONS_1 - } else { - &BIN_T_LEN_OPTIONS_0 - }; + pub fn covering_range(range: NanoRange, min_bin_count: u32) -> Result, Error> { + let bin_t_len_options = &BIN_T_LEN_OPTIONS_0; if min_bin_count < 1 { Err(Error::with_msg("min_bin_count < 1"))?; } @@ -646,7 +656,7 @@ pub struct BinnedRange { } impl BinnedRange { - pub fn covering_range(range: NanoRange, min_bin_count: u32, _bin_grain_kind: u32) -> Result, Error> { + pub fn covering_range(range: NanoRange, min_bin_count: u32) -> Result, Error> { let thresholds = &BIN_THRESHOLDS; if min_bin_count < 1 { Err(Error::with_msg("min_bin_count < 1"))?; diff --git a/taskrun/Cargo.toml b/taskrun/Cargo.toml index 7456bab..f9f52cf 100644 --- a/taskrun/Cargo.toml +++ b/taskrun/Cargo.toml @@ -11,3 +11,4 @@ tracing-subscriber = "0.2.17" backtrace = "0.3.56" lazy_static = "1.4.0" err = { path = "../err" } +netpod = { path = "../netpod" } diff --git a/taskrun/src/lib.rs b/taskrun/src/lib.rs index 9818f35..1424144 100644 --- a/taskrun/src/lib.rs +++ b/taskrun/src/lib.rs @@ -1,13 +1,10 @@ +use crate::log::*; +use err::Error; use std::future::Future; use std::panic; use std::sync::Mutex; - use tokio::task::JoinHandle; -use err::Error; - -use crate::log::*; - pub mod log { #[allow(unused_imports)] pub use tracing::{debug, error, info, trace, warn}; @@ -84,3 +81,30 @@ where { tokio::spawn(task) } + +pub fn test_cluster() -> netpod::Cluster { + let nodes = (0..3) + .into_iter() + .map(|id| netpod::Node { + host: "localhost".into(), + listen: "0.0.0.0".into(), + port: 8360 + id as u16, + port_raw: 8360 + id as u16 + 100, + data_base_path: format!("../tmpdata/node{:02}", id).into(), + cache_base_path: format!("../tmpdata/node{:02}", id).into(), + ksprefix: "ks".into(), + split: id, + backend: "testbackend".into(), + archiver_appliance: None, + }) + .collect(); + netpod::Cluster { + nodes: nodes, + database: netpod::Database { + name: "daqbuffer".into(), + host: "localhost".into(), + user: "daqbuffer".into(), + pass: "daqbuffer".into(), + }, + } +}