diff --git a/.cargo/cargo-lock b/.cargo/cargo-lock index ba539c5..56dadee 100644 --- a/.cargo/cargo-lock +++ b/.cargo/cargo-lock @@ -17,6 +17,17 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" +[[package]] +name = "ahash" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fcb51a0695d8f838b1ee009b3fbf66bda078cd64590202a864a8f3e8c4315c47" +dependencies = [ + "getrandom", + "once_cell", + "version_check", +] + [[package]] name = "aho-corasick" version = "0.7.20" @@ -62,12 +73,6 @@ version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a4c527152e37cf757a3f78aae5a06fbeefdb07ccc535c980a3208ee3060dd544" -[[package]] -name = "arrayvec" -version = "0.5.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "23b62fc65de8e4e7f52534fb52b0f3ed04746ae267519eef2a83941e8085068b" - [[package]] name = "async-channel" version = "1.7.1" @@ -205,6 +210,15 @@ dependencies = [ "num-traits", ] +[[package]] +name = "bincode" +version = "1.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1f45e9417d87227c7a56d22e471c6206462cba514c7590c09aff4cf6d1ddcad" +dependencies = [ + "serde", +] + [[package]] name = "bitflags" version = "1.3.2" @@ -219,18 +233,6 @@ dependencies = [ "libc", ] -[[package]] -name = "bitvec" -version = "0.19.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "55f93d0ef3363c364d5976646a38f04cf67cfe1d4c8d160cdea02cab2c116b33" -dependencies = [ - "funty", - "radium", - "tap", - "wyz", -] - [[package]] name = "block-buffer" version = "0.9.0" @@ -249,6 +251,25 @@ dependencies = [ "generic-array", ] +[[package]] +name = "bson" +version = "2.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "99d76085681585d39016f4d3841eb019201fc54d2dd0d92ad1e4fab3bfb32754" +dependencies = [ + "ahash", + "base64", + "hex", + "indexmap", + "lazy_static", + "rand", + "serde", + "serde_bytes", + "serde_json", + "time 0.3.17", + "uuid", +] + [[package]] name = "bumpalo" version = "3.11.1" @@ -638,14 +659,15 @@ dependencies = [ "chrono", "disk", "err", - "futures-core", "futures-util", "http", + "httpclient", "httpret", "hyper", "items", "lazy_static", "netpod", + "nom", "rmp-serde", "serde", "serde_derive", @@ -654,7 +676,7 @@ dependencies = [ "taskrun", "tokio", "tracing", - "tracing-subscriber 0.2.25", + "tracing-subscriber 0.3.16", "url", ] @@ -743,7 +765,6 @@ dependencies = [ "items", "libc", "netpod", - "nom 6.1.2", "num-derive", "num-traits", "parse", @@ -911,12 +932,6 @@ dependencies = [ "url", ] -[[package]] -name = "funty" -version = "1.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fed34cd105917e91daa4da6b3728c47b068749d6a62c59811f06ed2ac71d9da7" - [[package]] name = "futures" version = "0.1.31" @@ -1083,7 +1098,7 @@ dependencies = [ "base64", "byteorder", "flate2", - "nom 7.1.1", + "nom", "num-traits", ] @@ -1168,18 +1183,18 @@ checksum = "d897f394bad6a705d5f4104762e116a75639e470d80901eed05a860a95cb1904" [[package]] name = "httpclient" -version = "0.0.1-a.0" +version = "0.0.2" dependencies = [ "async-channel", "bytes", "err", - "futures-core", "futures-util", "http", "hyper", "hyper-tls", "netpod", "parse", + "rmp-serde", "serde", "serde_json", "tokio", @@ -1342,6 +1357,8 @@ dependencies = [ name = "items" version = "0.0.1-a.dev.4" dependencies = [ + "bincode", + "bson", "bytes", "chrono", "ciborium", @@ -1417,19 +1434,6 @@ version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" -[[package]] -name = "lexical-core" -version = "0.7.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6607c62aa161d23d17a9072cc5da0be67cdfc89d3afb1e8d9c842bebc2525ffe" -dependencies = [ - "arrayvec", - "bitflags", - "cfg-if", - "ryu", - "static_assertions", -] - [[package]] name = "libc" version = "0.2.137" @@ -1585,15 +1589,13 @@ dependencies = [ [[package]] name = "netpod" -version = "0.0.1-a.0" +version = "0.0.2" dependencies = [ "async-channel", "bytes", "chrono", "err", - "futures-core", "futures-util", - "lazy_static", "num-traits", "serde", "serde_json", @@ -1632,19 +1634,6 @@ dependencies = [ "tracing", ] -[[package]] -name = "nom" -version = "6.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e7413f999671bd4745a7b624bd370a569fb6bc574b23c83a3c5ed2e453f3d5e2" -dependencies = [ - "bitvec", - "funty", - "lexical-core", - "memchr", - "version_check", -] - [[package]] name = "nom" version = "7.1.1" @@ -1873,10 +1862,11 @@ dependencies = [ "err", "hex", "netpod", - "nom 6.1.2", + "nom", "num-derive", "num-traits", "serde", + "serde_json", "tokio", ] @@ -2072,12 +2062,6 @@ dependencies = [ "proc-macro2", ] -[[package]] -name = "radium" -version = "0.5.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "941ba9d78d8e2f7ce474c015eea4d9c6d25b6a3327f9832ee29a4de27f91bbb8" - [[package]] name = "rand" version = "0.8.5" @@ -2330,6 +2314,15 @@ dependencies = [ "serde_derive", ] +[[package]] +name = "serde_bytes" +version = "0.11.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cfc50e8183eeeb6178dcb167ae34a8051d63535023ae38b5d8d12beae193d37b" +dependencies = [ + "serde", +] + [[package]] name = "serde_cbor" version = "0.11.2" @@ -2357,6 +2350,7 @@ version = "1.0.89" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "020ff22c755c2ed3f8cf162dbb41a7268d934702f3ed3631656ea597e08fc3db" dependencies = [ + "indexmap", "itoa", "ryu", "serde", @@ -2508,12 +2502,6 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "20518fe4a4c9acf048008599e464deb21beeae3d3578418951a189c235a7a9a8" -[[package]] -name = "tap" -version = "1.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369" - [[package]] name = "taskrun" version = "0.0.1-a.0" @@ -3015,6 +3003,10 @@ name = "uuid" version = "1.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "422ee0de9031b5b948b97a8fc04e3aa35230001a722ddd27943e0be31564ce4c" +dependencies = [ + "getrandom", + "serde", +] [[package]] name = "valuable" @@ -3240,9 +3232,3 @@ name = "windows_x86_64_msvc" version = "0.42.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f40009d85759725a34da6d89a94e63d7bdc50a862acf0dbc7c8e488f1edcb6f5" - -[[package]] -name = "wyz" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "85e60b0d1b5f99db2556934e21937020776a5d31520bf169e851ac44e6420214" diff --git a/daqbuffer/src/bin/daqbuffer.rs b/daqbuffer/src/bin/daqbuffer.rs index 07bd7ce..392a95a 100644 --- a/daqbuffer/src/bin/daqbuffer.rs +++ b/daqbuffer/src/bin/daqbuffer.rs @@ -125,7 +125,7 @@ fn simple_fetch() { array: true, scalar_type: ScalarType::F64, shape: Shape::Wave(42), - byte_order: ByteOrder::big_endian(), + byte_order: ByteOrder::Big, compression: true, }, timebin: 18720, diff --git a/daqbufp2/Cargo.toml b/daqbufp2/Cargo.toml index a8a464a..450fb08 100644 --- a/daqbufp2/Cargo.toml +++ b/daqbufp2/Cargo.toml @@ -4,6 +4,9 @@ version = "0.0.1-a.dev.12" authors = ["Dominik Werder "] edition = "2021" +[lib] +path = "src/daqbufp2.rs" + [dependencies] tokio = { version = "1.22.0", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] } hyper = "0.14" @@ -27,3 +30,6 @@ httpclient = { path = "../httpclient" } disk = { path = "../disk" } items = { path = "../items" } streams = { path = "../streams" } + +[dev-dependencies] +nom = "7.1.1" diff --git a/daqbufp2/src/lib.rs b/daqbufp2/src/daqbufp2.rs similarity index 96% rename from daqbufp2/src/lib.rs rename to daqbufp2/src/daqbufp2.rs index 6a445e9..cb5cdf9 100644 --- a/daqbufp2/src/lib.rs +++ b/daqbufp2/src/daqbufp2.rs @@ -21,6 +21,9 @@ pub fn spawn_test_hosts(cluster: Cluster) -> Vec>> let h = tokio::spawn(httpret::host(node_config).map_err(Error::from)); ret.push(h); } + + // TODO spawn also two proxy nodes + ret } diff --git a/daqbufp2/src/test/api1.rs b/daqbufp2/src/test/api1.rs index c7c1da6..6f96405 100644 --- a/daqbufp2/src/test/api1.rs +++ b/daqbufp2/src/test/api1.rs @@ -1,13 +1,15 @@ -use crate::err::ErrConv; +#[cfg(test)] +mod api1_parse; + use crate::nodes::require_test_hosts_running; -use chrono::{DateTime, Utc}; +use crate::test::api1::api1_parse::Api1Frame; use err::Error; use futures_util::Future; -use http::{header, Request, StatusCode}; -use httpclient::{http_get, http_post}; -use hyper::Body; +use httpclient::http_post; +use httpret::api1::Api1ScalarType; use netpod::log::*; use netpod::query::api1::{Api1Query, Api1Range, ChannelTuple}; +use std::fmt; use url::Url; fn testrun(fut: F) -> Result @@ -17,6 +19,29 @@ where taskrun::run(fut) } +fn is_monitonic_strict(it: I) -> bool +where + I: Iterator, + ::Item: PartialOrd + fmt::Debug, +{ + let mut last = None; + for x in it { + if let Some(last) = last.as_ref() { + if x <= *last { + return false; + } + } + last = Some(x); + } + true +} + +#[test] +fn test_is_monitonic_strict() { + assert_eq!(is_monitonic_strict([1, 2, 3].iter()), true); + assert_eq!(is_monitonic_strict([1, 2, 2].iter()), false); +} + #[test] fn events_f64_plain() -> Result<(), Error> { let fut = async { @@ -32,8 +57,55 @@ fn events_f64_plain() -> Result<(), Error> { let body = serde_json::to_string(&qu)?; let buf = http_post(url, accept, body.into()).await?; eprintln!("body received: {}", buf.len()); - //let js = String::from_utf8_lossy(&buf); - //eprintln!("string received: {js}"); + match api1_parse::api1_frames(&buf) { + Ok((_, frames)) => { + debug!("FRAMES LEN: {}", frames.len()); + assert_eq!(frames.len(), 121); + if let Api1Frame::Header(header) = &frames[0] { + if let Api1ScalarType::I32 = header.header().ty() { + } else { + panic!("bad scalar type") + } + } else { + panic!("bad header") + } + let tss: Vec<_> = frames + .iter() + .filter_map(|f| match f { + api1_parse::Api1Frame::Data(d) => Some(d.ts()), + _ => None, + }) + .collect(); + let pulses: Vec<_> = frames + .iter() + .filter_map(|f| match f { + api1_parse::Api1Frame::Data(d) => Some(d.pulse()), + _ => None, + }) + .collect(); + let values: Vec<_> = frames + .iter() + .filter_map(|f| match f { + api1_parse::Api1Frame::Data(d) => { + let val = i32::from_be_bytes(d.data().try_into().unwrap()); + Some(val) + } + _ => None, + }) + .collect(); + assert_eq!(is_monitonic_strict(tss.iter()), true); + assert_eq!(is_monitonic_strict(pulses.iter()), true); + assert_eq!(is_monitonic_strict(values.iter()), true); + for &val in &values { + assert!(val >= 0); + assert!(val < 120); + } + } + Err(e) => { + error!("can not parse result: {e}"); + panic!() + } + }; Ok(()) }; testrun(fut)?; diff --git a/daqbufp2/src/test/api1/api1_parse.rs b/daqbufp2/src/test/api1/api1_parse.rs new file mode 100644 index 0000000..c6e112f --- /dev/null +++ b/daqbufp2/src/test/api1/api1_parse.rs @@ -0,0 +1,184 @@ +use httpret::api1::Api1ChannelHeader; +use netpod::log::*; +use nom::multi::many0; +use nom::number::complete::{be_u32, be_u64, be_u8}; +use nom::IResult; +use std::num::NonZeroUsize; + +// u32be length_1. +// there is exactly length_1 more bytes in this message. +// u8 mtype: 0: channel-header, 1: data + +// for mtype == 0: +// The rest is a JSON with the channel header. + +// for mtype == 1: +// u64be timestamp +// u64be pulse +// After that comes exactly (length_1 - 17) bytes of data. + +#[derive(Debug)] +pub struct Header { + header: Api1ChannelHeader, +} + +impl Header { + pub fn header(&self) -> &Api1ChannelHeader { + &self.header + } +} + +#[derive(Debug)] +pub struct Data { + ts: u64, + pulse: u64, + data: Vec, +} + +impl Data { + pub fn ts(&self) -> u64 { + self.ts + } + + pub fn pulse(&self) -> u64 { + self.pulse + } + + pub fn data(&self) -> &[u8] { + &self.data + } +} + +#[derive(Debug)] +pub enum Api1Frame { + Header(Header), + Data(Data), +} + +fn header(inp: &[u8]) -> IResult<&[u8], Header> { + match serde_json::from_slice(inp) { + Ok(k) => { + let k: Api1ChannelHeader = k; + IResult::Ok((&inp[inp.len()..], Header { header: k })) + } + Err(e) => { + error!("json header parse error: {e}"); + let e = nom::Err::Failure(nom::error::make_error(inp, nom::error::ErrorKind::Fail)); + IResult::Err(e) + } + } +} + +fn data(inp: &[u8]) -> IResult<&[u8], Data> { + if inp.len() < 16 { + use nom::{Err, Needed}; + return IResult::Err(Err::Incomplete(Needed::Size(NonZeroUsize::new(16).unwrap()))); + } + let (inp, ts) = be_u64(inp)?; + let (inp, pulse) = be_u64(inp)?; + let data = inp.into(); + let inp = &inp[inp.len()..]; + let res = Data { ts, pulse, data }; + IResult::Ok((inp, res)) +} + +fn api1_frame_complete(inp: &[u8]) -> IResult<&[u8], Api1Frame> { + let (inp, mtype) = be_u8(inp)?; + if mtype == 0 { + let (inp, val) = header(inp)?; + if inp.len() != 0 { + // We did not consume the exact number of bytes + let kind = nom::error::ErrorKind::Verify; + let e = nom::error::Error::new(inp, kind); + Err(nom::Err::Failure(e)) + } else { + let res = Api1Frame::Header(val); + IResult::Ok((inp, res)) + } + } else if mtype == 1 { + let (inp, val) = data(inp)?; + if inp.len() != 0 { + // We did not consume the exact number of bytes + let kind = nom::error::ErrorKind::Verify; + let e = nom::error::Error::new(inp, kind); + Err(nom::Err::Failure(e)) + } else { + let res = Api1Frame::Data(val); + IResult::Ok((inp, res)) + } + } else { + let e = nom::Err::Incomplete(nom::Needed::Size(NonZeroUsize::new(1).unwrap())); + IResult::Err(e) + } +} + +fn api1_frame(inp: &[u8]) -> IResult<&[u8], Api1Frame> { + let (inp, len) = be_u32(inp)?; + if len < 1 { + use nom::error::{ErrorKind, ParseError}; + use nom::Err; + return IResult::Err(Err::Failure(ParseError::from_error_kind(inp, ErrorKind::Fail))); + } + if inp.len() < len as usize { + let e = nom::Err::Incomplete(nom::Needed::Size(NonZeroUsize::new(len as _).unwrap())); + IResult::Err(e) + } else { + let (inp2, inp) = inp.split_at(len as _); + let (inp2, res) = api1_frame_complete(inp2)?; + if inp2.len() != 0 { + let kind = nom::error::ErrorKind::Fail; + let e = nom::error::Error::new(inp, kind); + IResult::Err(nom::Err::Failure(e)) + } else { + IResult::Ok((inp, res)) + } + } +} + +type Nres<'a, T> = IResult<&'a [u8], T, nom::error::VerboseError<&'a [u8]>>; + +#[allow(unused)] +fn verbose_err(inp: &[u8]) -> Nres { + use nom::error::{ErrorKind, ParseError, VerboseError}; + use nom::Err; + let e = VerboseError::from_error_kind(inp, ErrorKind::Fail); + return IResult::Err(Err::Failure(e)); +} + +pub fn api1_frames(inp: &[u8]) -> IResult<&[u8], Vec> { + let (inp, res) = many0(api1_frame)(inp)?; + IResult::Ok((inp, res)) +} + +#[test] +fn test_basic_frames() -> Result<(), err::Error> { + use std::io::Write; + let mut buf = Vec::new(); + let js = r#"{"name": "ch1", "type": "float64", "byteOrder": "LITTLE_ENDIAN"}"#; + buf.write(&(1 + js.as_bytes().len() as u32).to_be_bytes())?; + buf.write(&[0])?; + buf.write(js.as_bytes())?; + + buf.write(&25u32.to_be_bytes())?; + buf.write(&[1])?; + buf.write(&20u64.to_be_bytes())?; + buf.write(&21u64.to_be_bytes())?; + buf.write(&5.123f64.to_be_bytes())?; + + buf.write(&25u32.to_be_bytes())?; + buf.write(&[1])?; + buf.write(&22u64.to_be_bytes())?; + buf.write(&23u64.to_be_bytes())?; + buf.write(&7.88f64.to_be_bytes())?; + + match api1_frames(&buf) { + Ok((_, frames)) => { + assert_eq!(frames.len(), 3); + } + Err(e) => { + error!("can not parse result: {e}"); + panic!() + } + }; + Ok(()) +} diff --git a/disk/Cargo.toml b/disk/Cargo.toml index 37f23ab..eaaa6ba 100644 --- a/disk/Cargo.toml +++ b/disk/Cargo.toml @@ -30,7 +30,6 @@ tracing-futures = { version = "0.2.5", features = ["futures-01", "futures-03", " fs2 = "0.4.3" libc = "0.2.93" hex = "0.4.3" -nom = "6.1.2" num-traits = "0.2.14" num-derive = "0.3" url = "2.2.2" diff --git a/disk/src/aggtest.rs b/disk/src/aggtest.rs index 95579a5..fb8c946 100644 --- a/disk/src/aggtest.rs +++ b/disk/src/aggtest.rs @@ -47,7 +47,7 @@ async fn agg_x_dim_0_inner() { array: false, shape: Shape::Scalar, scalar_type: ScalarType::F64, - byte_order: ByteOrder::big_endian(), + byte_order: ByteOrder::Big, compression: true, }, timebin: 18723, @@ -102,7 +102,7 @@ async fn agg_x_dim_1_inner() { array: true, shape: Shape::Wave(1024), scalar_type: ScalarType::F64, - byte_order: ByteOrder::big_endian(), + byte_order: ByteOrder::Big, compression: true, }, timebin: 0, diff --git a/disk/src/binned/prebinned.rs b/disk/src/binned/prebinned.rs index 9714b60..40c28d8 100644 --- a/disk/src/binned/prebinned.rs +++ b/disk/src/binned/prebinned.rs @@ -185,11 +185,11 @@ where macro_rules! match_end { ($nty:ident, $end:expr, $scalar_type:expr, $shape:expr, $agg_kind:expr, $query:expr, $node_config:expr) => { match $end { - ByteOrder::LE => { + ByteOrder::Little => { make_num_pipeline_nty_end::<$nty, LittleEndian>($scalar_type, $shape, $agg_kind, $query, $node_config) .await } - ByteOrder::BE => { + ByteOrder::Big => { make_num_pipeline_nty_end::<$nty, BigEndian>($scalar_type, $shape, $agg_kind, $query, $node_config) .await } @@ -244,7 +244,7 @@ pub async fn pre_binned_bytes_for_http( let ret = make_num_pipeline( query.scalar_type().clone(), // TODO actually, make_num_pipeline should not depend on endianness. - ByteOrder::LE, + ByteOrder::Little, query.shape().clone(), query.agg_kind().clone(), query.clone(), diff --git a/disk/src/channelexec.rs b/disk/src/channelexec.rs index a86a669..1fb57d6 100644 --- a/disk/src/channelexec.rs +++ b/disk/src/channelexec.rs @@ -152,8 +152,10 @@ where macro_rules! match_end { ($f:expr, $nty:ident, $end:expr, $scalar_type:expr, $shape:expr, $agg_kind:expr, $node_config:expr) => { match $end { - ByteOrder::LE => channel_exec_nty_end::<_, $nty, _>($f, LittleEndian {}, $scalar_type, $shape, $agg_kind), - ByteOrder::BE => channel_exec_nty_end::<_, $nty, _>($f, BigEndian {}, $scalar_type, $shape, $agg_kind), + ByteOrder::Little => { + channel_exec_nty_end::<_, $nty, _>($f, LittleEndian {}, $scalar_type, $shape, $agg_kind) + } + ByteOrder::Big => channel_exec_nty_end::<_, $nty, _>($f, BigEndian {}, $scalar_type, $shape, $agg_kind), } }; } @@ -201,7 +203,7 @@ where f, scalar_type, // TODO TODO TODO is the byte order ever important here? - ByteOrder::LE, + ByteOrder::Little, shape, agg_kind, node_config, diff --git a/disk/src/dataopen.rs b/disk/src/dataopen.rs index 6707fa0..f599545 100644 --- a/disk/src/dataopen.rs +++ b/disk/src/dataopen.rs @@ -823,7 +823,7 @@ mod test { keyspace: 2, time_bin_size: Nanos { ns: DAY }, scalar_type: netpod::ScalarType::I32, - byte_order: netpod::ByteOrder::big_endian(), + byte_order: netpod::ByteOrder::Big, shape: netpod::Shape::Scalar, array: false, compression: false, diff --git a/disk/src/eventblobs.rs b/disk/src/eventblobs.rs index db84fa3..25e6dcb 100644 --- a/disk/src/eventblobs.rs +++ b/disk/src/eventblobs.rs @@ -264,7 +264,7 @@ mod test { keyspace: 2, time_bin_size: Nanos { ns: DAY }, scalar_type: netpod::ScalarType::I32, - byte_order: netpod::ByteOrder::big_endian(), + byte_order: netpod::ByteOrder::Big, shape: netpod::Shape::Scalar, array: false, compression: false, diff --git a/disk/src/gen.rs b/disk/src/gen.rs index 3557bae..488da83 100644 --- a/disk/src/gen.rs +++ b/disk/src/gen.rs @@ -29,7 +29,7 @@ pub async fn gen_test_data() -> Result<(), Error> { keyspace: 2, time_bin_size: Nanos { ns: DAY }, scalar_type: ScalarType::I32, - byte_order: ByteOrder::big_endian(), + byte_order: ByteOrder::Big, shape: Shape::Scalar, array: false, compression: false, @@ -50,7 +50,7 @@ pub async fn gen_test_data() -> Result<(), Error> { array: true, scalar_type: ScalarType::F64, shape: Shape::Wave(21), - byte_order: ByteOrder::big_endian(), + byte_order: ByteOrder::Big, compression: true, }, gen_var: netpod::GenVar::Default, @@ -67,7 +67,7 @@ pub async fn gen_test_data() -> Result<(), Error> { keyspace: 3, time_bin_size: Nanos { ns: DAY }, scalar_type: ScalarType::U16, - byte_order: ByteOrder::little_endian(), + byte_order: ByteOrder::Little, shape: Shape::Wave(77), array: true, compression: true, @@ -86,7 +86,7 @@ pub async fn gen_test_data() -> Result<(), Error> { keyspace: 2, time_bin_size: Nanos { ns: DAY }, scalar_type: ScalarType::I32, - byte_order: ByteOrder::little_endian(), + byte_order: ByteOrder::Little, shape: Shape::Scalar, array: false, compression: false, @@ -105,7 +105,7 @@ pub async fn gen_test_data() -> Result<(), Error> { keyspace: 2, time_bin_size: Nanos { ns: DAY }, scalar_type: ScalarType::I32, - byte_order: ByteOrder::little_endian(), + byte_order: ByteOrder::Little, shape: Shape::Scalar, array: false, compression: false, diff --git a/disk/src/merge.rs b/disk/src/merge.rs index cd6fe9d..b6ef449 100644 --- a/disk/src/merge.rs +++ b/disk/src/merge.rs @@ -360,7 +360,7 @@ mod test { keyspace: 2, time_bin_size: Nanos { ns: DAY }, scalar_type: ScalarType::I32, - byte_order: ByteOrder::BE, + byte_order: ByteOrder::Big, array: false, compression: false, shape: Shape::Scalar, diff --git a/disk/src/raw/conn.rs b/disk/src/raw/conn.rs index 068ec7e..d0326a4 100644 --- a/disk/src/raw/conn.rs +++ b/disk/src/raw/conn.rs @@ -112,8 +112,8 @@ macro_rules! pipe3 { macro_rules! pipe2 { ($nty:ident, $end:expr, $shape:expr, $agg_kind:expr, $event_blobs:expr) => { match $end { - ByteOrder::LE => pipe3!($nty, LittleEndian, $shape, $agg_kind, $event_blobs), - ByteOrder::BE => pipe3!($nty, BigEndian, $shape, $agg_kind, $event_blobs), + ByteOrder::Little => pipe3!($nty, LittleEndian, $shape, $agg_kind, $event_blobs), + ByteOrder::Big => pipe3!($nty, BigEndian, $shape, $agg_kind, $event_blobs), } }; } diff --git a/dq/src/bin/dq.rs b/dq/src/bin/dq.rs index 8a7b8f7..a1031ee 100644 --- a/dq/src/bin/dq.rs +++ b/dq/src/bin/dq.rs @@ -83,7 +83,7 @@ pub fn main() -> Result<(), Error> { compression: false, shape: Shape::Scalar, array: false, - byte_order: ByteOrder::LE, + byte_order: ByteOrder::Little, }; let range = NanoRange { beg: u64::MIN, diff --git a/httpret/src/api1.rs b/httpret/src/api1.rs index 80d71c9..8700d42 100644 --- a/httpret/src/api1.rs +++ b/httpret/src/api1.rs @@ -1,6 +1,6 @@ use crate::err::Error; use crate::gather::{gather_get_json_generic, SubRes}; -use crate::{response, BodyStream}; +use crate::{response, BodyStream, ReqCtx}; use bytes::{BufMut, BytesMut}; use futures_core::Stream; use futures_util::{FutureExt, StreamExt, TryFutureExt, TryStreamExt}; @@ -9,10 +9,10 @@ use hyper::{Body, Client, Request, Response}; use items::eventfull::EventFull; use items::{RangeCompletableItem, Sitemty, StreamItem}; use itertools::Itertools; -use netpod::log::*; use netpod::query::api1::Api1Query; use netpod::query::RawEventsQuery; use netpod::timeunits::SEC; +use netpod::{log::*, ScalarType}; use netpod::{ByteSize, Channel, DiskIoTune, NanoRange, NodeConfigCached, PerfOpts, Shape}; use netpod::{ChannelSearchQuery, ChannelSearchResult, ProxyConfig}; use netpod::{ACCEPT_ALL, APP_JSON, APP_OCTET}; @@ -21,6 +21,7 @@ use parse::channelconfig::read_local_config; use parse::channelconfig::{Config, ConfigEntry, MatchingConfigEntry}; use serde::{Deserialize, Serialize}; use serde_json::Value as JsonValue; +use std::fmt; use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; @@ -468,17 +469,127 @@ async fn process_answer(res: Response) -> Result { } } -#[derive(Clone, Debug, Serialize, Deserialize)] +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +pub enum Api1ScalarType { + #[serde(rename = "uint8")] + U8, + #[serde(rename = "uint16")] + U16, + #[serde(rename = "uint32")] + U32, + #[serde(rename = "uint64")] + U64, + #[serde(rename = "int8")] + I8, + #[serde(rename = "int16")] + I16, + #[serde(rename = "int32")] + I32, + #[serde(rename = "int64")] + I64, + #[serde(rename = "float32")] + F32, + #[serde(rename = "float64")] + F64, + #[serde(rename = "bool")] + BOOL, + #[serde(rename = "string")] + STRING, +} + +impl Api1ScalarType { + pub fn to_str(&self) -> &'static str { + use Api1ScalarType as A; + match self { + A::U8 => "uint8", + A::U16 => "uint16", + A::U32 => "uint32", + A::U64 => "uint64", + A::I8 => "int8", + A::I16 => "int16", + A::I32 => "int32", + A::I64 => "int64", + A::F32 => "float32", + A::F64 => "float64", + A::BOOL => "bool", + A::STRING => "string", + } + } +} + +impl fmt::Display for Api1ScalarType { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!(fmt, "{}", self.to_str()) + } +} + +impl From<&ScalarType> for Api1ScalarType { + fn from(k: &ScalarType) -> Self { + use Api1ScalarType as B; + use ScalarType as A; + match k { + A::U8 => B::U8, + A::U16 => B::U16, + A::U32 => B::U32, + A::U64 => B::U64, + A::I8 => B::I8, + A::I16 => B::I16, + A::I32 => B::I32, + A::I64 => B::I64, + A::F32 => B::F32, + A::F64 => B::F64, + A::BOOL => B::BOOL, + A::STRING => B::STRING, + } + } +} + +impl From for Api1ScalarType { + fn from(x: ScalarType) -> Self { + (&x).into() + } +} + +#[test] +fn test_custom_variant_name() { + let val = Api1ScalarType::F32; + assert_eq!(format!("{val:?}"), "F32"); + assert_eq!(format!("{val}"), "float32"); + let s = serde_json::to_string(&val).unwrap(); + assert_eq!(s, "\"float32\""); +} + +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +pub enum Api1ByteOrder { + #[serde(rename = "LITTLE_ENDIAN")] + Little, + #[serde(rename = "BIG_ENDIAN")] + Big, +} + +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] pub struct Api1ChannelHeader { name: String, #[serde(rename = "type")] - ty: String, + ty: Api1ScalarType, #[serde(rename = "byteOrder")] - byte_order: String, + byte_order: Api1ByteOrder, + #[serde(default)] shape: Vec, + #[serde(default, skip_serializing_if = "Option::is_none")] compression: Option, } +impl Api1ChannelHeader { + pub fn name(&self) -> &str { + &self.name + } + + pub fn ty(&self) -> Api1ScalarType { + self.ty.clone() + } +} + pub struct DataApiPython3DataStream { range: NanoRange, channels: Vec, @@ -560,11 +671,11 @@ impl DataApiPython3DataStream { if !*header_out { let head = Api1ChannelHeader { name: channel.name.clone(), - ty: b.scalar_types[i1].to_api3proto().into(), + ty: (&b.scalar_types[i1]).into(), byte_order: if b.be[i1] { - "BIG_ENDIAN".into() + Api1ByteOrder::Big } else { - "LITTLE_ENDIAN".into() + Api1ByteOrder::Little }, // The shape is inconsistent on the events. // Seems like the config is to be trusted in this case. @@ -576,39 +687,18 @@ impl DataApiPython3DataStream { let l1 = 1 + h.as_bytes().len() as u32; d.put_u32(l1); d.put_u8(0); + debug!("header frame byte len {}", 4 + 1 + h.as_bytes().len()); d.extend_from_slice(h.as_bytes()); - d.put_u32(l1); *header_out = true; } - { - match &b.shapes[i1] { - Shape::Image(_, _) => { - let l1 = 17 + b.blobs[i1].len() as u32; - d.put_u32(l1); - d.put_u8(1); - d.put_u64(b.tss[i1]); - d.put_u64(b.pulses[i1]); - d.put_slice(&b.blobs[i1]); - d.put_u32(l1); - } - Shape::Wave(_) => { - let l1 = 17 + b.blobs[i1].len() as u32; - d.put_u32(l1); - d.put_u8(1); - d.put_u64(b.tss[i1]); - d.put_u64(b.pulses[i1]); - d.put_slice(&b.blobs[i1]); - d.put_u32(l1); - } - _ => { - let l1 = 17 + b.blobs[i1].len() as u32; - d.put_u32(l1); - d.put_u8(1); - d.put_u64(b.tss[i1]); - d.put_u64(b.pulses[i1]); - d.put_slice(&b.blobs[i1]); - d.put_u32(l1); - } + match &b.shapes[i1] { + _ => { + let l1 = 17 + b.blobs[i1].len() as u32; + d.put_u32(l1); + d.put_u8(1); + d.put_u64(b.tss[i1]); + d.put_u64(b.pulses[i1]); + d.put_slice(&b.blobs[i1]); } } *count_events += 1; @@ -810,7 +900,12 @@ impl Api1EventsBinaryHandler { } } - pub async fn handle(&self, req: Request, node_config: &NodeConfigCached) -> Result, Error> { + pub async fn handle( + &self, + req: Request, + _ctx: &ReqCtx, + node_config: &NodeConfigCached, + ) -> Result, Error> { if req.method() != Method::POST { return Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?); } diff --git a/httpret/src/bodystream.rs b/httpret/src/bodystream.rs index 8f7d155..f2ed002 100644 --- a/httpret/src/bodystream.rs +++ b/httpret/src/bodystream.rs @@ -13,20 +13,12 @@ use std::task::{Context, Poll}; use tracing::field::Empty; use tracing::{span, Level}; -fn proxy_mark() -> &'static str { - "7c5e408a" -} - pub fn response(status: T) -> http::response::Builder where http::StatusCode: std::convert::TryFrom, >::Error: Into, { - Response::builder() - .status(status) - .header("Access-Control-Allow-Origin", "*") - .header("Access-Control-Allow-Headers", "*") - .header("x-proxy-log-mark", proxy_mark()) + Response::builder().status(status) } pub struct BodyStream { diff --git a/httpret/src/channel_status.rs b/httpret/src/channel_status.rs index 80df0ad..b94d0ed 100644 --- a/httpret/src/channel_status.rs +++ b/httpret/src/channel_status.rs @@ -1,5 +1,6 @@ use crate::bodystream::response; use crate::err::Error; +use crate::ReqCtx; use http::{Method, Request, Response, StatusCode}; use hyper::Body; use netpod::query::ChannelStateEventsQuery; @@ -17,7 +18,12 @@ impl ConnectionStatusEvents { } } - pub async fn handle(&self, req: Request, node_config: &NodeConfigCached) -> Result, Error> { + pub async fn handle( + &self, + req: Request, + _ctx: &ReqCtx, + node_config: &NodeConfigCached, + ) -> Result, Error> { if req.method() == Method::GET { let accept_def = APP_JSON; let accept = req @@ -70,7 +76,12 @@ impl ChannelConnectionStatusEvents { } } - pub async fn handle(&self, req: Request, node_config: &NodeConfigCached) -> Result, Error> { + pub async fn handle( + &self, + req: Request, + _ctx: &ReqCtx, + node_config: &NodeConfigCached, + ) -> Result, Error> { if req.method() == Method::GET { let accept_def = APP_JSON; let accept = req diff --git a/httpret/src/events.rs b/httpret/src/events.rs index 0e27725..cf5c3f8 100644 --- a/httpret/src/events.rs +++ b/httpret/src/events.rs @@ -1,6 +1,6 @@ use crate::channelconfig::{chconf_from_events_binary, chconf_from_events_json}; use crate::err::Error; -use crate::{response, response_err, BodyStream, ToPublicResponse}; +use crate::{response, response_err, BodyStream, ReqCtx, ToPublicResponse}; use futures_util::{Stream, StreamExt, TryStreamExt}; use http::{Method, Request, Response, StatusCode}; use hyper::Body; @@ -164,17 +164,27 @@ impl EventsHandlerScylla { } } - pub async fn handle(&self, req: Request, node_config: &NodeConfigCached) -> Result, Error> { + pub async fn handle( + &self, + req: Request, + ctx: &ReqCtx, + node_config: &NodeConfigCached, + ) -> Result, Error> { if req.method() != Method::GET { return Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?); } - match self.fetch(req, node_config).await { + match self.fetch(req, ctx, node_config).await { Ok(ret) => Ok(ret), Err(e) => Ok(e.to_public_response()), } } - async fn fetch(&self, req: Request, node_config: &NodeConfigCached) -> Result, Error> { + async fn fetch( + &self, + req: Request, + ctx: &ReqCtx, + node_config: &NodeConfigCached, + ) -> Result, Error> { info!("EventsHandlerScylla req: {:?}", req); let accept_def = APP_JSON; let accept = req @@ -182,14 +192,19 @@ impl EventsHandlerScylla { .get(http::header::ACCEPT) .map_or(accept_def, |k| k.to_str().unwrap_or(accept_def)); if accept.contains(APP_JSON) || accept.contains(ACCEPT_ALL) { - Ok(self.gather(req, node_config).await?) + Ok(self.gather(req, ctx, node_config).await?) } else { let ret = response_err(StatusCode::NOT_ACCEPTABLE, format!("Unsupported Accept: {:?}", accept))?; Ok(ret) } } - async fn gather(&self, req: Request, node_config: &NodeConfigCached) -> Result, Error> { + async fn gather( + &self, + req: Request, + _ctx: &ReqCtx, + node_config: &NodeConfigCached, + ) -> Result, Error> { let self_name = std::any::type_name::(); let (head, _body) = req.into_parts(); warn!("TODO PlainEventsQuery needs to take AggKind to do x-binning"); @@ -303,11 +318,16 @@ impl BinnedHandlerScylla { } } - pub async fn handle(&self, req: Request, node_config: &NodeConfigCached) -> Result, Error> { + pub async fn handle( + &self, + req: Request, + ctx: &ReqCtx, + node_config: &NodeConfigCached, + ) -> Result, Error> { if req.method() != Method::GET { return Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?); } - match self.fetch(req, node_config).await { + match self.fetch(req, ctx, node_config).await { Ok(ret) => Ok(ret), Err(e) => { eprintln!("error: {e}"); @@ -316,7 +336,12 @@ impl BinnedHandlerScylla { } } - async fn fetch(&self, req: Request, node_config: &NodeConfigCached) -> Result, Error> { + async fn fetch( + &self, + req: Request, + ctx: &ReqCtx, + node_config: &NodeConfigCached, + ) -> Result, Error> { info!("BinnedHandlerScylla req: {:?}", req); let accept_def = APP_JSON; let accept = req @@ -324,14 +349,19 @@ impl BinnedHandlerScylla { .get(http::header::ACCEPT) .map_or(accept_def, |k| k.to_str().unwrap_or(accept_def)); if accept.contains(APP_JSON) || accept.contains(ACCEPT_ALL) { - Ok(self.gather(req, node_config).await?) + Ok(self.gather(req, ctx, node_config).await?) } else { let ret = response_err(StatusCode::NOT_ACCEPTABLE, format!("Unsupported Accept: {:?}", accept))?; Ok(ret) } } - async fn gather(&self, req: Request, node_config: &NodeConfigCached) -> Result, Error> { + async fn gather( + &self, + req: Request, + _ctx: &ReqCtx, + node_config: &NodeConfigCached, + ) -> Result, Error> { let (head, _body) = req.into_parts(); warn!("TODO BinnedQuery needs to take AggKind to do x-binngin"); let s1 = format!("dummy:{}", head.uri); diff --git a/httpret/src/httpret.rs b/httpret/src/httpret.rs index 32e6246..4c6f03c 100644 --- a/httpret/src/httpret.rs +++ b/httpret/src/httpret.rs @@ -30,6 +30,7 @@ use net::SocketAddr; use netpod::log::*; use netpod::query::BinnedQuery; use netpod::timeunits::SEC; +use netpod::ProxyConfig; use netpod::{FromUrl, NodeConfigCached, NodeStatus, NodeStatusArchiverAppliance}; use netpod::{ACCEPT_ALL, APP_JSON, APP_JSON_LINES, APP_OCTET}; use nodenet::conn::events_service; @@ -45,6 +46,8 @@ use task::{Context, Poll}; use tracing::Instrument; use url::Url; +pub const PSI_DAQBUFFER_SERVICE_MARK: &'static str = "PSI-Daqbuffer-Service-Mark"; + pub async fn host(node_config: NodeConfigCached) -> Result<(), Error> { static STATUS_BOARD_INIT: Once = Once::new(); STATUS_BOARD_INIT.call_once(|| { @@ -132,6 +135,41 @@ where impl UnwindSafe for Cont {} +pub struct ReqCtx { + pub marks: Vec, + pub mark: String, +} + +impl ReqCtx { + fn with_node(req: &Request, nc: &NodeConfigCached) -> Self { + let mut marks = Vec::new(); + for (n, v) in req.headers().iter() { + if n == PSI_DAQBUFFER_SERVICE_MARK { + marks.push(String::from_utf8_lossy(v.as_bytes()).to_string()); + } + } + Self { + marks, + mark: format!("{}:{}", nc.node_config.name, nc.node.port), + } + } +} + +impl ReqCtx { + fn with_proxy(req: &Request, proxy: &ProxyConfig) -> Self { + let mut marks = Vec::new(); + for (n, v) in req.headers().iter() { + if n == PSI_DAQBUFFER_SERVICE_MARK { + marks.push(String::from_utf8_lossy(v.as_bytes()).to_string()); + } + } + Self { + marks, + mark: format!("{}:{}", proxy.name, proxy.port), + } + } +} + // TODO remove because I want error bodies to be json. pub fn response_err(status: StatusCode, msg: T) -> Result, Error> where @@ -193,11 +231,28 @@ macro_rules! static_http_api1 { } async fn http_service_try(req: Request, node_config: &NodeConfigCached) -> Result, Error> { + let ctx = ReqCtx::with_node(&req, node_config); + let mut res = http_service_inner(req, &ctx, node_config).await?; + let hm = res.headers_mut(); + hm.append("Access-Control-Allow-Origin", "*".parse().unwrap()); + hm.append("Access-Control-Allow-Headers", "*".parse().unwrap()); + for m in &ctx.marks { + hm.append(PSI_DAQBUFFER_SERVICE_MARK, m.parse().unwrap()); + } + hm.append(PSI_DAQBUFFER_SERVICE_MARK, ctx.mark.parse().unwrap()); + Ok(res) +} + +async fn http_service_inner( + req: Request, + ctx: &ReqCtx, + node_config: &NodeConfigCached, +) -> Result, Error> { let uri = req.uri().clone(); let path = uri.path(); if path == "/api/4/node_status" { if req.method() == Method::GET { - Ok(node_status(req, &node_config).await?) + Ok(node_status(req, ctx, &node_config).await?) } else { Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) } @@ -245,34 +300,34 @@ async fn http_service_try(req: Request, node_config: &NodeConfigCached) -> } else if let Some(h) = events::EventsHandler::handler(&req) { h.handle(req, &node_config).await } else if let Some(h) = events::EventsHandlerScylla::handler(&req) { - h.handle(req, &node_config).await + h.handle(req, ctx, &node_config).await } else if let Some(h) = events::BinnedHandlerScylla::handler(&req) { - h.handle(req, &node_config).await + h.handle(req, ctx, &node_config).await } else if let Some(h) = channel_status::ConnectionStatusEvents::handler(&req) { - h.handle(req, &node_config).await + h.handle(req, ctx, &node_config).await } else if let Some(h) = channel_status::ChannelConnectionStatusEvents::handler(&req) { - h.handle(req, &node_config).await + h.handle(req, ctx, &node_config).await } else if path == "/api/4/binned" { if req.method() == Method::GET { - Ok(binned(req, node_config).await?) + Ok(binned(req, ctx, node_config).await?) } else { Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) } } else if path == "/api/4/prebinned" { if req.method() == Method::GET { - Ok(prebinned(req, &node_config).await?) + Ok(prebinned(req, ctx, &node_config).await?) } else { Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) } } else if path == "/api/4/table_sizes" { if req.method() == Method::GET { - Ok(table_sizes(req, &node_config).await?) + Ok(table_sizes(req, ctx, &node_config).await?) } else { Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) } } else if path == "/api/4/random/channel" { if req.method() == Method::GET { - Ok(random_channel(req, &node_config).await?) + Ok(random_channel(req, ctx, &node_config).await?) } else { Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) } @@ -284,25 +339,25 @@ async fn http_service_try(req: Request, node_config: &NodeConfigCached) -> } } else if path == "/api/4/clear_cache" { if req.method() == Method::GET { - Ok(clear_cache_all(req, &node_config).await?) + Ok(clear_cache_all(req, ctx, &node_config).await?) } else { Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) } } else if path == "/api/4/update_db_with_channel_names" { if req.method() == Method::GET { - Ok(update_db_with_channel_names(req, &node_config).await?) + Ok(update_db_with_channel_names(req, ctx, &node_config).await?) } else { Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) } } else if path == "/api/4/update_db_with_all_channel_configs" { if req.method() == Method::GET { - Ok(update_db_with_all_channel_configs(req, &node_config).await?) + Ok(update_db_with_all_channel_configs(req, ctx, &node_config).await?) } else { Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) } } else if path == "/api/4/update_search_cache" { if req.method() == Method::GET { - Ok(update_search_cache(req, &node_config).await?) + Ok(update_search_cache(req, ctx, &node_config).await?) } else { Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) } @@ -311,7 +366,7 @@ async fn http_service_try(req: Request, node_config: &NodeConfigCached) -> } else if let Some(h) = settings::SettingsThreadsMaxHandler::handler(&req) { h.handle(req, &node_config).await } else if let Some(h) = api1::Api1EventsBinaryHandler::handler(&req) { - h.handle(req, &node_config).await + h.handle(req, ctx, &node_config).await } else if let Some(h) = evinfo::EventInfoScan::handler(&req) { h.handle(req, &node_config).await } else if let Some(h) = pulsemap::MapPulseScyllaHandler::handler(&req) { @@ -387,8 +442,8 @@ impl StatusBoardAllHandler { } } -async fn binned(req: Request, node_config: &NodeConfigCached) -> Result, Error> { - match binned_inner(req, node_config).await { +async fn binned(req: Request, ctx: &ReqCtx, node_config: &NodeConfigCached) -> Result, Error> { + match binned_inner(req, ctx, node_config).await { Ok(ret) => Ok(ret), Err(e) => { error!("fn binned: {e:?}"); @@ -397,7 +452,11 @@ async fn binned(req: Request, node_config: &NodeConfigCached) -> Result, node_config: &NodeConfigCached) -> Result, Error> { +async fn binned_inner( + req: Request, + ctx: &ReqCtx, + node_config: &NodeConfigCached, +) -> Result, Error> { let (head, _body) = req.into_parts(); let url = Url::parse(&format!("dummy:{}", head.uri))?; let query = BinnedQuery::from_url(&url).map_err(|e| { @@ -416,8 +475,8 @@ async fn binned_inner(req: Request, node_config: &NodeConfigCached) -> Res debug!("binned STARTING {:?}", query); }); match head.headers.get(http::header::ACCEPT) { - Some(v) if v == APP_OCTET => binned_binary(query, chconf, node_config).await, - Some(v) if v == APP_JSON || v == ACCEPT_ALL => binned_json(query, chconf, node_config).await, + Some(v) if v == APP_OCTET => binned_binary(query, chconf, &ctx, node_config).await, + Some(v) if v == APP_JSON || v == ACCEPT_ALL => binned_json(query, chconf, &ctx, node_config).await, _ => Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?), } } @@ -425,6 +484,7 @@ async fn binned_inner(req: Request, node_config: &NodeConfigCached) -> Res async fn binned_binary( query: BinnedQuery, chconf: ChConf, + _ctx: &ReqCtx, node_config: &NodeConfigCached, ) -> Result, Error> { let body_stream = @@ -439,6 +499,7 @@ async fn binned_binary( async fn binned_json( query: BinnedQuery, chconf: ChConf, + _ctx: &ReqCtx, node_config: &NodeConfigCached, ) -> Result, Error> { let body_stream = disk::binned::binned_json(&query, chconf.scalar_type, chconf.shape, node_config).await?; @@ -449,8 +510,8 @@ async fn binned_json( Ok(res) } -async fn prebinned(req: Request, node_config: &NodeConfigCached) -> Result, Error> { - match prebinned_inner(req, node_config).await { +async fn prebinned(req: Request, ctx: &ReqCtx, node_config: &NodeConfigCached) -> Result, Error> { + match prebinned_inner(req, ctx, node_config).await { Ok(ret) => Ok(ret), Err(e) => { error!("fn prebinned: {e:?}"); @@ -459,7 +520,11 @@ async fn prebinned(req: Request, node_config: &NodeConfigCached) -> Result } } -async fn prebinned_inner(req: Request, node_config: &NodeConfigCached) -> Result, Error> { +async fn prebinned_inner( + req: Request, + _ctx: &ReqCtx, + node_config: &NodeConfigCached, +) -> Result, Error> { let (head, _body) = req.into_parts(); let query = PreBinnedQuery::from_request(&head)?; let desc = format!( @@ -486,7 +551,11 @@ async fn prebinned_inner(req: Request, node_config: &NodeConfigCached) -> Ok(ret) } -async fn node_status(req: Request, node_config: &NodeConfigCached) -> Result, Error> { +async fn node_status( + req: Request, + _ctx: &ReqCtx, + node_config: &NodeConfigCached, +) -> Result, Error> { let (_head, _body) = req.into_parts(); let archiver_appliance_status = match node_config.node.archiver_appliance.as_ref() { Some(k) => { @@ -525,7 +594,11 @@ async fn node_status(req: Request, node_config: &NodeConfigCached) -> Resu Ok(ret) } -async fn table_sizes(req: Request, node_config: &NodeConfigCached) -> Result, Error> { +async fn table_sizes( + req: Request, + _ctx: &ReqCtx, + node_config: &NodeConfigCached, +) -> Result, Error> { let (_head, _body) = req.into_parts(); let sizes = dbconn::table_sizes(node_config).await?; let mut ret = String::new(); @@ -537,14 +610,22 @@ async fn table_sizes(req: Request, node_config: &NodeConfigCached) -> Resu Ok(ret) } -pub async fn random_channel(req: Request, node_config: &NodeConfigCached) -> Result, Error> { +pub async fn random_channel( + req: Request, + _ctx: &ReqCtx, + node_config: &NodeConfigCached, +) -> Result, Error> { let (_head, _body) = req.into_parts(); let ret = dbconn::random_channel(node_config).await?; let ret = response(StatusCode::OK).body(Body::from(ret))?; Ok(ret) } -pub async fn clear_cache_all(req: Request, node_config: &NodeConfigCached) -> Result, Error> { +pub async fn clear_cache_all( + req: Request, + _ctx: &ReqCtx, + node_config: &NodeConfigCached, +) -> Result, Error> { let (head, _body) = req.into_parts(); let dry = match head.uri.query() { Some(q) => q.contains("dry"), @@ -559,6 +640,7 @@ pub async fn clear_cache_all(req: Request, node_config: &NodeConfigCached) pub async fn update_db_with_channel_names( req: Request, + _ctx: &ReqCtx, node_config: &NodeConfigCached, ) -> Result, Error> { let (head, _body) = req.into_parts(); @@ -594,6 +676,7 @@ pub async fn update_db_with_channel_names( pub async fn update_db_with_channel_names_3( req: Request, + _ctx: &ReqCtx, node_config: &NodeConfigCached, ) -> Result, Error> { let (head, _body) = req.into_parts(); @@ -616,6 +699,7 @@ pub async fn update_db_with_channel_names_3( pub async fn update_db_with_all_channel_configs( req: Request, + _ctx: &ReqCtx, node_config: &NodeConfigCached, ) -> Result, Error> { let (head, _body) = req.into_parts(); @@ -636,7 +720,11 @@ pub async fn update_db_with_all_channel_configs( Ok(ret) } -pub async fn update_search_cache(req: Request, node_config: &NodeConfigCached) -> Result, Error> { +pub async fn update_search_cache( + req: Request, + _ctx: &ReqCtx, + node_config: &NodeConfigCached, +) -> Result, Error> { let (head, _body) = req.into_parts(); let _dry = match head.uri.query() { Some(q) => q.contains("dry"), diff --git a/httpret/src/proxy.rs b/httpret/src/proxy.rs index eac3da3..4e6de32 100644 --- a/httpret/src/proxy.rs +++ b/httpret/src/proxy.rs @@ -4,7 +4,7 @@ use crate::api1::{channel_search_configs_v1, channel_search_list_v1, gather_json use crate::err::Error; use crate::gather::{gather_get_json_generic, SubRes}; use crate::pulsemap::MapPulseQuery; -use crate::{api_1_docs, api_4_docs, response, response_err, Cont}; +use crate::{api_1_docs, api_4_docs, response, response_err, Cont, ReqCtx, PSI_DAQBUFFER_SERVICE_MARK}; use futures_core::Stream; use futures_util::pin_mut; use http::{Method, StatusCode}; @@ -14,10 +14,9 @@ use hyper_tls::HttpsConnector; use itertools::Itertools; use netpod::log::*; use netpod::query::{BinnedQuery, PlainEventsQuery}; -use netpod::{ - AppendToUrl, ChannelConfigQuery, ChannelSearchQuery, ChannelSearchResult, ChannelSearchSingleResult, FromUrl, - HasBackend, HasTimeout, ProxyConfig, ACCEPT_ALL, APP_JSON, -}; +use netpod::{AppendToUrl, ChannelConfigQuery, FromUrl, HasBackend, HasTimeout, ProxyConfig}; +use netpod::{ChannelSearchQuery, ChannelSearchResult, ChannelSearchSingleResult}; +use netpod::{ACCEPT_ALL, APP_JSON}; use serde::{Deserialize, Serialize}; use serde_json::Value as JsonValue; use std::future::Future; @@ -70,6 +69,23 @@ async fn proxy_http_service(req: Request, proxy_config: ProxyConfig) -> Re } async fn proxy_http_service_try(req: Request, proxy_config: &ProxyConfig) -> Result, Error> { + let ctx = ReqCtx::with_proxy(&req, proxy_config); + let mut res = proxy_http_service_inner(req, &ctx, proxy_config).await?; + let hm = res.headers_mut(); + hm.insert("Access-Control-Allow-Origin", "*".parse().unwrap()); + hm.insert("Access-Control-Allow-Headers", "*".parse().unwrap()); + for m in &ctx.marks { + hm.append(PSI_DAQBUFFER_SERVICE_MARK, m.parse().unwrap()); + } + hm.append(PSI_DAQBUFFER_SERVICE_MARK, ctx.mark.parse().unwrap()); + Ok(res) +} + +async fn proxy_http_service_inner( + req: Request, + ctx: &ReqCtx, + proxy_config: &ProxyConfig, +) -> Result, Error> { let uri = req.uri().clone(); let path = uri.path(); if path == "/api/1/channels" { @@ -84,16 +100,15 @@ async fn proxy_http_service_try(req: Request, proxy_config: &ProxyConfig) Ok(proxy_api1_single_backend_query(req, proxy_config).await?) } else if path.starts_with("/api/1/map/pulse/") { warn!("/api/1/map/pulse/ DEPRECATED"); - Ok(proxy_api1_map_pulse(req, proxy_config).await?) + Ok(proxy_api1_map_pulse(req, ctx, proxy_config).await?) } else if path.starts_with("/api/1/gather/") { Ok(gather_json_2_v1(req, "/api/1/gather/", proxy_config).await?) } else if path == "/api/4/version" { if req.method() == Method::GET { let ret = serde_json::json!({ - //"data_api_version": "4.0.0-beta", "data_api_version": { "major": 4, - "minor": 0, + "minor": 1, }, }); Ok(response(StatusCode::OK).body(Body::from(serde_json::to_vec(&ret)?))?) @@ -107,13 +122,13 @@ async fn proxy_http_service_try(req: Request, proxy_config: &ProxyConfig) } else if path == "/api/4/search/channel" { Ok(api4::channel_search(req, proxy_config).await?) } else if path == "/api/4/events" { - Ok(proxy_single_backend_query::(req, proxy_config).await?) + Ok(proxy_single_backend_query::(req, ctx, proxy_config).await?) } else if path.starts_with("/api/4/map/pulse/") { - Ok(proxy_single_backend_query::(req, proxy_config).await?) + Ok(proxy_single_backend_query::(req, ctx, proxy_config).await?) } else if path == "/api/4/binned" { - Ok(proxy_single_backend_query::(req, proxy_config).await?) + Ok(proxy_single_backend_query::(req, ctx, proxy_config).await?) } else if path == "/api/4/channel/config" { - Ok(proxy_single_backend_query::(req, proxy_config).await?) + Ok(proxy_single_backend_query::(req, ctx, proxy_config).await?) } else if path.starts_with("/api/1/documentation/") { if req.method() == Method::GET { api_1_docs(path) @@ -149,12 +164,18 @@ async fn proxy_http_service_try(req: Request, proxy_config: &ProxyConfig) } else if path.starts_with(DISTRI_PRE) { proxy_distribute_v2(req).await } else { - Ok(response(StatusCode::NOT_FOUND).body(Body::from(format!( - "Sorry, proxy can not find: {:?} {:?} {:?}", - req.method(), - req.uri().path(), - req.uri().query(), - )))?) + use std::fmt::Write; + let mut body = String::new(); + let out = &mut body; + write!(out, "METHOD {:?}
\n", req.method())?; + write!(out, "URI {:?}
\n", req.uri())?; + write!(out, "HOST {:?}
\n", req.uri().host())?; + write!(out, "PORT {:?}
\n", req.uri().port())?; + write!(out, "PATH {:?}
\n", req.uri().path())?; + for (hn, hv) in req.headers() { + write!(out, "HEADER {hn:?}: {hv:?}
\n")?; + } + Ok(response(StatusCode::NOT_FOUND).body(Body::from(body))?) } } @@ -381,7 +402,11 @@ pub async fn channel_search(req: Request, proxy_config: &ProxyConfig) -> R } } -pub async fn proxy_api1_map_pulse(req: Request, proxy_config: &ProxyConfig) -> Result, Error> { +pub async fn proxy_api1_map_pulse( + req: Request, + _ctx: &ReqCtx, + proxy_config: &ProxyConfig, +) -> Result, Error> { let s2 = format!("http://dummy/{}", req.uri()); info!("s2: {:?}", s2); let url = Url::parse(&s2)?; @@ -505,6 +530,7 @@ pub async fn proxy_api1_single_backend_query( pub async fn proxy_single_backend_query( req: Request, + _ctx: &ReqCtx, proxy_config: &ProxyConfig, ) -> Result, Error> where diff --git a/netpod/src/netpod.rs b/netpod/src/netpod.rs index 89a40db..6b3b321 100644 --- a/netpod/src/netpod.rs +++ b/netpod/src/netpod.rs @@ -303,23 +303,6 @@ impl ScalarType { } } - pub fn to_api3proto(&self) -> &'static str { - match self { - ScalarType::U8 => "uint8", - ScalarType::U16 => "uint16", - ScalarType::U32 => "uint32", - ScalarType::U64 => "uint64", - ScalarType::I8 => "int8", - ScalarType::I16 => "int16", - ScalarType::I32 => "int32", - ScalarType::I64 => "int64", - ScalarType::F32 => "float32", - ScalarType::F64 => "float64", - ScalarType::BOOL => "bool", - ScalarType::STRING => "string", - } - } - pub fn to_scylla_i32(&self) -> i32 { self.index() as i32 } @@ -725,31 +708,23 @@ impl NanoRange { #[derive(Clone, Debug, Serialize, Deserialize)] pub enum ByteOrder { - LE, - BE, + Little, + Big, } impl ByteOrder { - pub fn little_endian() -> Self { - Self::LE - } - - pub fn big_endian() -> Self { - Self::BE - } - pub fn from_dtype_flags(flags: u8) -> Self { if flags & 0x20 == 0 { - Self::LE + Self::Little } else { - Self::BE + Self::Big } } pub fn from_bsread_str(s: &str) -> Result { match s { - "little" => Ok(ByteOrder::LE), - "big" => Ok(ByteOrder::BE), + "little" => Ok(ByteOrder::Little), + "big" => Ok(ByteOrder::Big), _ => Err(Error::with_msg_no_trace(format!( "ByteOrder::from_bsread_str can not understand {}", s @@ -758,7 +733,7 @@ impl ByteOrder { } pub fn is_le(&self) -> bool { - if let Self::LE = self { + if let Self::Little = self { true } else { false @@ -766,7 +741,7 @@ impl ByteOrder { } pub fn is_be(&self) -> bool { - if let Self::BE = self { + if let Self::Big = self { true } else { false diff --git a/parse/Cargo.toml b/parse/Cargo.toml index 7ba34f6..ec874ed 100644 --- a/parse/Cargo.toml +++ b/parse/Cargo.toml @@ -6,12 +6,13 @@ edition = "2021" [dependencies] serde = { version = "1.0", features = ["derive"] } -tokio = { version = "1.18.1" } +serde_json = "1.0.89" +tokio = { version = "1.22.0", features = ["fs"] } chrono = { version = "0.4.19", features = ["serde"] } bytes = "1.0.1" byteorder = "1.4.3" hex = "0.4.3" -nom = "6.1.2" +nom = "7.1.1" num-traits = "0.2" num-derive = "0.3" err = { path = "../err" } diff --git a/parse/src/channelconfig.rs b/parse/src/channelconfig.rs index 4f23034..10a110b 100644 --- a/parse/src/channelconfig.rs +++ b/parse/src/channelconfig.rs @@ -1,14 +1,10 @@ use err::Error; use netpod::timeunits::MS; -use netpod::{ - ByteOrder, Channel, ChannelConfigQuery, ChannelConfigResponse, NanoRange, Nanos, Node, ScalarType, Shape, -}; +use netpod::{ByteOrder, Channel, NanoRange, Nanos, Node, ScalarType, Shape}; +use netpod::{ChannelConfigQuery, ChannelConfigResponse}; use nom::bytes::complete::take; use nom::number::complete::{be_i16, be_i32, be_i64, be_i8, be_u8}; use nom::Needed; -//use nom::bytes::complete::{tag, take_while_m_n}; -//use nom::combinator::map_res; -//use nom::sequence::tuple; use num_derive::{FromPrimitive, ToPrimitive}; use num_traits::ToPrimitive; use serde::{Deserialize, Serialize}; diff --git a/parse/src/jsonconf.rs b/parse/src/jsonconf.rs new file mode 100644 index 0000000..2e74e7a --- /dev/null +++ b/parse/src/jsonconf.rs @@ -0,0 +1,32 @@ +#[test] +fn test_json_trailing() { + use serde::Deserialize; + use serde_json::Value as JsonValue; + use std::io::Cursor; + if serde_json::from_str::(r#"{}."#).is_ok() { + panic!("Should fail because of trailing character"); + } + let cur = Cursor::new(r#"{}..."#); + let mut de = serde_json::Deserializer::from_reader(cur); + if JsonValue::deserialize(&mut de).is_err() { + panic!("Should allow trailing characters") + } + let cur = Cursor::new(r#"nullA"#); + let mut de = serde_json::Deserializer::from_reader(cur); + if let Ok(val) = JsonValue::deserialize(&mut de) { + if val != serde_json::json!(null) { + panic!("Bad parse") + } + } else { + panic!("Should allow trailing characters") + } + let cur = Cursor::new(r#" {}AA"#); + let mut de = serde_json::Deserializer::from_reader(cur); + if let Ok(val) = JsonValue::deserialize(&mut de) { + if val != serde_json::json!({}) { + panic!("Bad parse") + } + } else { + panic!("Should allow trailing characters") + } +} diff --git a/parse/src/lib.rs b/parse/src/lib.rs index 9c63800..7244ccf 100644 --- a/parse/src/lib.rs +++ b/parse/src/lib.rs @@ -1 +1,2 @@ pub mod channelconfig; +mod jsonconf;