From 0b40702b6c1815522742e61bcd21654f38980cfe Mon Sep 17 00:00:00 2001
From: Dominik Werder
Date: Tue, 27 Apr 2021 11:41:33 +0200
Subject: [PATCH] WIP

---
 dbconn/Cargo.toml              | 25 +++++++++++++++
 dbconn/src/lib.rs              | 26 ++++++++++++++++
 disk/Cargo.toml                |  1 +
 disk/src/cache.rs              | 30 +++----------------
 disk/src/channelconfig.rs      | 28 ++++++++++++++++-
 disk/src/gen.rs                |  4 +--
 disk/src/raw/conn.rs           | 56 +++++++++++++++++++++++++++-------
 err/Cargo.toml                 |  1 +
 err/src/lib.rs                 |  6 ++++
 netpod/src/lib.rs              |  9 ++++++
 retrieval/src/bin/retrieval.rs | 12 ++++++--
 retrieval/src/test.rs          | 18 ++++++++---
 12 files changed, 170 insertions(+), 46 deletions(-)
 create mode 100644 dbconn/Cargo.toml
 create mode 100644 dbconn/src/lib.rs

diff --git a/dbconn/Cargo.toml b/dbconn/Cargo.toml
new file mode 100644
index 0000000..f03bf42
--- /dev/null
+++ b/dbconn/Cargo.toml
@@ -0,0 +1,25 @@
+[package]
+name = "dbconn"
+version = "0.0.1-a.0"
+authors = ["Dominik Werder "]
+edition = "2018"
+
+[dependencies]
+serde = { version = "1.0", features = ["derive"] }
+serde_json = "1.0"
+tokio = { version = "1.4.0", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] }
+tracing = "0.1.25"
+crc32fast = "1.2.1"
+arrayref = "0.3.6"
+byteorder = "1.4.3"
+futures-core = "0.3.14"
+futures-util = "0.3.14"
+bytes = "1.0.1"
+bincode = "1.3.3"
+#async-channel = "1"
+#dashmap = "3"
+tokio-postgres = { version = "0.7", features = ["runtime", "with-chrono-0_4", "with-serde_json-1"] }
+chrono = "0.4"
+err = { path = "../err" }
+netpod = { path = "../netpod" }
+taskrun = { path = "../taskrun" }
diff --git a/dbconn/src/lib.rs b/dbconn/src/lib.rs
new file mode 100644
index 0000000..1df4537
--- /dev/null
+++ b/dbconn/src/lib.rs
@@ -0,0 +1,26 @@
+use err::Error;
+use netpod::log::*;
+use netpod::{Channel, NodeConfig};
+use std::sync::Arc;
+use tokio_postgres::NoTls;
+
+pub async fn channel_exists(channel: &Channel, node_config: Arc<NodeConfig>) -> Result<bool, Error> {
+    let d = &node_config.cluster.database;
+    let uri = format!("postgresql://{}:{}@{}:{}/{}", d.user, d.pass, d.host, 5432, d.name);
+    let (cl, conn) = tokio_postgres::connect(&uri, NoTls).await?;
+    let cjh = tokio::spawn(async move {
+        if let Err(e) = conn.await {
+            error!("connection error: {}", e);
+        }
+        Ok::<_, Error>(())
+    });
+    let rows = cl
+        .query("select rowid from channels where name = $1::text", &[&channel.name])
+        .await?;
+    info!("channel_exists {} rows", rows.len());
+    for row in rows {
+        info!(" db on channel search: {:?}", row);
+    }
+    drop(cjh);
+    Ok(true)
+}
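
Note (sketch, not part of the patch): the new dbconn::channel_exists helper opens a
postgres connection, spawns the connection driver on its own task, and queries the
channels table by name. A minimal caller could look like this; node_config is assumed
to be built from the NodeConfig/Cluster/Database layout this patch adds:

    use err::Error;
    use netpod::{Channel, NodeConfig};
    use std::sync::Arc;

    async fn check(node_config: Arc<NodeConfig>) -> Result<bool, Error> {
        let channel = Channel {
            backend: "test1".into(),
            name: "wave1".into(),
        };
        // Spawns the connection driver, logs matching rows, and for now
        // always returns Ok(true) -- the result handling is still WIP.
        dbconn::channel_exists(&channel, node_config).await
    }
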
diff --git a/disk/Cargo.toml b/disk/Cargo.toml
index bfe2405..14967aa 100644
--- a/disk/Cargo.toml
+++ b/disk/Cargo.toml
@@ -33,3 +33,4 @@ err = { path = "../err" }
 taskrun = { path = "../taskrun" }
 netpod = { path = "../netpod" }
 bitshuffle = { path = "../bitshuffle" }
+dbconn = { path = "../dbconn" }
diff --git a/disk/src/cache.rs b/disk/src/cache.rs
index 397195b..a3256ea 100644
--- a/disk/src/cache.rs
+++ b/disk/src/cache.rs
@@ -1,6 +1,7 @@
 use crate::agg::MinMaxAvgScalarEventBatch;
 use crate::binnedstream::BinnedStream;
 use crate::cache::pbv::PreBinnedValueByteStream;
+use crate::channelconfig::{extract_matching_config_entry, read_local_config};
 use crate::frame::makeframe::make_frame;
 use crate::merge::MergedMinMaxAvgScalarStream;
 use crate::raw::EventsQuery;
@@ -68,33 +69,10 @@ pub async fn binned_bytes_for_http(
     node_config: Arc<NodeConfig>,
     query: &Query,
 ) -> Result {
-    let channel_config = super::channelconfig::read_local_config(&query.channel, node_config.clone()).await?;
-    let entry;
-    {
-        let mut ixs = vec![];
-        for i1 in 0..channel_config.entries.len() {
-            let e1 = &channel_config.entries[i1];
-            if i1 + 1 < channel_config.entries.len() {
-                let e2 = &channel_config.entries[i1 + 1];
-                if e1.ts < query.range.end && e2.ts >= query.range.beg {
-                    ixs.push(i1);
-                }
-            } else {
-                if e1.ts < query.range.end {
-                    ixs.push(i1);
-                }
-            }
-        }
-        if ixs.len() == 0 {
-            return Err(Error::with_msg(format!("no config entries found")));
-        } else if ixs.len() > 1 {
-            return Err(Error::with_msg(format!("too many config entries found: {}", ixs.len())));
-        }
-        entry = &channel_config.entries[ixs[0]];
-    }
-
+    let range = &query.range;
+    let channel_config = read_local_config(&query.channel, node_config.clone()).await?;
+    let entry = extract_matching_config_entry(range, &channel_config)?;
     info!("found config entry {:?}", entry);
-
     let grid = PreBinnedPatchRange::covering_range(query.range.clone(), query.count);
     match grid {
         Some(spec) => {
diff --git a/disk/src/channelconfig.rs b/disk/src/channelconfig.rs
index bf3bfc0..b144e37 100644
--- a/disk/src/channelconfig.rs
+++ b/disk/src/channelconfig.rs
@@ -1,5 +1,5 @@
 use err::Error;
-use netpod::{Channel, NodeConfig};
+use netpod::{Channel, NanoRange, NodeConfig};
 use nom::number::complete::{be_i16, be_i32, be_i64, be_i8, be_u8};
 use nom::Needed;
 #[allow(unused_imports)]
@@ -278,6 +278,32 @@ pub async fn read_local_config(channel: &Channel, node_config: Arc<NodeConfig>)
     Ok(config.1)
 }
 
+pub fn extract_matching_config_entry<'a>(
+    range: &NanoRange,
+    channel_config: &'a Config,
+) -> Result<&'a ConfigEntry, Error> {
+    let mut ixs = vec![];
+    for i1 in 0..channel_config.entries.len() {
+        let e1 = &channel_config.entries[i1];
+        if i1 + 1 < channel_config.entries.len() {
+            let e2 = &channel_config.entries[i1 + 1];
+            if e1.ts < range.end && e2.ts >= range.beg {
+                ixs.push(i1);
+            }
+        } else {
+            if e1.ts < range.end {
+                ixs.push(i1);
+            }
+        }
+    }
+    if ixs.len() == 0 {
+        return Err(Error::with_msg(format!("no config entries found")));
+    } else if ixs.len() > 1 {
+        return Err(Error::with_msg(format!("too many config entries found: {}", ixs.len())));
+    }
+    Ok(&channel_config.entries[ixs[0]])
+}
+
 #[cfg(test)]
 mod test {
     use super::parse_config;
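
Note (sketch, not part of the patch): extract_matching_config_entry picks the single
config entry that is active inside the query range: entry i matches when it starts
before the range ends and the following entry (if any) starts at or after the range
begin. The same predicate expressed over plain timestamps:

    /// Index of the single config entry active within [beg, end), given the
    /// entries' start timestamps in ascending order; None if zero or several match.
    fn matching_entry(entry_ts: &[u64], beg: u64, end: u64) -> Option<usize> {
        let mut ixs = vec![];
        for i in 0..entry_ts.len() {
            let starts_before_end = entry_ts[i] < end;
            // The last entry has no successor and stays active indefinitely.
            let next_reaches_beg = entry_ts.get(i + 1).map_or(true, |&t2| t2 >= beg);
            if starts_before_end && next_reaches_beg {
                ixs.push(i);
            }
        }
        if ixs.len() == 1 {
            Some(ixs[0])
        } else {
            None
        }
    }

    // Entries starting at 0, 100, 200; query range [120, 180) hits entry 1:
    // assert_eq!(matching_entry(&[0, 100, 200], 120, 180), Some(1));
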
diff --git a/disk/src/gen.rs b/disk/src/gen.rs
index e6d31ba..3af9bd3 100644
--- a/disk/src/gen.rs
+++ b/disk/src/gen.rs
@@ -32,11 +32,11 @@ pub async fn gen_test_data() -> Result<(), Error> {
                 big_endian: true,
                 compression: true,
             },
-            time_spacing: MS * 2000,
+            time_spacing: MS * 1000,
         };
         ensemble.channels.push(chn);
     }
-    for i1 in 0..13 {
+    for i1 in 0..3 {
         let node = Node {
             id: i1,
             host: "localhost".into(),
diff --git a/disk/src/raw/conn.rs b/disk/src/raw/conn.rs
index aeab352..8c00536 100644
--- a/disk/src/raw/conn.rs
+++ b/disk/src/raw/conn.rs
@@ -1,4 +1,5 @@
 use crate::agg::{IntoBinnedXBins1, IntoDim1F32Stream, MinMaxAvgScalarEventBatch};
+use crate::channelconfig::{extract_matching_config_entry, read_local_config};
 use crate::eventblobs::EventBlobsComplete;
 use crate::frame::inmem::InMemoryFrameAsyncReadStream;
 use crate::frame::makeframe::{make_frame, make_term_frame};
@@ -123,6 +124,10 @@ async fn raw_conn_handler_inner_try(
             return Err((Error::with_msg("can not parse request json"), netout))?;
         }
     };
+    match dbconn::channel_exists(&evq.channel, node_config.clone()).await {
+        Ok(_) => (),
+        Err(e) => return Err((e, netout))?,
+    }
     debug!(
         "\n\nREQUEST FOR RANGE {} {}\n\n",
         evq.range.beg / SEC,
@@ -132,19 +137,48 @@
         "TODO decide on response content based on the parsed json query\n{:?}",
         evq
     );
+    let range = &evq.range;
+    let channel_config = match read_local_config(&evq.channel, node_config.clone()).await {
+        Ok(k) => k,
+        Err(e) => return Err((e, netout))?,
+    };
+    let entry = extract_matching_config_entry(range, &channel_config)?;
+    info!("found config entry {:?}", entry);
+
+    let shape = match &entry.shape {
+        Some(lens) => {
+            if lens.len() == 1 {
+                Shape::Wave(lens[0])
+            } else {
+                return Err(Error::with_msg(format!("Channel config unsupported shape {:?}", entry)))?;
+            }
+        }
+        None => Shape::Scalar,
+    };
+
+    /*
+    TODO
+    This endpoint should deliver events over some time range, across files.
+    Therefore, based on the query and the found channel config, list the available files in the
+    candidate directories, and iterate over events from those files.
+    !!! use index if available
+    • Generate index file for my test data.
+    • Use index file if available.
+    • If not, must use binary search if possible in that type.
+    Create a new type in place of AggQuerySingleChannel?
+    */
+    err::todoval();
+
     let query = netpod::AggQuerySingleChannel {
         channel_config: netpod::ChannelConfig {
-            channel: netpod::Channel {
-                backend: "test1".into(),
-                name: "wave1".into(),
-            },
-            keyspace: 3,
-            time_bin_size: DAY,
-            shape: Shape::Wave(17),
-            scalar_type: ScalarType::F64,
-            big_endian: true,
-            array: true,
-            compression: true,
+            channel: evq.channel.clone(),
+            keyspace: entry.ks as u8,
+            time_bin_size: entry.bs as u64,
+            shape: shape,
+            scalar_type: ScalarType::from_dtype_index(entry.dtype.to_i16() as u8),
+            big_endian: entry.is_big_endian,
+            array: entry.is_array,
+            compression: entry.is_compressed,
         },
         // TODO use a NanoRange and search for matching files
         timebin: 0,
diff --git a/err/Cargo.toml b/err/Cargo.toml
index af5bcbb..7590356 100644
--- a/err/Cargo.toml
+++ b/err/Cargo.toml
@@ -15,3 +15,4 @@ bincode = "1.3.3"
 async-channel = "1.6"
 chrono = { version = "0.4.19", features = ["serde"] }
 nom = "6.1.2"
+tokio-postgres = { version = "0.7", features = ["runtime", "with-chrono-0_4", "with-serde_json-1"] }
diff --git a/err/src/lib.rs b/err/src/lib.rs
index feca841..727ea0a 100644
--- a/err/src/lib.rs
+++ b/err/src/lib.rs
@@ -186,6 +186,12 @@ impl From> for Error {
     }
 }
 
+impl From<tokio_postgres::Error> for Error {
+    fn from(k: tokio_postgres::Error) -> Self {
+        Self::with_msg(k.to_string())
+    }
+}
+
 pub fn todoval<T>() -> T {
     todo!("TODO todoval")
 }
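
Note (sketch, not part of the patch): the From<tokio_postgres::Error> impl is what lets
dbconn apply `?` directly to tokio-postgres calls inside functions that return this
crate's Error. The same pattern in isolation, with stand-in types:

    struct MyError(String);

    impl From<std::num::ParseIntError> for MyError {
        fn from(e: std::num::ParseIntError) -> Self {
            MyError(e.to_string())
        }
    }

    fn parse_port(s: &str) -> Result<u16, MyError> {
        // `?` converts ParseIntError into MyError via the From impl above.
        Ok(s.parse::<u16>()?)
    }
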
diff --git a/netpod/src/lib.rs b/netpod/src/lib.rs
index 0e3ad1e..d6898f6 100644
--- a/netpod/src/lib.rs
+++ b/netpod/src/lib.rs
@@ -112,9 +112,18 @@ impl Node {
     }
 }
 
+#[derive(Debug)]
+pub struct Database {
+    pub name: String,
+    pub host: String,
+    pub user: String,
+    pub pass: String,
+}
+
 #[derive(Debug)]
 pub struct Cluster {
     pub nodes: Vec<Arc<Node>>,
+    pub database: Database,
 }
 
 #[derive(Debug)]
diff --git a/retrieval/src/bin/retrieval.rs b/retrieval/src/bin/retrieval.rs
index b93f0d2..6327ec6 100644
--- a/retrieval/src/bin/retrieval.rs
+++ b/retrieval/src/bin/retrieval.rs
@@ -1,5 +1,5 @@
 use err::Error;
-use netpod::{timeunits::*, Channel, ChannelConfig, Cluster, Node, NodeConfig, ScalarType, Shape};
+use netpod::{timeunits::*, Channel, ChannelConfig, Cluster, Database, Node, NodeConfig, ScalarType, Shape};
 use std::sync::Arc;
 #[allow(unused_imports)]
 use tracing::{debug, error, info, trace, warn};
@@ -62,7 +62,15 @@ fn simple_fetch() {
         tb_file_count: 1,
         buffer_size: 1024 * 8,
     };
-    let cluster = Cluster { nodes: vec![node] };
+    let cluster = Cluster {
+        nodes: vec![node],
+        database: Database {
+            name: "daqbuffer".into(),
+            host: "localhost".into(),
+            user: "daqbuffer".into(),
+            pass: "daqbuffer".into(),
+        },
+    };
     let cluster = Arc::new(cluster);
     let node_config = NodeConfig {
         node: cluster.nodes[0].clone(),
diff --git a/retrieval/src/test.rs b/retrieval/src/test.rs
index d341255..4dc6ffb 100644
--- a/retrieval/src/test.rs
+++ b/retrieval/src/test.rs
@@ -5,13 +5,13 @@ use disk::frame::inmem::InMemoryFrameAsyncReadStream;
 use err::Error;
 use futures_util::TryStreamExt;
 use hyper::Body;
-use netpod::{Cluster, Node};
+use netpod::{Cluster, Database, Node};
 use std::sync::Arc;
 #[allow(unused_imports)]
 use tracing::{debug, error, info, trace, warn};
 
 fn test_cluster() -> Cluster {
-    let nodes = (0..13)
+    let nodes = (0..3)
         .into_iter()
         .map(|id| {
             let node = Node {
@@ -27,7 +27,15 @@ fn test_cluster() -> Cluster {
             Arc::new(node)
         })
         .collect();
-    Cluster { nodes: nodes }
+    Cluster {
+        nodes: nodes,
+        database: Database {
+            name: "daqbuffer".into(),
+            host: "localhost".into(),
+            user: "daqbuffer".into(),
+            pass: "daqbuffer".into(),
+        },
+    }
 }
 
 #[test]
@@ -44,13 +52,15 @@ async fn get_cached_0_inner() -> Result<(), Error> {
     let end_date: chrono::DateTime<chrono::Utc> = "1970-01-01T00:00:51.000Z".parse()?;
     let channel_backend = "back";
     let channel_name = "wave1";
+    let bin_count = 4;
     let date_fmt = "%Y-%m-%dT%H:%M:%S.%3fZ";
     let uri = format!(
-        "http://{}:{}/api/1/binned?channel_backend={}&channel_name={}&bin_count=4&beg_date={}&end_date={}",
+        "http://{}:{}/api/1/binned?channel_backend={}&channel_name={}&bin_count={}&beg_date={}&end_date={}",
         node0.host,
         node0.port,
         channel_backend,
         channel_name,
+        bin_count,
         beg_date.format(date_fmt),
         end_date.format(date_fmt),
    );
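
Note (worked example, not part of the patch): with the test credentials above, the
connection URI that dbconn::channel_exists builds from the new netpod::Database struct
comes out as follows; the port is hardcoded to 5432 in channel_exists for now:

    fn example_uri() -> String {
        let d = netpod::Database {
            name: "daqbuffer".into(),
            host: "localhost".into(),
            user: "daqbuffer".into(),
            pass: "daqbuffer".into(),
        };
        // Same format string as in dbconn::channel_exists.
        let uri = format!("postgresql://{}:{}@{}:{}/{}", d.user, d.pass, d.host, 5432, d.name);
        assert_eq!(uri, "postgresql://daqbuffer:daqbuffer@localhost:5432/daqbuffer");
        uri
    }
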