Generate and read channel config for test case
This commit is contained in:
@@ -51,11 +51,41 @@ impl Query {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn binned_bytes_for_http(node_config: Arc<NodeConfig>, query: &Query) -> Result<BinnedBytesForHttpStream, Error> {
|
||||
pub async fn binned_bytes_for_http(
|
||||
node_config: Arc<NodeConfig>,
|
||||
query: &Query,
|
||||
) -> Result<BinnedBytesForHttpStream, Error> {
|
||||
let agg_kind = AggKind::DimXBins1;
|
||||
|
||||
// TODO
|
||||
// Translate the Query TimeRange + AggKind into an iterator over the pre-binned patches.
|
||||
let channel_config = super::channelconfig::read_local_config(&query.channel, node_config.clone()).await?;
|
||||
let entry;
|
||||
{
|
||||
let mut ixs = vec![];
|
||||
for i1 in 0..channel_config.entries.len() {
|
||||
let e1 = &channel_config.entries[i1];
|
||||
if i1 + 1 < channel_config.entries.len() {
|
||||
let e2 = &channel_config.entries[i1 + 1];
|
||||
if e1.ts < query.range.end && e2.ts >= query.range.beg {
|
||||
ixs.push(i1);
|
||||
} else {
|
||||
}
|
||||
} else {
|
||||
if e1.ts < query.range.end {
|
||||
ixs.push(i1);
|
||||
} else {
|
||||
}
|
||||
}
|
||||
}
|
||||
if ixs.len() == 0 {
|
||||
return Err(Error::with_msg(format!("no config entries found")));
|
||||
} else if ixs.len() > 1 {
|
||||
return Err(Error::with_msg(format!("too many config entries found: {}", ixs.len())));
|
||||
}
|
||||
entry = &channel_config.entries[ixs[0]];
|
||||
}
|
||||
|
||||
info!("found config entry {:?}", entry);
|
||||
|
||||
let grid = PreBinnedPatchRange::covering_range(query.range.clone(), query.count);
|
||||
match grid {
|
||||
Some(spec) => {
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use err::Error;
|
||||
use netpod::{Channel, NodeConfig};
|
||||
use nom::number::complete::{be_i16, be_i32, be_i64, be_i8, be_u8};
|
||||
use nom::Needed;
|
||||
#[allow(unused_imports)]
|
||||
@@ -10,6 +11,7 @@ use nom::{
|
||||
use num_derive::{FromPrimitive, ToPrimitive};
|
||||
use num_traits::ToPrimitive;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::sync::Arc;
|
||||
#[allow(unused_imports)]
|
||||
use tracing::{debug, error, info, trace, warn};
|
||||
|
||||
@@ -60,7 +62,7 @@ impl CompressionMethod {
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct ConfigEntry {
|
||||
pub ts: i64,
|
||||
pub ts: u64,
|
||||
pub pulse: i64,
|
||||
pub ks: i32,
|
||||
pub bs: i64,
|
||||
@@ -198,7 +200,7 @@ pub fn parse_entry(inp: &[u8]) -> NRes<Option<ConfigEntry>> {
|
||||
Ok((
|
||||
inp_e,
|
||||
Some(ConfigEntry {
|
||||
ts,
|
||||
ts: ts as u64,
|
||||
pulse,
|
||||
ks,
|
||||
bs,
|
||||
@@ -262,31 +264,49 @@ pub fn parse_config(inp: &[u8]) -> NRes<Config> {
|
||||
Ok((inp, ret))
|
||||
}
|
||||
|
||||
pub async fn read_local_config(channel: &Channel, node_config: Arc<NodeConfig>) -> Result<Config, Error> {
|
||||
let path = node_config
|
||||
.node
|
||||
.data_base_path
|
||||
.join("config")
|
||||
.join(&channel.name)
|
||||
.join("latest")
|
||||
.join("00000_Config");
|
||||
let buf = tokio::fs::read(&path).await?;
|
||||
info!("try to parse config {} bytes", buf.len());
|
||||
let config = parse_config(&buf)?;
|
||||
Ok(config.1)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
fn read_data() -> Vec<u8> {
|
||||
use std::io::Read;
|
||||
let path = "ks/config/S10CB01-RLOD100-PUP10:SIG-AMPLT/latest/00000_Config";
|
||||
let mut f1 = std::fs::File::open(path).unwrap();
|
||||
let mut buf = vec![];
|
||||
f1.read_to_end(&mut buf).unwrap();
|
||||
buf
|
||||
}
|
||||
mod test {
|
||||
use super::parse_config;
|
||||
|
||||
#[test]
|
||||
fn parse_dummy() {
|
||||
let config = parse_config(&[0, 0, 0, 0, 0, 11, 0x61, 0x62, 0x63, 0, 0, 0, 11, 0, 0, 0, 1]).unwrap();
|
||||
assert_eq!(0, config.1.format_version);
|
||||
assert_eq!("abc", config.1.channel_name);
|
||||
}
|
||||
fn read_data() -> Vec<u8> {
|
||||
use std::io::Read;
|
||||
let path = "ks/config/S10CB01-RLOD100-PUP10:SIG-AMPLT/latest/00000_Config";
|
||||
let mut f1 = std::fs::File::open(path).unwrap();
|
||||
let mut buf = vec![];
|
||||
f1.read_to_end(&mut buf).unwrap();
|
||||
buf
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn open_file() {
|
||||
let config = parse_config(&read_data()).unwrap().1;
|
||||
assert_eq!(0, config.format_version);
|
||||
assert_eq!(9, config.entries.len());
|
||||
for e in &config.entries {
|
||||
assert!(e.ts >= 631152000000000000);
|
||||
assert!(e.ts <= 1591106812800073974);
|
||||
assert!(e.shape.is_some());
|
||||
#[test]
|
||||
fn parse_dummy() {
|
||||
let config = parse_config(&[0, 0, 0, 0, 0, 11, 0x61, 0x62, 0x63, 0, 0, 0, 11, 0, 0, 0, 1]).unwrap();
|
||||
assert_eq!(0, config.1.format_version);
|
||||
assert_eq!("abc", config.1.channel_name);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn open_file() {
|
||||
let config = parse_config(&read_data()).unwrap().1;
|
||||
assert_eq!(0, config.format_version);
|
||||
assert_eq!(9, config.entries.len());
|
||||
for e in &config.entries {
|
||||
assert!(e.ts >= 631152000000000000);
|
||||
assert!(e.ts <= 1591106812800073974);
|
||||
assert!(e.shape.is_some());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -42,7 +42,7 @@ pub async fn gen_test_data() -> Result<(), Error> {
|
||||
big_endian: true,
|
||||
compression: true,
|
||||
},
|
||||
time_spacing: MS * 10,
|
||||
time_spacing: MS * 2000,
|
||||
};
|
||||
ensemble.channels.push(chn);
|
||||
}
|
||||
@@ -150,8 +150,8 @@ async fn gen_config(
|
||||
|
||||
{
|
||||
// this len does not include itself and there seems to be no copy of it afterwards.
|
||||
buf.put_i32(0x20202020);
|
||||
let p3 = buf.len();
|
||||
buf.put_i32(404040);
|
||||
buf.put_u8(config.dtflags());
|
||||
buf.put_u8(config.scalar_type.index());
|
||||
if config.compression {
|
||||
@@ -161,13 +161,25 @@ async fn gen_config(
|
||||
match config.shape {
|
||||
Shape::Scalar => {}
|
||||
Shape::Wave(k) => {
|
||||
buf.put_i8(1);
|
||||
buf.put_i32(k as i32);
|
||||
}
|
||||
}
|
||||
let len = buf.len() - p3;
|
||||
let len = buf.len() - p3 - 4;
|
||||
buf.as_mut()[p3..].as_mut().put_i32(len as i32);
|
||||
}
|
||||
|
||||
// source name
|
||||
buf.put_i32(-1);
|
||||
// unit
|
||||
buf.put_i32(-1);
|
||||
// description
|
||||
buf.put_i32(-1);
|
||||
// optional fields
|
||||
buf.put_i32(-1);
|
||||
// value converter
|
||||
buf.put_i32(-1);
|
||||
|
||||
let p2 = buf.len();
|
||||
let len = p2 - p1 + 4;
|
||||
buf.put_i32(len as i32);
|
||||
|
||||
@@ -8,6 +8,7 @@ edition = "2018"
|
||||
hyper = { version = "0.14", features = ["http1", "http2", "client", "server", "tcp"] }
|
||||
http = "0.2"
|
||||
tokio = { version = "1.5.0", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] }
|
||||
backtrace = "0.3.56"
|
||||
serde_json = "1.0"
|
||||
async-channel = "1.6"
|
||||
chrono = { version = "0.4.19", features = ["serde"] }
|
||||
|
||||
@@ -4,32 +4,41 @@ use std::num::ParseIntError;
|
||||
use std::string::FromUtf8Error;
|
||||
use tokio::task::JoinError;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Error {
|
||||
msg: String,
|
||||
trace: backtrace::Backtrace,
|
||||
}
|
||||
|
||||
impl Error {
|
||||
pub fn with_msg<S: Into<String>>(s: S) -> Self {
|
||||
Self { msg: s.into() }
|
||||
Self {
|
||||
msg: s.into(),
|
||||
trace: backtrace::Backtrace::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<String> for Error {
|
||||
fn from(k: String) -> Self {
|
||||
Self { msg: k }
|
||||
}
|
||||
}
|
||||
|
||||
impl From<&str> for Error {
|
||||
fn from(k: &str) -> Self {
|
||||
Self { msg: k.into() }
|
||||
impl std::fmt::Debug for Error {
|
||||
fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||
write!(fmt, "Error {} backtrace:\n{:?}", self.msg, self.trace)
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Display for Error {
|
||||
fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||
write!(fmt, "Error")
|
||||
write!(fmt, "Error {} backtrace:\n{:?}", self.msg, self.trace)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<String> for Error {
|
||||
fn from(k: String) -> Self {
|
||||
Self::with_msg(k)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<&str> for Error {
|
||||
fn from(k: &str) -> Self {
|
||||
Self::with_msg(k)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -37,49 +46,49 @@ impl std::error::Error for Error {}
|
||||
|
||||
impl From<std::io::Error> for Error {
|
||||
fn from(k: std::io::Error) -> Self {
|
||||
Self { msg: k.to_string() }
|
||||
Self::with_msg(k.to_string())
|
||||
}
|
||||
}
|
||||
|
||||
impl From<http::Error> for Error {
|
||||
fn from(k: http::Error) -> Self {
|
||||
Self { msg: k.to_string() }
|
||||
Self::with_msg(k.to_string())
|
||||
}
|
||||
}
|
||||
|
||||
impl From<hyper::Error> for Error {
|
||||
fn from(k: hyper::Error) -> Self {
|
||||
Self { msg: k.to_string() }
|
||||
Self::with_msg(k.to_string())
|
||||
}
|
||||
}
|
||||
|
||||
impl From<serde_json::Error> for Error {
|
||||
fn from(k: serde_json::Error) -> Self {
|
||||
Self { msg: k.to_string() }
|
||||
Self::with_msg(k.to_string())
|
||||
}
|
||||
}
|
||||
|
||||
impl From<async_channel::RecvError> for Error {
|
||||
fn from(k: async_channel::RecvError) -> Self {
|
||||
Self { msg: k.to_string() }
|
||||
Self::with_msg(k.to_string())
|
||||
}
|
||||
}
|
||||
|
||||
impl From<chrono::format::ParseError> for Error {
|
||||
fn from(k: chrono::format::ParseError) -> Self {
|
||||
Self { msg: k.to_string() }
|
||||
Self::with_msg(k.to_string())
|
||||
}
|
||||
}
|
||||
|
||||
impl From<ParseIntError> for Error {
|
||||
fn from(k: ParseIntError) -> Self {
|
||||
Self { msg: k.to_string() }
|
||||
Self::with_msg(k.to_string())
|
||||
}
|
||||
}
|
||||
|
||||
impl From<FromUtf8Error> for Error {
|
||||
fn from(k: FromUtf8Error) -> Self {
|
||||
Self { msg: k.to_string() }
|
||||
Self::with_msg(k.to_string())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -106,8 +115,5 @@ impl From<JoinError> for Error {
|
||||
}
|
||||
|
||||
pub fn todoval<T>() -> T {
|
||||
if true {
|
||||
todo!("TODO todoval");
|
||||
}
|
||||
todo!()
|
||||
todo!("TODO todoval")
|
||||
}
|
||||
|
||||
@@ -216,16 +216,8 @@ where
|
||||
async fn binned(req: Request<Body>, node_config: Arc<NodeConfig>) -> Result<Response<Body>, Error> {
|
||||
info!("-------------------------------------------------------- BINNED");
|
||||
let (head, _body) = req.into_parts();
|
||||
//let params = netpod::query_params(head.uri.query());
|
||||
|
||||
// TODO
|
||||
// Channel, time range, bin size.
|
||||
// Try to locate that file in cache, otherwise create it on the fly:
|
||||
// Look up and parse channel config.
|
||||
// Extract the relevant channel config entry.
|
||||
|
||||
let query = disk::cache::Query::from_request(&head)?;
|
||||
let ret = match disk::cache::binned_bytes_for_http(node_config, &query) {
|
||||
let ret = match disk::cache::binned_bytes_for_http(node_config, &query).await {
|
||||
Ok(s) => response(StatusCode::OK).body(BodyStream::wrapped(s))?,
|
||||
Err(e) => {
|
||||
error!("{:?}", e);
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use crate::spawn_test_hosts;
|
||||
use chrono::Utc;
|
||||
use err::Error;
|
||||
use hyper::Body;
|
||||
use netpod::{Cluster, Node};
|
||||
@@ -35,13 +36,24 @@ async fn get_cached_0_inner() -> Result<(), Error> {
|
||||
let cluster = Arc::new(test_cluster());
|
||||
let node0 = &cluster.nodes[0];
|
||||
let hosts = spawn_test_hosts(cluster.clone());
|
||||
let beg_date: chrono::DateTime<Utc> = "1970-01-01T00:00:10.000Z".parse()?;
|
||||
let end_date: chrono::DateTime<Utc> = "1970-01-01T00:00:51.000Z".parse()?;
|
||||
let channel = "wave1";
|
||||
let date_fmt = "%Y-%m-%dT%H:%M:%S%.3fZ";
|
||||
let uri = format!(
|
||||
"http://{}:{}/api/1/binned?channel_backend=testbackend&channel_name={}&bin_count=4&beg_date={}&end_date={}",
|
||||
node0.host,
|
||||
node0.port,
|
||||
channel,
|
||||
beg_date.format(date_fmt),
|
||||
end_date.format(date_fmt),
|
||||
);
|
||||
info!("URI {:?}", uri);
|
||||
let req = hyper::Request::builder()
|
||||
.method(http::Method::GET)
|
||||
.uri(format!(
|
||||
"http://{}:{}/api/1/binned?channel_backend=testbackend&channel_keyspace=3&channel_name=wave1&bin_count=4&beg_date=1970-01-01T00:00:10.000Z&end_date=1970-01-01T00:00:51.000Z",
|
||||
node0.host, node0.port
|
||||
))
|
||||
.body(Body::empty())?;
|
||||
.method(http::Method::GET)
|
||||
.uri(uri)
|
||||
.body(Body::empty())?;
|
||||
info!("Request for {:?}", req);
|
||||
let client = hyper::Client::new();
|
||||
let res = client.request(req).await?;
|
||||
info!("client response {:?}", res);
|
||||
|
||||
@@ -7,7 +7,7 @@ use tracing::{debug, error, info, trace, warn};
|
||||
|
||||
pub fn run<T, F: std::future::Future<Output = Result<T, Error>>>(f: F) -> Result<T, Error> {
|
||||
tracing_init();
|
||||
tokio::runtime::Builder::new_multi_thread()
|
||||
let res = tokio::runtime::Builder::new_multi_thread()
|
||||
.worker_threads(12)
|
||||
.max_blocking_threads(256)
|
||||
.enable_all()
|
||||
@@ -25,7 +25,14 @@ pub fn run<T, F: std::future::Future<Output = Result<T, Error>>>(f: F) -> Result
|
||||
})
|
||||
.build()
|
||||
.unwrap()
|
||||
.block_on(async { f.await })
|
||||
.block_on(async { f.await });
|
||||
match res {
|
||||
Ok(k) => Ok(k),
|
||||
Err(e) => {
|
||||
error!("{:?}", e);
|
||||
Err(e)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn tracing_init() {
|
||||
|
||||
Reference in New Issue
Block a user