diff --git a/daqbufp2/src/test/api1.rs b/daqbufp2/src/test/api1.rs index c63dcc9..7e2fbab 100644 --- a/daqbufp2/src/test/api1.rs +++ b/daqbufp2/src/test/api1.rs @@ -1,12 +1,13 @@ use crate::err::ErrConv; use crate::nodes::require_test_hosts_running; +use chrono::{DateTime, Utc}; use err::Error; use futures_util::Future; use http::{header, Request, StatusCode}; use httpclient::{http_get, http_post}; use hyper::Body; use netpod::log::*; -use netpod::query::api1::{Api1Query, Api1Range}; +use netpod::query::api1::{Api1Query, Api1Range, ChannelTuple}; use url::Url; fn testrun(fut: F) -> Result @@ -23,11 +24,18 @@ fn events_f64_plain() -> Result<(), Error> { let cluster = &rh.cluster; let node = &cluster.nodes[0]; let url: Url = format!("http://{}:{}/api/1/query", node.host, node.port).parse()?; - let accept = "application/json"; - //let qu = Api1Query::new(Api1Range::new(), vec!["testbackend/scalar-i32-be"]); - let buf = http_post(url, accept, "{}".into()).await?; - let js = String::from_utf8_lossy(&buf); - eprintln!("string received: {js}"); + let accept = "application/octet-stream"; + let beg: DateTime = "1970-01-01T00:00:00Z".parse()?; + let end: DateTime = "1970-01-01T00:01:00Z".parse()?; + let range = Api1Range::new(beg, end)?; + // TODO the channel list needs to get pre-processed to check for backend prefix! + let ch = ChannelTuple::new("test-disk-databuffer".into(), "scalar-i32-be".into()); + let qu = Api1Query::new(range, vec![ch]); + let body = serde_json::to_string(&qu)?; + let buf = http_post(url, accept, body.into()).await?; + eprintln!("body received: {}", buf.len()); + //let js = String::from_utf8_lossy(&buf); + //eprintln!("string received: {js}"); Ok(()) }; testrun(fut)?; diff --git a/daqbufp2/src/test/events.rs b/daqbufp2/src/test/events.rs index 1f718a2..c576482 100644 --- a/daqbufp2/src/test/events.rs +++ b/daqbufp2/src/test/events.rs @@ -23,7 +23,7 @@ use url::Url; fn ch_adhoc(name: &str) -> Channel { Channel { series: None, - backend: "testbackend".into(), + backend: "test-disk-databuffer".into(), name: name.into(), } } @@ -31,7 +31,7 @@ fn ch_adhoc(name: &str) -> Channel { pub fn ch_gen(name: &str) -> Channel { Channel { series: None, - backend: "testbackend".into(), + backend: "test-disk-databuffer".into(), name: name.into(), } } diff --git a/daqbufp2/src/test/eventsjson.rs b/daqbufp2/src/test/eventsjson.rs index 0f40f43..205aa0f 100644 --- a/daqbufp2/src/test/eventsjson.rs +++ b/daqbufp2/src/test/eventsjson.rs @@ -18,7 +18,7 @@ fn events_plain_json_00() -> Result<(), Error> { let cluster = &rh.cluster; events_plain_json( Channel { - backend: "testbackend".into(), + backend: "test-inmem".into(), name: "inmem-d0-i32".into(), series: None, }, diff --git a/disk/src/dataopen.rs b/disk/src/dataopen.rs index e05e6e1..6707fa0 100644 --- a/disk/src/dataopen.rs +++ b/disk/src/dataopen.rs @@ -813,7 +813,7 @@ mod test { end: DAY + HOUR * 8, }; let chn = netpod::Channel { - backend: "testbackend".into(), + backend: "test-disk-databuffer".into(), name: "scalar-i32-be".into(), series: None, }; diff --git a/disk/src/eventblobs.rs b/disk/src/eventblobs.rs index 30cbbed..db84fa3 100644 --- a/disk/src/eventblobs.rs +++ b/disk/src/eventblobs.rs @@ -254,7 +254,7 @@ mod test { fn read_expanded_for_range(range: netpod::NanoRange, nodeix: usize) -> Result<(usize, Vec), Error> { let chn = netpod::Channel { - backend: "testbackend".into(), + backend: "test-disk-databuffer".into(), name: "scalar-i32-be".into(), series: None, }; diff --git a/disk/src/gen.rs b/disk/src/gen.rs index 665dfff..3557bae 100644 --- a/disk/src/gen.rs +++ b/disk/src/gen.rs @@ -10,6 +10,7 @@ use tokio::fs::{File, OpenOptions}; use tokio::io::AsyncWriteExt; pub async fn gen_test_data() -> Result<(), Error> { + let backend = String::from("test-disk-databuffer"); let homedir = std::env::var("HOME").unwrap(); let data_base_path = PathBuf::from(homedir).join("daqbuffer-testdata").join("databuffer"); let ksprefix = String::from("ks"); @@ -21,7 +22,7 @@ pub async fn gen_test_data() -> Result<(), Error> { let chn = ChannelGenProps { config: ChannelConfig { channel: Channel { - backend: "testbackend".into(), + backend: backend.clone(), name: "scalar-i32-be".into(), series: None, }, @@ -40,7 +41,7 @@ pub async fn gen_test_data() -> Result<(), Error> { let chn = ChannelGenProps { config: ChannelConfig { channel: Channel { - backend: "testbackend".into(), + backend: backend.clone(), name: "wave-f64-be-n21".into(), series: None, }, @@ -59,7 +60,7 @@ pub async fn gen_test_data() -> Result<(), Error> { let chn = ChannelGenProps { config: ChannelConfig { channel: Channel { - backend: "testbackend".into(), + backend: backend.clone(), name: "wave-u16-le-n77".into(), series: None, }, @@ -78,7 +79,7 @@ pub async fn gen_test_data() -> Result<(), Error> { let chn = ChannelGenProps { config: ChannelConfig { channel: Channel { - backend: "testbackend".into(), + backend: backend.clone(), name: "tw-scalar-i32-be".into(), series: None, }, @@ -97,7 +98,7 @@ pub async fn gen_test_data() -> Result<(), Error> { let chn = ChannelGenProps { config: ChannelConfig { channel: Channel { - backend: "testbackend".into(), + backend: backend.clone(), name: "const-regular-scalar-i32-be".into(), series: None, }, diff --git a/httpret/src/api1.rs b/httpret/src/api1.rs index 83413a7..80d71c9 100644 --- a/httpret/src/api1.rs +++ b/httpret/src/api1.rs @@ -875,9 +875,9 @@ impl Api1EventsBinaryHandler { let chans = qu .channels() .iter() - .map(|x| Channel { + .map(|ch| Channel { backend: backend.into(), - name: x.clone(), + name: ch.name().into(), series: None, }) .collect(); @@ -888,7 +888,7 @@ impl Api1EventsBinaryHandler { chans, qu.disk_io_tune().clone(), qu.decompress(), - qu.events_max(), + qu.events_max().unwrap_or(u64::MAX), status_id.clone(), node_config.clone(), ); diff --git a/httpret/src/channelconfig.rs b/httpret/src/channelconfig.rs index dd1dba3..038f80d 100644 --- a/httpret/src/channelconfig.rs +++ b/httpret/src/channelconfig.rs @@ -42,7 +42,7 @@ pub async fn chconf_from_database(channel: &Channel, ncc: &NodeConfigCached) -> channel.backend, ncc.node_config.cluster.backend ); } - if channel.backend() == "testbackend" { + if channel.backend() == "test-inmem" { if channel.name() == "inmem-d0-i32" { let ret = ChConf { series: 1, diff --git a/netpod/Cargo.toml b/netpod/Cargo.toml index 9c9dbb5..19aa77d 100644 --- a/netpod/Cargo.toml +++ b/netpod/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "netpod" -version = "0.0.1-a.0" +version = "0.0.2" authors = ["Dominik Werder "] edition = "2021" @@ -11,12 +11,10 @@ path = "src/netpod.rs" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" async-channel = "1.6" -bytes = "1.0.1" +bytes = "1.3" chrono = { version = "0.4.19", features = ["serde"] } -futures-core = "0.3.12" futures-util = "0.3.14" -tracing = "0.1.25" +tracing = "0.1.37" url = "2.2" -lazy_static = "1.4.0" num-traits = "0.2" err = { path = "../err" } diff --git a/netpod/src/netpod.rs b/netpod/src/netpod.rs index f3e6241..fac071b 100644 --- a/netpod/src/netpod.rs +++ b/netpod/src/netpod.rs @@ -3,10 +3,11 @@ pub mod query; pub mod status; pub mod streamext; +use crate::log::*; +use bytes::Bytes; use chrono::{DateTime, TimeZone, Utc}; use err::Error; -use futures_core::Stream; -use futures_util::StreamExt; +use futures_util::{Stream, StreamExt}; use serde::{Deserialize, Serialize}; use serde_json::Value as JsVal; use std::collections::BTreeMap; @@ -19,8 +20,6 @@ use std::str::FromStr; use std::task::{Context, Poll}; use std::time::Duration; use timeunits::*; -#[allow(unused_imports)] -use tracing::{debug, error, info, trace, warn}; use url::Url; pub const APP_JSON: &'static str = "application/json"; @@ -37,8 +36,8 @@ pub struct AggQuerySingleChannel { } pub struct BodyStream { - //pub receiver: async_channel::Receiver>, - pub inner: Box> + Send + Unpin>, + //pub receiver: async_channel::Receiver>, + pub inner: Box> + Send + Unpin>, } #[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug)] @@ -1748,7 +1747,7 @@ impl ByteSize { } } -#[derive(Clone, Debug, Serialize, Deserialize)] +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] pub struct FileIoBufferSize(pub usize); impl FileIoBufferSize { diff --git a/netpod/src/query/api1.rs b/netpod/src/query/api1.rs index 2e36de2..537f4bc 100644 --- a/netpod/src/query/api1.rs +++ b/netpod/src/query/api1.rs @@ -1,17 +1,18 @@ use crate::{DiskIoTune, FileIoBufferSize, ReadSys}; -use chrono::{DateTime, FixedOffset, NaiveDate, TimeZone}; +use chrono::{DateTime, FixedOffset}; +use err::Error; use serde::{Deserialize, Serialize}; use std::fmt; -fn u64_max() -> u64 { - u64::MAX +fn bool_true() -> bool { + true } -fn is_u64_max(x: &u64) -> bool { - *x == u64::MAX +fn bool_is_true(x: &bool) -> bool { + *x } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, PartialEq, Serialize, Deserialize)] pub struct Api1Range { #[serde(rename = "type", default, skip_serializing_if = "String::is_empty")] ty: String, @@ -30,14 +31,27 @@ pub struct Api1Range { } mod datetime_serde { - // RFC 3339 / ISO 8601 - use super::*; use serde::de::Visitor; use serde::{Deserializer, Serializer}; + // RFC 3339 / ISO 8601 + pub fn ser(val: &DateTime, ser: S) -> Result { - let s = val.format("%Y-%m-%dT%H:%M:%S.%6f%:z").to_string(); + use fmt::Write; + let mut s = String::with_capacity(64); + write!(&mut s, "{}", val.format("%Y-%m-%dT%H:%M:%S")).map_err(|_| serde::ser::Error::custom("fmt"))?; + let mus = val.timestamp_subsec_micros(); + if mus % 1000 != 0 { + write!(&mut s, "{}", val.format(".%6f")).map_err(|_| serde::ser::Error::custom("fmt"))?; + } else if mus != 0 { + write!(&mut s, "{}", val.format(".%3f")).map_err(|_| serde::ser::Error::custom("fmt"))?; + } + if val.offset().local_minus_utc() == 0 { + write!(&mut s, "Z").map_err(|_| serde::ser::Error::custom("fmt"))?; + } else { + write!(&mut s, "{}", val.format("%:z")).map_err(|_| serde::ser::Error::custom("fmt"))?; + } ser.serialize_str(&s) } @@ -72,16 +86,19 @@ mod datetime_serde { } impl Api1Range { - pub fn new(beg: A, end: B) -> Self + pub fn new(beg: A, end: B) -> Result where - A: Into>, - B: Into>, + A: TryInto>, + B: TryInto>, + >>::Error: fmt::Debug, + >>::Error: fmt::Debug, { - Self { + let ret = Self { ty: String::new(), - beg: beg.into(), - end: end.into(), - } + beg: beg.try_into().map_err(|e| format!("{e:?}"))?, + end: end.try_into().map_err(|e| format!("{e:?}"))?, + }; + Ok(ret) } pub fn beg(&self) -> &DateTime { @@ -115,6 +132,7 @@ fn serde_de_range_offset() { #[test] fn serde_ser_range_offset() { + use chrono::{NaiveDate, TimeZone}; let beg = FixedOffset::east_opt(60 * 60 * 3) .unwrap() .from_local_datetime( @@ -135,39 +153,181 @@ fn serde_ser_range_offset() { ) .earliest() .unwrap(); - let range = Api1Range::new(beg, end); + let range = Api1Range::new(beg, end).unwrap(); let js = serde_json::to_string(&range).unwrap(); - let exp = r#"{"startDate":"2022-11-22T13:14:15.016000+03:00","endDate":"2022-11-22T13:14:15.800000-01:00"}"#; + let exp = r#"{"startDate":"2022-11-22T13:14:15.016+03:00","endDate":"2022-11-22T13:14:15.800-01:00"}"#; assert_eq!(js, exp); } -#[derive(Debug, Serialize, Deserialize)] +#[test] +fn serde_ser_range_01() { + let beg: DateTime = "2022-11-22T02:03:04Z".parse().unwrap(); + let end: DateTime = "2022-11-22T02:03:04.123Z".parse().unwrap(); + let range = Api1Range::new(beg, end).unwrap(); + let js = serde_json::to_string(&range).unwrap(); + let exp = r#"{"startDate":"2022-11-22T02:03:04Z","endDate":"2022-11-22T02:03:04.123Z"}"#; + assert_eq!(js, exp); +} + +#[test] +fn serde_ser_range_02() { + let beg: DateTime = "2022-11-22T02:03:04.987654Z".parse().unwrap(); + let end: DateTime = "2022-11-22T02:03:04.777000Z".parse().unwrap(); + let range = Api1Range::new(beg, end).unwrap(); + let js = serde_json::to_string(&range).unwrap(); + let exp = r#"{"startDate":"2022-11-22T02:03:04.987654Z","endDate":"2022-11-22T02:03:04.777Z"}"#; + assert_eq!(js, exp); +} + +/// In Api1, the list of channels consists of either `BACKEND/CHANNELNAME` +/// or just `CHANNELNAME`. +#[derive(Debug, PartialEq)] +pub struct ChannelTuple { + backend: Option, + name: String, +} + +impl ChannelTuple { + pub fn new(backend: String, name: String) -> Self { + Self { + backend: Some(backend), + name, + } + } + + pub fn from_name(name: String) -> Self { + Self { backend: None, name } + } + + pub fn backend(&self) -> Option<&String> { + self.backend.as_ref() + } + + pub fn name(&self) -> &str { + &self.name + } +} + +mod serde_channel_tuple { + use super::*; + use serde::de::{Deserialize, Deserializer, Visitor}; + use serde::ser::{Serialize, Serializer}; + + impl Serialize for ChannelTuple { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + if let Some(backend) = self.backend.as_ref() { + serializer.serialize_str(&format!("{}/{}", backend, self.name)) + } else { + serializer.serialize_str(&self.name) + } + } + } + + struct Vis; + + impl<'de> Visitor<'de> for Vis { + type Value = ChannelTuple; + + fn expecting(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!(fmt, "[Backendname/]Channelname") + } + + fn visit_str(self, val: &str) -> Result + where + E: serde::de::Error, + { + let mut it = val.split("/"); + // Even empty string splits into one element of empty string + let s0 = it.next().unwrap(); + if let Some(s1) = it.next() { + let ret = ChannelTuple { + backend: Some(s0.into()), + name: s1.into(), + }; + Ok(ret) + } else { + let ret = ChannelTuple { + backend: None, + name: s0.into(), + }; + Ok(ret) + } + } + } + + impl<'de> Deserialize<'de> for ChannelTuple { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + deserializer.deserialize_str(Vis) + } + } + + #[test] + fn ser_name() { + let x = ChannelTuple { + backend: None, + name: "temperature".into(), + }; + let js = serde_json::to_string(&x).unwrap(); + assert_eq!(js, r#""temperature""#); + } + + #[test] + fn ser_backend_name() { + let x = ChannelTuple { + backend: Some("beach".into()), + name: "temperature".into(), + }; + let js = serde_json::to_string(&x).unwrap(); + assert_eq!(js, r#""beach/temperature""#); + } +} + +#[derive(Debug, PartialEq, Serialize, Deserialize)] pub struct Api1Query { - channels: Vec, range: Api1Range, + channels: Vec, // All following parameters are private and not to be used - #[serde(default)] + #[serde(default, skip_serializing_if = "Option::is_none")] file_io_buffer_size: Option, - #[serde(default)] + #[serde(default = "bool_true", skip_serializing_if = "bool_is_true")] decompress: bool, - #[serde(default = "u64_max", skip_serializing_if = "is_u64_max")] - events_max: u64, - #[serde(default)] - io_queue_len: u64, - #[serde(default)] + #[serde(default, skip_serializing_if = "Option::is_none")] + events_max: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + io_queue_len: Option, + #[serde(default, skip_serializing_if = "String::is_empty")] log_level: String, - #[serde(default)] + #[serde(default, skip_serializing_if = "String::is_empty")] read_sys: String, } impl Api1Query { + pub fn new(range: Api1Range, channels: Vec) -> Self { + Self { + range, + channels, + decompress: true, + events_max: None, + file_io_buffer_size: None, + io_queue_len: None, + log_level: String::new(), + read_sys: String::new(), + } + } + pub fn disk_io_tune(&self) -> DiskIoTune { let mut k = DiskIoTune::default(); if let Some(x) = &self.file_io_buffer_size { k.read_buffer_len = x.0; } - if self.io_queue_len != 0 { - k.read_queue_len = self.io_queue_len as usize; + if let Some(io_queue_len) = self.io_queue_len { + k.read_queue_len = io_queue_len as usize; } let read_sys: ReadSys = self.read_sys.as_str().into(); k.read_sys = read_sys; @@ -178,7 +338,7 @@ impl Api1Query { &self.range } - pub fn channels(&self) -> &[String] { + pub fn channels(&self) -> &[ChannelTuple] { &self.channels } @@ -190,7 +350,22 @@ impl Api1Query { self.decompress } - pub fn events_max(&self) -> u64 { + pub fn events_max(&self) -> Option { self.events_max } } + +#[test] +fn serde_api1_query() { + let beg: DateTime = "2022-11-22T08:09:10Z".parse().unwrap(); + let end: DateTime = "2022-11-23T08:11:05.455009+02:00".parse().unwrap(); + let range = Api1Range::new(beg, end).unwrap(); + let ch0 = ChannelTuple::from_name("nameonly".into()); + let ch1 = ChannelTuple::new("somebackend".into(), "somechan".into()); + let qu = Api1Query::new(range, vec![ch0, ch1]); + let js = serde_json::to_string(&qu).unwrap(); + assert_eq!( + js, + r#"{"range":{"startDate":"2022-11-22T08:09:10Z","endDate":"2022-11-23T08:11:05.455009+02:00"},"channels":["nameonly","somebackend/somechan"]}"# + ); +} diff --git a/netpod/src/streamext.rs b/netpod/src/streamext.rs index ae2bef5..f0b3b7b 100644 --- a/netpod/src/streamext.rs +++ b/netpod/src/streamext.rs @@ -1,6 +1,5 @@ use err::Error; -use futures_core::Stream; -use futures_util::StreamExt; +use futures_util::{Stream, StreamExt}; use std::pin::Pin; use std::task::{Context, Poll}; diff --git a/nodenet/src/conn.rs b/nodenet/src/conn.rs index 33828a0..bc74fed 100644 --- a/nodenet/src/conn.rs +++ b/nodenet/src/conn.rs @@ -108,7 +108,7 @@ async fn events_conn_handler_inner_try( } let mut p1: Pin> + Send>> = - if evq.channel().backend() == "testbackend" { + if evq.channel().backend() == "test-inmem" { warn!("TEST BACKEND DATA"); use items_2::Empty; use netpod::timeunits::MS;