Refactor and add test for api1 binary query

This commit is contained in:
Dominik Werder
2022-11-24 19:55:35 +01:00
parent 8eedf53f39
commit 94e49bd014
27 changed files with 753 additions and 254 deletions

View File

@@ -17,6 +17,17 @@ version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe"
[[package]]
name = "ahash"
version = "0.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fcb51a0695d8f838b1ee009b3fbf66bda078cd64590202a864a8f3e8c4315c47"
dependencies = [
"getrandom",
"once_cell",
"version_check",
]
[[package]]
name = "aho-corasick"
version = "0.7.20"
@@ -62,12 +73,6 @@ version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a4c527152e37cf757a3f78aae5a06fbeefdb07ccc535c980a3208ee3060dd544"
[[package]]
name = "arrayvec"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "23b62fc65de8e4e7f52534fb52b0f3ed04746ae267519eef2a83941e8085068b"
[[package]]
name = "async-channel"
version = "1.7.1"
@@ -205,6 +210,15 @@ dependencies = [
"num-traits",
]
[[package]]
name = "bincode"
version = "1.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b1f45e9417d87227c7a56d22e471c6206462cba514c7590c09aff4cf6d1ddcad"
dependencies = [
"serde",
]
[[package]]
name = "bitflags"
version = "1.3.2"
@@ -219,18 +233,6 @@ dependencies = [
"libc",
]
[[package]]
name = "bitvec"
version = "0.19.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "55f93d0ef3363c364d5976646a38f04cf67cfe1d4c8d160cdea02cab2c116b33"
dependencies = [
"funty",
"radium",
"tap",
"wyz",
]
[[package]]
name = "block-buffer"
version = "0.9.0"
@@ -249,6 +251,25 @@ dependencies = [
"generic-array",
]
[[package]]
name = "bson"
version = "2.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "99d76085681585d39016f4d3841eb019201fc54d2dd0d92ad1e4fab3bfb32754"
dependencies = [
"ahash",
"base64",
"hex",
"indexmap",
"lazy_static",
"rand",
"serde",
"serde_bytes",
"serde_json",
"time 0.3.17",
"uuid",
]
[[package]]
name = "bumpalo"
version = "3.11.1"
@@ -638,14 +659,15 @@ dependencies = [
"chrono",
"disk",
"err",
"futures-core",
"futures-util",
"http",
"httpclient",
"httpret",
"hyper",
"items",
"lazy_static",
"netpod",
"nom",
"rmp-serde",
"serde",
"serde_derive",
@@ -654,7 +676,7 @@ dependencies = [
"taskrun",
"tokio",
"tracing",
"tracing-subscriber 0.2.25",
"tracing-subscriber 0.3.16",
"url",
]
@@ -743,7 +765,6 @@ dependencies = [
"items",
"libc",
"netpod",
"nom 6.1.2",
"num-derive",
"num-traits",
"parse",
@@ -911,12 +932,6 @@ dependencies = [
"url",
]
[[package]]
name = "funty"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fed34cd105917e91daa4da6b3728c47b068749d6a62c59811f06ed2ac71d9da7"
[[package]]
name = "futures"
version = "0.1.31"
@@ -1083,7 +1098,7 @@ dependencies = [
"base64",
"byteorder",
"flate2",
"nom 7.1.1",
"nom",
"num-traits",
]
@@ -1168,18 +1183,18 @@ checksum = "d897f394bad6a705d5f4104762e116a75639e470d80901eed05a860a95cb1904"
[[package]]
name = "httpclient"
version = "0.0.1-a.0"
version = "0.0.2"
dependencies = [
"async-channel",
"bytes",
"err",
"futures-core",
"futures-util",
"http",
"hyper",
"hyper-tls",
"netpod",
"parse",
"rmp-serde",
"serde",
"serde_json",
"tokio",
@@ -1342,6 +1357,8 @@ dependencies = [
name = "items"
version = "0.0.1-a.dev.4"
dependencies = [
"bincode",
"bson",
"bytes",
"chrono",
"ciborium",
@@ -1417,19 +1434,6 @@ version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
[[package]]
name = "lexical-core"
version = "0.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6607c62aa161d23d17a9072cc5da0be67cdfc89d3afb1e8d9c842bebc2525ffe"
dependencies = [
"arrayvec",
"bitflags",
"cfg-if",
"ryu",
"static_assertions",
]
[[package]]
name = "libc"
version = "0.2.137"
@@ -1585,15 +1589,13 @@ dependencies = [
[[package]]
name = "netpod"
version = "0.0.1-a.0"
version = "0.0.2"
dependencies = [
"async-channel",
"bytes",
"chrono",
"err",
"futures-core",
"futures-util",
"lazy_static",
"num-traits",
"serde",
"serde_json",
@@ -1632,19 +1634,6 @@ dependencies = [
"tracing",
]
[[package]]
name = "nom"
version = "6.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e7413f999671bd4745a7b624bd370a569fb6bc574b23c83a3c5ed2e453f3d5e2"
dependencies = [
"bitvec",
"funty",
"lexical-core",
"memchr",
"version_check",
]
[[package]]
name = "nom"
version = "7.1.1"
@@ -1873,10 +1862,11 @@ dependencies = [
"err",
"hex",
"netpod",
"nom 6.1.2",
"nom",
"num-derive",
"num-traits",
"serde",
"serde_json",
"tokio",
]
@@ -2072,12 +2062,6 @@ dependencies = [
"proc-macro2",
]
[[package]]
name = "radium"
version = "0.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "941ba9d78d8e2f7ce474c015eea4d9c6d25b6a3327f9832ee29a4de27f91bbb8"
[[package]]
name = "rand"
version = "0.8.5"
@@ -2330,6 +2314,15 @@ dependencies = [
"serde_derive",
]
[[package]]
name = "serde_bytes"
version = "0.11.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cfc50e8183eeeb6178dcb167ae34a8051d63535023ae38b5d8d12beae193d37b"
dependencies = [
"serde",
]
[[package]]
name = "serde_cbor"
version = "0.11.2"
@@ -2357,6 +2350,7 @@ version = "1.0.89"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "020ff22c755c2ed3f8cf162dbb41a7268d934702f3ed3631656ea597e08fc3db"
dependencies = [
"indexmap",
"itoa",
"ryu",
"serde",
@@ -2508,12 +2502,6 @@ version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "20518fe4a4c9acf048008599e464deb21beeae3d3578418951a189c235a7a9a8"
[[package]]
name = "tap"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369"
[[package]]
name = "taskrun"
version = "0.0.1-a.0"
@@ -3015,6 +3003,10 @@ name = "uuid"
version = "1.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "422ee0de9031b5b948b97a8fc04e3aa35230001a722ddd27943e0be31564ce4c"
dependencies = [
"getrandom",
"serde",
]
[[package]]
name = "valuable"
@@ -3240,9 +3232,3 @@ name = "windows_x86_64_msvc"
version = "0.42.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f40009d85759725a34da6d89a94e63d7bdc50a862acf0dbc7c8e488f1edcb6f5"
[[package]]
name = "wyz"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "85e60b0d1b5f99db2556934e21937020776a5d31520bf169e851ac44e6420214"

View File

@@ -125,7 +125,7 @@ fn simple_fetch() {
array: true,
scalar_type: ScalarType::F64,
shape: Shape::Wave(42),
byte_order: ByteOrder::big_endian(),
byte_order: ByteOrder::Big,
compression: true,
},
timebin: 18720,

View File

@@ -4,6 +4,9 @@ version = "0.0.1-a.dev.12"
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
edition = "2021"
[lib]
path = "src/daqbufp2.rs"
[dependencies]
tokio = { version = "1.22.0", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] }
hyper = "0.14"
@@ -27,3 +30,6 @@ httpclient = { path = "../httpclient" }
disk = { path = "../disk" }
items = { path = "../items" }
streams = { path = "../streams" }
[dev-dependencies]
nom = "7.1.1"

View File

@@ -21,6 +21,9 @@ pub fn spawn_test_hosts(cluster: Cluster) -> Vec<JoinHandle<Result<(), Error>>>
let h = tokio::spawn(httpret::host(node_config).map_err(Error::from));
ret.push(h);
}
// TODO spawn also two proxy nodes
ret
}

View File

@@ -1,13 +1,15 @@
use crate::err::ErrConv;
#[cfg(test)]
mod api1_parse;
use crate::nodes::require_test_hosts_running;
use chrono::{DateTime, Utc};
use crate::test::api1::api1_parse::Api1Frame;
use err::Error;
use futures_util::Future;
use http::{header, Request, StatusCode};
use httpclient::{http_get, http_post};
use hyper::Body;
use httpclient::http_post;
use httpret::api1::Api1ScalarType;
use netpod::log::*;
use netpod::query::api1::{Api1Query, Api1Range, ChannelTuple};
use std::fmt;
use url::Url;
fn testrun<T, F>(fut: F) -> Result<T, Error>
@@ -17,6 +19,29 @@ where
taskrun::run(fut)
}
fn is_monitonic_strict<I>(it: I) -> bool
where
I: Iterator,
<I as Iterator>::Item: PartialOrd + fmt::Debug,
{
let mut last = None;
for x in it {
if let Some(last) = last.as_ref() {
if x <= *last {
return false;
}
}
last = Some(x);
}
true
}
#[test]
fn test_is_monitonic_strict() {
assert_eq!(is_monitonic_strict([1, 2, 3].iter()), true);
assert_eq!(is_monitonic_strict([1, 2, 2].iter()), false);
}
#[test]
fn events_f64_plain() -> Result<(), Error> {
let fut = async {
@@ -32,8 +57,55 @@ fn events_f64_plain() -> Result<(), Error> {
let body = serde_json::to_string(&qu)?;
let buf = http_post(url, accept, body.into()).await?;
eprintln!("body received: {}", buf.len());
//let js = String::from_utf8_lossy(&buf);
//eprintln!("string received: {js}");
match api1_parse::api1_frames(&buf) {
Ok((_, frames)) => {
debug!("FRAMES LEN: {}", frames.len());
assert_eq!(frames.len(), 121);
if let Api1Frame::Header(header) = &frames[0] {
if let Api1ScalarType::I32 = header.header().ty() {
} else {
panic!("bad scalar type")
}
} else {
panic!("bad header")
}
let tss: Vec<_> = frames
.iter()
.filter_map(|f| match f {
api1_parse::Api1Frame::Data(d) => Some(d.ts()),
_ => None,
})
.collect();
let pulses: Vec<_> = frames
.iter()
.filter_map(|f| match f {
api1_parse::Api1Frame::Data(d) => Some(d.pulse()),
_ => None,
})
.collect();
let values: Vec<_> = frames
.iter()
.filter_map(|f| match f {
api1_parse::Api1Frame::Data(d) => {
let val = i32::from_be_bytes(d.data().try_into().unwrap());
Some(val)
}
_ => None,
})
.collect();
assert_eq!(is_monitonic_strict(tss.iter()), true);
assert_eq!(is_monitonic_strict(pulses.iter()), true);
assert_eq!(is_monitonic_strict(values.iter()), true);
for &val in &values {
assert!(val >= 0);
assert!(val < 120);
}
}
Err(e) => {
error!("can not parse result: {e}");
panic!()
}
};
Ok(())
};
testrun(fut)?;

View File

@@ -0,0 +1,184 @@
use httpret::api1::Api1ChannelHeader;
use netpod::log::*;
use nom::multi::many0;
use nom::number::complete::{be_u32, be_u64, be_u8};
use nom::IResult;
use std::num::NonZeroUsize;
// u32be length_1.
// there is exactly length_1 more bytes in this message.
// u8 mtype: 0: channel-header, 1: data
// for mtype == 0:
// The rest is a JSON with the channel header.
// for mtype == 1:
// u64be timestamp
// u64be pulse
// After that comes exactly (length_1 - 17) bytes of data.
#[derive(Debug)]
pub struct Header {
header: Api1ChannelHeader,
}
impl Header {
pub fn header(&self) -> &Api1ChannelHeader {
&self.header
}
}
#[derive(Debug)]
pub struct Data {
ts: u64,
pulse: u64,
data: Vec<u8>,
}
impl Data {
pub fn ts(&self) -> u64 {
self.ts
}
pub fn pulse(&self) -> u64 {
self.pulse
}
pub fn data(&self) -> &[u8] {
&self.data
}
}
#[derive(Debug)]
pub enum Api1Frame {
Header(Header),
Data(Data),
}
fn header(inp: &[u8]) -> IResult<&[u8], Header> {
match serde_json::from_slice(inp) {
Ok(k) => {
let k: Api1ChannelHeader = k;
IResult::Ok((&inp[inp.len()..], Header { header: k }))
}
Err(e) => {
error!("json header parse error: {e}");
let e = nom::Err::Failure(nom::error::make_error(inp, nom::error::ErrorKind::Fail));
IResult::Err(e)
}
}
}
fn data(inp: &[u8]) -> IResult<&[u8], Data> {
if inp.len() < 16 {
use nom::{Err, Needed};
return IResult::Err(Err::Incomplete(Needed::Size(NonZeroUsize::new(16).unwrap())));
}
let (inp, ts) = be_u64(inp)?;
let (inp, pulse) = be_u64(inp)?;
let data = inp.into();
let inp = &inp[inp.len()..];
let res = Data { ts, pulse, data };
IResult::Ok((inp, res))
}
fn api1_frame_complete(inp: &[u8]) -> IResult<&[u8], Api1Frame> {
let (inp, mtype) = be_u8(inp)?;
if mtype == 0 {
let (inp, val) = header(inp)?;
if inp.len() != 0 {
// We did not consume the exact number of bytes
let kind = nom::error::ErrorKind::Verify;
let e = nom::error::Error::new(inp, kind);
Err(nom::Err::Failure(e))
} else {
let res = Api1Frame::Header(val);
IResult::Ok((inp, res))
}
} else if mtype == 1 {
let (inp, val) = data(inp)?;
if inp.len() != 0 {
// We did not consume the exact number of bytes
let kind = nom::error::ErrorKind::Verify;
let e = nom::error::Error::new(inp, kind);
Err(nom::Err::Failure(e))
} else {
let res = Api1Frame::Data(val);
IResult::Ok((inp, res))
}
} else {
let e = nom::Err::Incomplete(nom::Needed::Size(NonZeroUsize::new(1).unwrap()));
IResult::Err(e)
}
}
fn api1_frame(inp: &[u8]) -> IResult<&[u8], Api1Frame> {
let (inp, len) = be_u32(inp)?;
if len < 1 {
use nom::error::{ErrorKind, ParseError};
use nom::Err;
return IResult::Err(Err::Failure(ParseError::from_error_kind(inp, ErrorKind::Fail)));
}
if inp.len() < len as usize {
let e = nom::Err::Incomplete(nom::Needed::Size(NonZeroUsize::new(len as _).unwrap()));
IResult::Err(e)
} else {
let (inp2, inp) = inp.split_at(len as _);
let (inp2, res) = api1_frame_complete(inp2)?;
if inp2.len() != 0 {
let kind = nom::error::ErrorKind::Fail;
let e = nom::error::Error::new(inp, kind);
IResult::Err(nom::Err::Failure(e))
} else {
IResult::Ok((inp, res))
}
}
}
type Nres<'a, T> = IResult<&'a [u8], T, nom::error::VerboseError<&'a [u8]>>;
#[allow(unused)]
fn verbose_err(inp: &[u8]) -> Nres<u32> {
use nom::error::{ErrorKind, ParseError, VerboseError};
use nom::Err;
let e = VerboseError::from_error_kind(inp, ErrorKind::Fail);
return IResult::Err(Err::Failure(e));
}
pub fn api1_frames(inp: &[u8]) -> IResult<&[u8], Vec<Api1Frame>> {
let (inp, res) = many0(api1_frame)(inp)?;
IResult::Ok((inp, res))
}
#[test]
fn test_basic_frames() -> Result<(), err::Error> {
use std::io::Write;
let mut buf = Vec::new();
let js = r#"{"name": "ch1", "type": "float64", "byteOrder": "LITTLE_ENDIAN"}"#;
buf.write(&(1 + js.as_bytes().len() as u32).to_be_bytes())?;
buf.write(&[0])?;
buf.write(js.as_bytes())?;
buf.write(&25u32.to_be_bytes())?;
buf.write(&[1])?;
buf.write(&20u64.to_be_bytes())?;
buf.write(&21u64.to_be_bytes())?;
buf.write(&5.123f64.to_be_bytes())?;
buf.write(&25u32.to_be_bytes())?;
buf.write(&[1])?;
buf.write(&22u64.to_be_bytes())?;
buf.write(&23u64.to_be_bytes())?;
buf.write(&7.88f64.to_be_bytes())?;
match api1_frames(&buf) {
Ok((_, frames)) => {
assert_eq!(frames.len(), 3);
}
Err(e) => {
error!("can not parse result: {e}");
panic!()
}
};
Ok(())
}

View File

@@ -30,7 +30,6 @@ tracing-futures = { version = "0.2.5", features = ["futures-01", "futures-03", "
fs2 = "0.4.3"
libc = "0.2.93"
hex = "0.4.3"
nom = "6.1.2"
num-traits = "0.2.14"
num-derive = "0.3"
url = "2.2.2"

View File

@@ -47,7 +47,7 @@ async fn agg_x_dim_0_inner() {
array: false,
shape: Shape::Scalar,
scalar_type: ScalarType::F64,
byte_order: ByteOrder::big_endian(),
byte_order: ByteOrder::Big,
compression: true,
},
timebin: 18723,
@@ -102,7 +102,7 @@ async fn agg_x_dim_1_inner() {
array: true,
shape: Shape::Wave(1024),
scalar_type: ScalarType::F64,
byte_order: ByteOrder::big_endian(),
byte_order: ByteOrder::Big,
compression: true,
},
timebin: 0,

View File

@@ -185,11 +185,11 @@ where
macro_rules! match_end {
($nty:ident, $end:expr, $scalar_type:expr, $shape:expr, $agg_kind:expr, $query:expr, $node_config:expr) => {
match $end {
ByteOrder::LE => {
ByteOrder::Little => {
make_num_pipeline_nty_end::<$nty, LittleEndian>($scalar_type, $shape, $agg_kind, $query, $node_config)
.await
}
ByteOrder::BE => {
ByteOrder::Big => {
make_num_pipeline_nty_end::<$nty, BigEndian>($scalar_type, $shape, $agg_kind, $query, $node_config)
.await
}
@@ -244,7 +244,7 @@ pub async fn pre_binned_bytes_for_http(
let ret = make_num_pipeline(
query.scalar_type().clone(),
// TODO actually, make_num_pipeline should not depend on endianness.
ByteOrder::LE,
ByteOrder::Little,
query.shape().clone(),
query.agg_kind().clone(),
query.clone(),

View File

@@ -152,8 +152,10 @@ where
macro_rules! match_end {
($f:expr, $nty:ident, $end:expr, $scalar_type:expr, $shape:expr, $agg_kind:expr, $node_config:expr) => {
match $end {
ByteOrder::LE => channel_exec_nty_end::<_, $nty, _>($f, LittleEndian {}, $scalar_type, $shape, $agg_kind),
ByteOrder::BE => channel_exec_nty_end::<_, $nty, _>($f, BigEndian {}, $scalar_type, $shape, $agg_kind),
ByteOrder::Little => {
channel_exec_nty_end::<_, $nty, _>($f, LittleEndian {}, $scalar_type, $shape, $agg_kind)
}
ByteOrder::Big => channel_exec_nty_end::<_, $nty, _>($f, BigEndian {}, $scalar_type, $shape, $agg_kind),
}
};
}
@@ -201,7 +203,7 @@ where
f,
scalar_type,
// TODO TODO TODO is the byte order ever important here?
ByteOrder::LE,
ByteOrder::Little,
shape,
agg_kind,
node_config,

View File

@@ -823,7 +823,7 @@ mod test {
keyspace: 2,
time_bin_size: Nanos { ns: DAY },
scalar_type: netpod::ScalarType::I32,
byte_order: netpod::ByteOrder::big_endian(),
byte_order: netpod::ByteOrder::Big,
shape: netpod::Shape::Scalar,
array: false,
compression: false,

View File

@@ -264,7 +264,7 @@ mod test {
keyspace: 2,
time_bin_size: Nanos { ns: DAY },
scalar_type: netpod::ScalarType::I32,
byte_order: netpod::ByteOrder::big_endian(),
byte_order: netpod::ByteOrder::Big,
shape: netpod::Shape::Scalar,
array: false,
compression: false,

View File

@@ -29,7 +29,7 @@ pub async fn gen_test_data() -> Result<(), Error> {
keyspace: 2,
time_bin_size: Nanos { ns: DAY },
scalar_type: ScalarType::I32,
byte_order: ByteOrder::big_endian(),
byte_order: ByteOrder::Big,
shape: Shape::Scalar,
array: false,
compression: false,
@@ -50,7 +50,7 @@ pub async fn gen_test_data() -> Result<(), Error> {
array: true,
scalar_type: ScalarType::F64,
shape: Shape::Wave(21),
byte_order: ByteOrder::big_endian(),
byte_order: ByteOrder::Big,
compression: true,
},
gen_var: netpod::GenVar::Default,
@@ -67,7 +67,7 @@ pub async fn gen_test_data() -> Result<(), Error> {
keyspace: 3,
time_bin_size: Nanos { ns: DAY },
scalar_type: ScalarType::U16,
byte_order: ByteOrder::little_endian(),
byte_order: ByteOrder::Little,
shape: Shape::Wave(77),
array: true,
compression: true,
@@ -86,7 +86,7 @@ pub async fn gen_test_data() -> Result<(), Error> {
keyspace: 2,
time_bin_size: Nanos { ns: DAY },
scalar_type: ScalarType::I32,
byte_order: ByteOrder::little_endian(),
byte_order: ByteOrder::Little,
shape: Shape::Scalar,
array: false,
compression: false,
@@ -105,7 +105,7 @@ pub async fn gen_test_data() -> Result<(), Error> {
keyspace: 2,
time_bin_size: Nanos { ns: DAY },
scalar_type: ScalarType::I32,
byte_order: ByteOrder::little_endian(),
byte_order: ByteOrder::Little,
shape: Shape::Scalar,
array: false,
compression: false,

View File

@@ -360,7 +360,7 @@ mod test {
keyspace: 2,
time_bin_size: Nanos { ns: DAY },
scalar_type: ScalarType::I32,
byte_order: ByteOrder::BE,
byte_order: ByteOrder::Big,
array: false,
compression: false,
shape: Shape::Scalar,

View File

@@ -112,8 +112,8 @@ macro_rules! pipe3 {
macro_rules! pipe2 {
($nty:ident, $end:expr, $shape:expr, $agg_kind:expr, $event_blobs:expr) => {
match $end {
ByteOrder::LE => pipe3!($nty, LittleEndian, $shape, $agg_kind, $event_blobs),
ByteOrder::BE => pipe3!($nty, BigEndian, $shape, $agg_kind, $event_blobs),
ByteOrder::Little => pipe3!($nty, LittleEndian, $shape, $agg_kind, $event_blobs),
ByteOrder::Big => pipe3!($nty, BigEndian, $shape, $agg_kind, $event_blobs),
}
};
}

View File

@@ -83,7 +83,7 @@ pub fn main() -> Result<(), Error> {
compression: false,
shape: Shape::Scalar,
array: false,
byte_order: ByteOrder::LE,
byte_order: ByteOrder::Little,
};
let range = NanoRange {
beg: u64::MIN,

View File

@@ -1,6 +1,6 @@
use crate::err::Error;
use crate::gather::{gather_get_json_generic, SubRes};
use crate::{response, BodyStream};
use crate::{response, BodyStream, ReqCtx};
use bytes::{BufMut, BytesMut};
use futures_core::Stream;
use futures_util::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
@@ -9,10 +9,10 @@ use hyper::{Body, Client, Request, Response};
use items::eventfull::EventFull;
use items::{RangeCompletableItem, Sitemty, StreamItem};
use itertools::Itertools;
use netpod::log::*;
use netpod::query::api1::Api1Query;
use netpod::query::RawEventsQuery;
use netpod::timeunits::SEC;
use netpod::{log::*, ScalarType};
use netpod::{ByteSize, Channel, DiskIoTune, NanoRange, NodeConfigCached, PerfOpts, Shape};
use netpod::{ChannelSearchQuery, ChannelSearchResult, ProxyConfig};
use netpod::{ACCEPT_ALL, APP_JSON, APP_OCTET};
@@ -21,6 +21,7 @@ use parse::channelconfig::read_local_config;
use parse::channelconfig::{Config, ConfigEntry, MatchingConfigEntry};
use serde::{Deserialize, Serialize};
use serde_json::Value as JsonValue;
use std::fmt;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
@@ -468,17 +469,127 @@ async fn process_answer(res: Response<Body>) -> Result<JsonValue, Error> {
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub enum Api1ScalarType {
#[serde(rename = "uint8")]
U8,
#[serde(rename = "uint16")]
U16,
#[serde(rename = "uint32")]
U32,
#[serde(rename = "uint64")]
U64,
#[serde(rename = "int8")]
I8,
#[serde(rename = "int16")]
I16,
#[serde(rename = "int32")]
I32,
#[serde(rename = "int64")]
I64,
#[serde(rename = "float32")]
F32,
#[serde(rename = "float64")]
F64,
#[serde(rename = "bool")]
BOOL,
#[serde(rename = "string")]
STRING,
}
impl Api1ScalarType {
pub fn to_str(&self) -> &'static str {
use Api1ScalarType as A;
match self {
A::U8 => "uint8",
A::U16 => "uint16",
A::U32 => "uint32",
A::U64 => "uint64",
A::I8 => "int8",
A::I16 => "int16",
A::I32 => "int32",
A::I64 => "int64",
A::F32 => "float32",
A::F64 => "float64",
A::BOOL => "bool",
A::STRING => "string",
}
}
}
impl fmt::Display for Api1ScalarType {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
write!(fmt, "{}", self.to_str())
}
}
impl From<&ScalarType> for Api1ScalarType {
fn from(k: &ScalarType) -> Self {
use Api1ScalarType as B;
use ScalarType as A;
match k {
A::U8 => B::U8,
A::U16 => B::U16,
A::U32 => B::U32,
A::U64 => B::U64,
A::I8 => B::I8,
A::I16 => B::I16,
A::I32 => B::I32,
A::I64 => B::I64,
A::F32 => B::F32,
A::F64 => B::F64,
A::BOOL => B::BOOL,
A::STRING => B::STRING,
}
}
}
impl From<ScalarType> for Api1ScalarType {
fn from(x: ScalarType) -> Self {
(&x).into()
}
}
#[test]
fn test_custom_variant_name() {
let val = Api1ScalarType::F32;
assert_eq!(format!("{val:?}"), "F32");
assert_eq!(format!("{val}"), "float32");
let s = serde_json::to_string(&val).unwrap();
assert_eq!(s, "\"float32\"");
}
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub enum Api1ByteOrder {
#[serde(rename = "LITTLE_ENDIAN")]
Little,
#[serde(rename = "BIG_ENDIAN")]
Big,
}
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct Api1ChannelHeader {
name: String,
#[serde(rename = "type")]
ty: String,
ty: Api1ScalarType,
#[serde(rename = "byteOrder")]
byte_order: String,
byte_order: Api1ByteOrder,
#[serde(default)]
shape: Vec<u32>,
#[serde(default, skip_serializing_if = "Option::is_none")]
compression: Option<usize>,
}
impl Api1ChannelHeader {
pub fn name(&self) -> &str {
&self.name
}
pub fn ty(&self) -> Api1ScalarType {
self.ty.clone()
}
}
pub struct DataApiPython3DataStream {
range: NanoRange,
channels: Vec<Channel>,
@@ -560,11 +671,11 @@ impl DataApiPython3DataStream {
if !*header_out {
let head = Api1ChannelHeader {
name: channel.name.clone(),
ty: b.scalar_types[i1].to_api3proto().into(),
ty: (&b.scalar_types[i1]).into(),
byte_order: if b.be[i1] {
"BIG_ENDIAN".into()
Api1ByteOrder::Big
} else {
"LITTLE_ENDIAN".into()
Api1ByteOrder::Little
},
// The shape is inconsistent on the events.
// Seems like the config is to be trusted in this case.
@@ -576,39 +687,18 @@ impl DataApiPython3DataStream {
let l1 = 1 + h.as_bytes().len() as u32;
d.put_u32(l1);
d.put_u8(0);
debug!("header frame byte len {}", 4 + 1 + h.as_bytes().len());
d.extend_from_slice(h.as_bytes());
d.put_u32(l1);
*header_out = true;
}
{
match &b.shapes[i1] {
Shape::Image(_, _) => {
let l1 = 17 + b.blobs[i1].len() as u32;
d.put_u32(l1);
d.put_u8(1);
d.put_u64(b.tss[i1]);
d.put_u64(b.pulses[i1]);
d.put_slice(&b.blobs[i1]);
d.put_u32(l1);
}
Shape::Wave(_) => {
let l1 = 17 + b.blobs[i1].len() as u32;
d.put_u32(l1);
d.put_u8(1);
d.put_u64(b.tss[i1]);
d.put_u64(b.pulses[i1]);
d.put_slice(&b.blobs[i1]);
d.put_u32(l1);
}
_ => {
let l1 = 17 + b.blobs[i1].len() as u32;
d.put_u32(l1);
d.put_u8(1);
d.put_u64(b.tss[i1]);
d.put_u64(b.pulses[i1]);
d.put_slice(&b.blobs[i1]);
d.put_u32(l1);
}
match &b.shapes[i1] {
_ => {
let l1 = 17 + b.blobs[i1].len() as u32;
d.put_u32(l1);
d.put_u8(1);
d.put_u64(b.tss[i1]);
d.put_u64(b.pulses[i1]);
d.put_slice(&b.blobs[i1]);
}
}
*count_events += 1;
@@ -810,7 +900,12 @@ impl Api1EventsBinaryHandler {
}
}
pub async fn handle(&self, req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
pub async fn handle(
&self,
req: Request<Body>,
_ctx: &ReqCtx,
node_config: &NodeConfigCached,
) -> Result<Response<Body>, Error> {
if req.method() != Method::POST {
return Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?);
}

View File

@@ -13,20 +13,12 @@ use std::task::{Context, Poll};
use tracing::field::Empty;
use tracing::{span, Level};
fn proxy_mark() -> &'static str {
"7c5e408a"
}
pub fn response<T>(status: T) -> http::response::Builder
where
http::StatusCode: std::convert::TryFrom<T>,
<http::StatusCode as std::convert::TryFrom<T>>::Error: Into<http::Error>,
{
Response::builder()
.status(status)
.header("Access-Control-Allow-Origin", "*")
.header("Access-Control-Allow-Headers", "*")
.header("x-proxy-log-mark", proxy_mark())
Response::builder().status(status)
}
pub struct BodyStream<S> {

View File

@@ -1,5 +1,6 @@
use crate::bodystream::response;
use crate::err::Error;
use crate::ReqCtx;
use http::{Method, Request, Response, StatusCode};
use hyper::Body;
use netpod::query::ChannelStateEventsQuery;
@@ -17,7 +18,12 @@ impl ConnectionStatusEvents {
}
}
pub async fn handle(&self, req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
pub async fn handle(
&self,
req: Request<Body>,
_ctx: &ReqCtx,
node_config: &NodeConfigCached,
) -> Result<Response<Body>, Error> {
if req.method() == Method::GET {
let accept_def = APP_JSON;
let accept = req
@@ -70,7 +76,12 @@ impl ChannelConnectionStatusEvents {
}
}
pub async fn handle(&self, req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
pub async fn handle(
&self,
req: Request<Body>,
_ctx: &ReqCtx,
node_config: &NodeConfigCached,
) -> Result<Response<Body>, Error> {
if req.method() == Method::GET {
let accept_def = APP_JSON;
let accept = req

View File

@@ -1,6 +1,6 @@
use crate::channelconfig::{chconf_from_events_binary, chconf_from_events_json};
use crate::err::Error;
use crate::{response, response_err, BodyStream, ToPublicResponse};
use crate::{response, response_err, BodyStream, ReqCtx, ToPublicResponse};
use futures_util::{Stream, StreamExt, TryStreamExt};
use http::{Method, Request, Response, StatusCode};
use hyper::Body;
@@ -164,17 +164,27 @@ impl EventsHandlerScylla {
}
}
pub async fn handle(&self, req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
pub async fn handle(
&self,
req: Request<Body>,
ctx: &ReqCtx,
node_config: &NodeConfigCached,
) -> Result<Response<Body>, Error> {
if req.method() != Method::GET {
return Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?);
}
match self.fetch(req, node_config).await {
match self.fetch(req, ctx, node_config).await {
Ok(ret) => Ok(ret),
Err(e) => Ok(e.to_public_response()),
}
}
async fn fetch(&self, req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
async fn fetch(
&self,
req: Request<Body>,
ctx: &ReqCtx,
node_config: &NodeConfigCached,
) -> Result<Response<Body>, Error> {
info!("EventsHandlerScylla req: {:?}", req);
let accept_def = APP_JSON;
let accept = req
@@ -182,14 +192,19 @@ impl EventsHandlerScylla {
.get(http::header::ACCEPT)
.map_or(accept_def, |k| k.to_str().unwrap_or(accept_def));
if accept.contains(APP_JSON) || accept.contains(ACCEPT_ALL) {
Ok(self.gather(req, node_config).await?)
Ok(self.gather(req, ctx, node_config).await?)
} else {
let ret = response_err(StatusCode::NOT_ACCEPTABLE, format!("Unsupported Accept: {:?}", accept))?;
Ok(ret)
}
}
async fn gather(&self, req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
async fn gather(
&self,
req: Request<Body>,
_ctx: &ReqCtx,
node_config: &NodeConfigCached,
) -> Result<Response<Body>, Error> {
let self_name = std::any::type_name::<Self>();
let (head, _body) = req.into_parts();
warn!("TODO PlainEventsQuery needs to take AggKind to do x-binning");
@@ -303,11 +318,16 @@ impl BinnedHandlerScylla {
}
}
pub async fn handle(&self, req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
pub async fn handle(
&self,
req: Request<Body>,
ctx: &ReqCtx,
node_config: &NodeConfigCached,
) -> Result<Response<Body>, Error> {
if req.method() != Method::GET {
return Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?);
}
match self.fetch(req, node_config).await {
match self.fetch(req, ctx, node_config).await {
Ok(ret) => Ok(ret),
Err(e) => {
eprintln!("error: {e}");
@@ -316,7 +336,12 @@ impl BinnedHandlerScylla {
}
}
async fn fetch(&self, req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
async fn fetch(
&self,
req: Request<Body>,
ctx: &ReqCtx,
node_config: &NodeConfigCached,
) -> Result<Response<Body>, Error> {
info!("BinnedHandlerScylla req: {:?}", req);
let accept_def = APP_JSON;
let accept = req
@@ -324,14 +349,19 @@ impl BinnedHandlerScylla {
.get(http::header::ACCEPT)
.map_or(accept_def, |k| k.to_str().unwrap_or(accept_def));
if accept.contains(APP_JSON) || accept.contains(ACCEPT_ALL) {
Ok(self.gather(req, node_config).await?)
Ok(self.gather(req, ctx, node_config).await?)
} else {
let ret = response_err(StatusCode::NOT_ACCEPTABLE, format!("Unsupported Accept: {:?}", accept))?;
Ok(ret)
}
}
async fn gather(&self, req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
async fn gather(
&self,
req: Request<Body>,
_ctx: &ReqCtx,
node_config: &NodeConfigCached,
) -> Result<Response<Body>, Error> {
let (head, _body) = req.into_parts();
warn!("TODO BinnedQuery needs to take AggKind to do x-binngin");
let s1 = format!("dummy:{}", head.uri);

View File

@@ -30,6 +30,7 @@ use net::SocketAddr;
use netpod::log::*;
use netpod::query::BinnedQuery;
use netpod::timeunits::SEC;
use netpod::ProxyConfig;
use netpod::{FromUrl, NodeConfigCached, NodeStatus, NodeStatusArchiverAppliance};
use netpod::{ACCEPT_ALL, APP_JSON, APP_JSON_LINES, APP_OCTET};
use nodenet::conn::events_service;
@@ -45,6 +46,8 @@ use task::{Context, Poll};
use tracing::Instrument;
use url::Url;
pub const PSI_DAQBUFFER_SERVICE_MARK: &'static str = "PSI-Daqbuffer-Service-Mark";
pub async fn host(node_config: NodeConfigCached) -> Result<(), Error> {
static STATUS_BOARD_INIT: Once = Once::new();
STATUS_BOARD_INIT.call_once(|| {
@@ -132,6 +135,41 @@ where
impl<F> UnwindSafe for Cont<F> {}
pub struct ReqCtx {
pub marks: Vec<String>,
pub mark: String,
}
impl ReqCtx {
fn with_node<T>(req: &Request<T>, nc: &NodeConfigCached) -> Self {
let mut marks = Vec::new();
for (n, v) in req.headers().iter() {
if n == PSI_DAQBUFFER_SERVICE_MARK {
marks.push(String::from_utf8_lossy(v.as_bytes()).to_string());
}
}
Self {
marks,
mark: format!("{}:{}", nc.node_config.name, nc.node.port),
}
}
}
impl ReqCtx {
fn with_proxy<T>(req: &Request<T>, proxy: &ProxyConfig) -> Self {
let mut marks = Vec::new();
for (n, v) in req.headers().iter() {
if n == PSI_DAQBUFFER_SERVICE_MARK {
marks.push(String::from_utf8_lossy(v.as_bytes()).to_string());
}
}
Self {
marks,
mark: format!("{}:{}", proxy.name, proxy.port),
}
}
}
// TODO remove because I want error bodies to be json.
pub fn response_err<T>(status: StatusCode, msg: T) -> Result<Response<Body>, Error>
where
@@ -193,11 +231,28 @@ macro_rules! static_http_api1 {
}
async fn http_service_try(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
let ctx = ReqCtx::with_node(&req, node_config);
let mut res = http_service_inner(req, &ctx, node_config).await?;
let hm = res.headers_mut();
hm.append("Access-Control-Allow-Origin", "*".parse().unwrap());
hm.append("Access-Control-Allow-Headers", "*".parse().unwrap());
for m in &ctx.marks {
hm.append(PSI_DAQBUFFER_SERVICE_MARK, m.parse().unwrap());
}
hm.append(PSI_DAQBUFFER_SERVICE_MARK, ctx.mark.parse().unwrap());
Ok(res)
}
async fn http_service_inner(
req: Request<Body>,
ctx: &ReqCtx,
node_config: &NodeConfigCached,
) -> Result<Response<Body>, Error> {
let uri = req.uri().clone();
let path = uri.path();
if path == "/api/4/node_status" {
if req.method() == Method::GET {
Ok(node_status(req, &node_config).await?)
Ok(node_status(req, ctx, &node_config).await?)
} else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
}
@@ -245,34 +300,34 @@ async fn http_service_try(req: Request<Body>, node_config: &NodeConfigCached) ->
} else if let Some(h) = events::EventsHandler::handler(&req) {
h.handle(req, &node_config).await
} else if let Some(h) = events::EventsHandlerScylla::handler(&req) {
h.handle(req, &node_config).await
h.handle(req, ctx, &node_config).await
} else if let Some(h) = events::BinnedHandlerScylla::handler(&req) {
h.handle(req, &node_config).await
h.handle(req, ctx, &node_config).await
} else if let Some(h) = channel_status::ConnectionStatusEvents::handler(&req) {
h.handle(req, &node_config).await
h.handle(req, ctx, &node_config).await
} else if let Some(h) = channel_status::ChannelConnectionStatusEvents::handler(&req) {
h.handle(req, &node_config).await
h.handle(req, ctx, &node_config).await
} else if path == "/api/4/binned" {
if req.method() == Method::GET {
Ok(binned(req, node_config).await?)
Ok(binned(req, ctx, node_config).await?)
} else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
}
} else if path == "/api/4/prebinned" {
if req.method() == Method::GET {
Ok(prebinned(req, &node_config).await?)
Ok(prebinned(req, ctx, &node_config).await?)
} else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
}
} else if path == "/api/4/table_sizes" {
if req.method() == Method::GET {
Ok(table_sizes(req, &node_config).await?)
Ok(table_sizes(req, ctx, &node_config).await?)
} else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
}
} else if path == "/api/4/random/channel" {
if req.method() == Method::GET {
Ok(random_channel(req, &node_config).await?)
Ok(random_channel(req, ctx, &node_config).await?)
} else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
}
@@ -284,25 +339,25 @@ async fn http_service_try(req: Request<Body>, node_config: &NodeConfigCached) ->
}
} else if path == "/api/4/clear_cache" {
if req.method() == Method::GET {
Ok(clear_cache_all(req, &node_config).await?)
Ok(clear_cache_all(req, ctx, &node_config).await?)
} else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
}
} else if path == "/api/4/update_db_with_channel_names" {
if req.method() == Method::GET {
Ok(update_db_with_channel_names(req, &node_config).await?)
Ok(update_db_with_channel_names(req, ctx, &node_config).await?)
} else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
}
} else if path == "/api/4/update_db_with_all_channel_configs" {
if req.method() == Method::GET {
Ok(update_db_with_all_channel_configs(req, &node_config).await?)
Ok(update_db_with_all_channel_configs(req, ctx, &node_config).await?)
} else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
}
} else if path == "/api/4/update_search_cache" {
if req.method() == Method::GET {
Ok(update_search_cache(req, &node_config).await?)
Ok(update_search_cache(req, ctx, &node_config).await?)
} else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
}
@@ -311,7 +366,7 @@ async fn http_service_try(req: Request<Body>, node_config: &NodeConfigCached) ->
} else if let Some(h) = settings::SettingsThreadsMaxHandler::handler(&req) {
h.handle(req, &node_config).await
} else if let Some(h) = api1::Api1EventsBinaryHandler::handler(&req) {
h.handle(req, &node_config).await
h.handle(req, ctx, &node_config).await
} else if let Some(h) = evinfo::EventInfoScan::handler(&req) {
h.handle(req, &node_config).await
} else if let Some(h) = pulsemap::MapPulseScyllaHandler::handler(&req) {
@@ -387,8 +442,8 @@ impl StatusBoardAllHandler {
}
}
async fn binned(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
match binned_inner(req, node_config).await {
async fn binned(req: Request<Body>, ctx: &ReqCtx, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
match binned_inner(req, ctx, node_config).await {
Ok(ret) => Ok(ret),
Err(e) => {
error!("fn binned: {e:?}");
@@ -397,7 +452,11 @@ async fn binned(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Re
}
}
async fn binned_inner(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
async fn binned_inner(
req: Request<Body>,
ctx: &ReqCtx,
node_config: &NodeConfigCached,
) -> Result<Response<Body>, Error> {
let (head, _body) = req.into_parts();
let url = Url::parse(&format!("dummy:{}", head.uri))?;
let query = BinnedQuery::from_url(&url).map_err(|e| {
@@ -416,8 +475,8 @@ async fn binned_inner(req: Request<Body>, node_config: &NodeConfigCached) -> Res
debug!("binned STARTING {:?}", query);
});
match head.headers.get(http::header::ACCEPT) {
Some(v) if v == APP_OCTET => binned_binary(query, chconf, node_config).await,
Some(v) if v == APP_JSON || v == ACCEPT_ALL => binned_json(query, chconf, node_config).await,
Some(v) if v == APP_OCTET => binned_binary(query, chconf, &ctx, node_config).await,
Some(v) if v == APP_JSON || v == ACCEPT_ALL => binned_json(query, chconf, &ctx, node_config).await,
_ => Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?),
}
}
@@ -425,6 +484,7 @@ async fn binned_inner(req: Request<Body>, node_config: &NodeConfigCached) -> Res
async fn binned_binary(
query: BinnedQuery,
chconf: ChConf,
_ctx: &ReqCtx,
node_config: &NodeConfigCached,
) -> Result<Response<Body>, Error> {
let body_stream =
@@ -439,6 +499,7 @@ async fn binned_binary(
async fn binned_json(
query: BinnedQuery,
chconf: ChConf,
_ctx: &ReqCtx,
node_config: &NodeConfigCached,
) -> Result<Response<Body>, Error> {
let body_stream = disk::binned::binned_json(&query, chconf.scalar_type, chconf.shape, node_config).await?;
@@ -449,8 +510,8 @@ async fn binned_json(
Ok(res)
}
async fn prebinned(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
match prebinned_inner(req, node_config).await {
async fn prebinned(req: Request<Body>, ctx: &ReqCtx, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
match prebinned_inner(req, ctx, node_config).await {
Ok(ret) => Ok(ret),
Err(e) => {
error!("fn prebinned: {e:?}");
@@ -459,7 +520,11 @@ async fn prebinned(req: Request<Body>, node_config: &NodeConfigCached) -> Result
}
}
async fn prebinned_inner(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
async fn prebinned_inner(
req: Request<Body>,
_ctx: &ReqCtx,
node_config: &NodeConfigCached,
) -> Result<Response<Body>, Error> {
let (head, _body) = req.into_parts();
let query = PreBinnedQuery::from_request(&head)?;
let desc = format!(
@@ -486,7 +551,11 @@ async fn prebinned_inner(req: Request<Body>, node_config: &NodeConfigCached) ->
Ok(ret)
}
async fn node_status(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
async fn node_status(
req: Request<Body>,
_ctx: &ReqCtx,
node_config: &NodeConfigCached,
) -> Result<Response<Body>, Error> {
let (_head, _body) = req.into_parts();
let archiver_appliance_status = match node_config.node.archiver_appliance.as_ref() {
Some(k) => {
@@ -525,7 +594,11 @@ async fn node_status(req: Request<Body>, node_config: &NodeConfigCached) -> Resu
Ok(ret)
}
async fn table_sizes(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
async fn table_sizes(
req: Request<Body>,
_ctx: &ReqCtx,
node_config: &NodeConfigCached,
) -> Result<Response<Body>, Error> {
let (_head, _body) = req.into_parts();
let sizes = dbconn::table_sizes(node_config).await?;
let mut ret = String::new();
@@ -537,14 +610,22 @@ async fn table_sizes(req: Request<Body>, node_config: &NodeConfigCached) -> Resu
Ok(ret)
}
pub async fn random_channel(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
pub async fn random_channel(
req: Request<Body>,
_ctx: &ReqCtx,
node_config: &NodeConfigCached,
) -> Result<Response<Body>, Error> {
let (_head, _body) = req.into_parts();
let ret = dbconn::random_channel(node_config).await?;
let ret = response(StatusCode::OK).body(Body::from(ret))?;
Ok(ret)
}
pub async fn clear_cache_all(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
pub async fn clear_cache_all(
req: Request<Body>,
_ctx: &ReqCtx,
node_config: &NodeConfigCached,
) -> Result<Response<Body>, Error> {
let (head, _body) = req.into_parts();
let dry = match head.uri.query() {
Some(q) => q.contains("dry"),
@@ -559,6 +640,7 @@ pub async fn clear_cache_all(req: Request<Body>, node_config: &NodeConfigCached)
pub async fn update_db_with_channel_names(
req: Request<Body>,
_ctx: &ReqCtx,
node_config: &NodeConfigCached,
) -> Result<Response<Body>, Error> {
let (head, _body) = req.into_parts();
@@ -594,6 +676,7 @@ pub async fn update_db_with_channel_names(
pub async fn update_db_with_channel_names_3(
req: Request<Body>,
_ctx: &ReqCtx,
node_config: &NodeConfigCached,
) -> Result<Response<Body>, Error> {
let (head, _body) = req.into_parts();
@@ -616,6 +699,7 @@ pub async fn update_db_with_channel_names_3(
pub async fn update_db_with_all_channel_configs(
req: Request<Body>,
_ctx: &ReqCtx,
node_config: &NodeConfigCached,
) -> Result<Response<Body>, Error> {
let (head, _body) = req.into_parts();
@@ -636,7 +720,11 @@ pub async fn update_db_with_all_channel_configs(
Ok(ret)
}
pub async fn update_search_cache(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
pub async fn update_search_cache(
req: Request<Body>,
_ctx: &ReqCtx,
node_config: &NodeConfigCached,
) -> Result<Response<Body>, Error> {
let (head, _body) = req.into_parts();
let _dry = match head.uri.query() {
Some(q) => q.contains("dry"),

View File

@@ -4,7 +4,7 @@ use crate::api1::{channel_search_configs_v1, channel_search_list_v1, gather_json
use crate::err::Error;
use crate::gather::{gather_get_json_generic, SubRes};
use crate::pulsemap::MapPulseQuery;
use crate::{api_1_docs, api_4_docs, response, response_err, Cont};
use crate::{api_1_docs, api_4_docs, response, response_err, Cont, ReqCtx, PSI_DAQBUFFER_SERVICE_MARK};
use futures_core::Stream;
use futures_util::pin_mut;
use http::{Method, StatusCode};
@@ -14,10 +14,9 @@ use hyper_tls::HttpsConnector;
use itertools::Itertools;
use netpod::log::*;
use netpod::query::{BinnedQuery, PlainEventsQuery};
use netpod::{
AppendToUrl, ChannelConfigQuery, ChannelSearchQuery, ChannelSearchResult, ChannelSearchSingleResult, FromUrl,
HasBackend, HasTimeout, ProxyConfig, ACCEPT_ALL, APP_JSON,
};
use netpod::{AppendToUrl, ChannelConfigQuery, FromUrl, HasBackend, HasTimeout, ProxyConfig};
use netpod::{ChannelSearchQuery, ChannelSearchResult, ChannelSearchSingleResult};
use netpod::{ACCEPT_ALL, APP_JSON};
use serde::{Deserialize, Serialize};
use serde_json::Value as JsonValue;
use std::future::Future;
@@ -70,6 +69,23 @@ async fn proxy_http_service(req: Request<Body>, proxy_config: ProxyConfig) -> Re
}
async fn proxy_http_service_try(req: Request<Body>, proxy_config: &ProxyConfig) -> Result<Response<Body>, Error> {
let ctx = ReqCtx::with_proxy(&req, proxy_config);
let mut res = proxy_http_service_inner(req, &ctx, proxy_config).await?;
let hm = res.headers_mut();
hm.insert("Access-Control-Allow-Origin", "*".parse().unwrap());
hm.insert("Access-Control-Allow-Headers", "*".parse().unwrap());
for m in &ctx.marks {
hm.append(PSI_DAQBUFFER_SERVICE_MARK, m.parse().unwrap());
}
hm.append(PSI_DAQBUFFER_SERVICE_MARK, ctx.mark.parse().unwrap());
Ok(res)
}
async fn proxy_http_service_inner(
req: Request<Body>,
ctx: &ReqCtx,
proxy_config: &ProxyConfig,
) -> Result<Response<Body>, Error> {
let uri = req.uri().clone();
let path = uri.path();
if path == "/api/1/channels" {
@@ -84,16 +100,15 @@ async fn proxy_http_service_try(req: Request<Body>, proxy_config: &ProxyConfig)
Ok(proxy_api1_single_backend_query(req, proxy_config).await?)
} else if path.starts_with("/api/1/map/pulse/") {
warn!("/api/1/map/pulse/ DEPRECATED");
Ok(proxy_api1_map_pulse(req, proxy_config).await?)
Ok(proxy_api1_map_pulse(req, ctx, proxy_config).await?)
} else if path.starts_with("/api/1/gather/") {
Ok(gather_json_2_v1(req, "/api/1/gather/", proxy_config).await?)
} else if path == "/api/4/version" {
if req.method() == Method::GET {
let ret = serde_json::json!({
//"data_api_version": "4.0.0-beta",
"data_api_version": {
"major": 4,
"minor": 0,
"minor": 1,
},
});
Ok(response(StatusCode::OK).body(Body::from(serde_json::to_vec(&ret)?))?)
@@ -107,13 +122,13 @@ async fn proxy_http_service_try(req: Request<Body>, proxy_config: &ProxyConfig)
} else if path == "/api/4/search/channel" {
Ok(api4::channel_search(req, proxy_config).await?)
} else if path == "/api/4/events" {
Ok(proxy_single_backend_query::<PlainEventsQuery>(req, proxy_config).await?)
Ok(proxy_single_backend_query::<PlainEventsQuery>(req, ctx, proxy_config).await?)
} else if path.starts_with("/api/4/map/pulse/") {
Ok(proxy_single_backend_query::<MapPulseQuery>(req, proxy_config).await?)
Ok(proxy_single_backend_query::<MapPulseQuery>(req, ctx, proxy_config).await?)
} else if path == "/api/4/binned" {
Ok(proxy_single_backend_query::<BinnedQuery>(req, proxy_config).await?)
Ok(proxy_single_backend_query::<BinnedQuery>(req, ctx, proxy_config).await?)
} else if path == "/api/4/channel/config" {
Ok(proxy_single_backend_query::<ChannelConfigQuery>(req, proxy_config).await?)
Ok(proxy_single_backend_query::<ChannelConfigQuery>(req, ctx, proxy_config).await?)
} else if path.starts_with("/api/1/documentation/") {
if req.method() == Method::GET {
api_1_docs(path)
@@ -149,12 +164,18 @@ async fn proxy_http_service_try(req: Request<Body>, proxy_config: &ProxyConfig)
} else if path.starts_with(DISTRI_PRE) {
proxy_distribute_v2(req).await
} else {
Ok(response(StatusCode::NOT_FOUND).body(Body::from(format!(
"Sorry, proxy can not find: {:?} {:?} {:?}",
req.method(),
req.uri().path(),
req.uri().query(),
)))?)
use std::fmt::Write;
let mut body = String::new();
let out = &mut body;
write!(out, "METHOD {:?}<br>\n", req.method())?;
write!(out, "URI {:?}<br>\n", req.uri())?;
write!(out, "HOST {:?}<br>\n", req.uri().host())?;
write!(out, "PORT {:?}<br>\n", req.uri().port())?;
write!(out, "PATH {:?}<br>\n", req.uri().path())?;
for (hn, hv) in req.headers() {
write!(out, "HEADER {hn:?}: {hv:?}<br>\n")?;
}
Ok(response(StatusCode::NOT_FOUND).body(Body::from(body))?)
}
}
@@ -381,7 +402,11 @@ pub async fn channel_search(req: Request<Body>, proxy_config: &ProxyConfig) -> R
}
}
pub async fn proxy_api1_map_pulse(req: Request<Body>, proxy_config: &ProxyConfig) -> Result<Response<Body>, Error> {
pub async fn proxy_api1_map_pulse(
req: Request<Body>,
_ctx: &ReqCtx,
proxy_config: &ProxyConfig,
) -> Result<Response<Body>, Error> {
let s2 = format!("http://dummy/{}", req.uri());
info!("s2: {:?}", s2);
let url = Url::parse(&s2)?;
@@ -505,6 +530,7 @@ pub async fn proxy_api1_single_backend_query(
pub async fn proxy_single_backend_query<QT>(
req: Request<Body>,
_ctx: &ReqCtx,
proxy_config: &ProxyConfig,
) -> Result<Response<Body>, Error>
where

View File

@@ -303,23 +303,6 @@ impl ScalarType {
}
}
pub fn to_api3proto(&self) -> &'static str {
match self {
ScalarType::U8 => "uint8",
ScalarType::U16 => "uint16",
ScalarType::U32 => "uint32",
ScalarType::U64 => "uint64",
ScalarType::I8 => "int8",
ScalarType::I16 => "int16",
ScalarType::I32 => "int32",
ScalarType::I64 => "int64",
ScalarType::F32 => "float32",
ScalarType::F64 => "float64",
ScalarType::BOOL => "bool",
ScalarType::STRING => "string",
}
}
pub fn to_scylla_i32(&self) -> i32 {
self.index() as i32
}
@@ -725,31 +708,23 @@ impl NanoRange {
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum ByteOrder {
LE,
BE,
Little,
Big,
}
impl ByteOrder {
pub fn little_endian() -> Self {
Self::LE
}
pub fn big_endian() -> Self {
Self::BE
}
pub fn from_dtype_flags(flags: u8) -> Self {
if flags & 0x20 == 0 {
Self::LE
Self::Little
} else {
Self::BE
Self::Big
}
}
pub fn from_bsread_str(s: &str) -> Result<ByteOrder, Error> {
match s {
"little" => Ok(ByteOrder::LE),
"big" => Ok(ByteOrder::BE),
"little" => Ok(ByteOrder::Little),
"big" => Ok(ByteOrder::Big),
_ => Err(Error::with_msg_no_trace(format!(
"ByteOrder::from_bsread_str can not understand {}",
s
@@ -758,7 +733,7 @@ impl ByteOrder {
}
pub fn is_le(&self) -> bool {
if let Self::LE = self {
if let Self::Little = self {
true
} else {
false
@@ -766,7 +741,7 @@ impl ByteOrder {
}
pub fn is_be(&self) -> bool {
if let Self::BE = self {
if let Self::Big = self {
true
} else {
false

View File

@@ -6,12 +6,13 @@ edition = "2021"
[dependencies]
serde = { version = "1.0", features = ["derive"] }
tokio = { version = "1.18.1" }
serde_json = "1.0.89"
tokio = { version = "1.22.0", features = ["fs"] }
chrono = { version = "0.4.19", features = ["serde"] }
bytes = "1.0.1"
byteorder = "1.4.3"
hex = "0.4.3"
nom = "6.1.2"
nom = "7.1.1"
num-traits = "0.2"
num-derive = "0.3"
err = { path = "../err" }

View File

@@ -1,14 +1,10 @@
use err::Error;
use netpod::timeunits::MS;
use netpod::{
ByteOrder, Channel, ChannelConfigQuery, ChannelConfigResponse, NanoRange, Nanos, Node, ScalarType, Shape,
};
use netpod::{ByteOrder, Channel, NanoRange, Nanos, Node, ScalarType, Shape};
use netpod::{ChannelConfigQuery, ChannelConfigResponse};
use nom::bytes::complete::take;
use nom::number::complete::{be_i16, be_i32, be_i64, be_i8, be_u8};
use nom::Needed;
//use nom::bytes::complete::{tag, take_while_m_n};
//use nom::combinator::map_res;
//use nom::sequence::tuple;
use num_derive::{FromPrimitive, ToPrimitive};
use num_traits::ToPrimitive;
use serde::{Deserialize, Serialize};

32
parse/src/jsonconf.rs Normal file
View File

@@ -0,0 +1,32 @@
#[test]
fn test_json_trailing() {
use serde::Deserialize;
use serde_json::Value as JsonValue;
use std::io::Cursor;
if serde_json::from_str::<JsonValue>(r#"{}."#).is_ok() {
panic!("Should fail because of trailing character");
}
let cur = Cursor::new(r#"{}..."#);
let mut de = serde_json::Deserializer::from_reader(cur);
if JsonValue::deserialize(&mut de).is_err() {
panic!("Should allow trailing characters")
}
let cur = Cursor::new(r#"nullA"#);
let mut de = serde_json::Deserializer::from_reader(cur);
if let Ok(val) = JsonValue::deserialize(&mut de) {
if val != serde_json::json!(null) {
panic!("Bad parse")
}
} else {
panic!("Should allow trailing characters")
}
let cur = Cursor::new(r#" {}AA"#);
let mut de = serde_json::Deserializer::from_reader(cur);
if let Ok(val) = JsonValue::deserialize(&mut de) {
if val != serde_json::json!({}) {
panic!("Bad parse")
}
} else {
panic!("Should allow trailing characters")
}
}

View File

@@ -1 +1,2 @@
pub mod channelconfig;
mod jsonconf;