From dfff79329e061e9f8b96d5d6eba39629bc2745cd Mon Sep 17 00:00:00 2001
From: Dominik Werder
Date: Tue, 22 Nov 2022 16:33:48 +0100
Subject: [PATCH] Add test case for Api1Range serde

---
 daqbufp2/Cargo.toml               |   8 +-
 daqbufp2/src/client.rs            |   6 +-
 daqbufp2/src/err.rs               |   2 +-
 daqbufp2/src/nodes.rs             |   9 +-
 daqbufp2/src/test.rs              |  10 +-
 daqbufp2/src/test/api1.rs         |  35 ++++++
 daqbufp2/src/test/api4.rs         |   0
 daqbufp2/src/test/binnedbinary.rs |   3 +-
 daqbufp2/src/test/events.rs       |   3 +-
 disk/src/binned/binnedfrompbv.rs  |   3 +-
 disk/src/cache.rs                 |  67 ----------
 httpclient/Cargo.toml             |  27 ++--
 httpclient/src/httpclient.rs      | 186 ++++++++++++++++++++++++++++
 httpclient/src/lib.rs             |  44 -------
 httpret/src/api1.rs               |  80 +++---------
 netpod/src/api1.rs                |  22 ----
 netpod/src/netpod.rs              |   1 -
 netpod/src/query.rs               |   7 +-
 netpod/src/query/api1.rs          | 196 ++++++++++++++++++++++++++++++
 taskrun/src/taskrun.rs            |   6 +-
 20 files changed, 484 insertions(+), 231 deletions(-)
 create mode 100644 daqbufp2/src/test/api1.rs
 create mode 100644 daqbufp2/src/test/api4.rs
 create mode 100644 httpclient/src/httpclient.rs
 delete mode 100644 httpclient/src/lib.rs
 delete mode 100644 netpod/src/api1.rs
 create mode 100644 netpod/src/query/api1.rs

diff --git a/daqbufp2/Cargo.toml b/daqbufp2/Cargo.toml
index 531959e..a8a464a 100644
--- a/daqbufp2/Cargo.toml
+++ b/daqbufp2/Cargo.toml
@@ -5,13 +5,12 @@ authors = ["Dominik Werder "]
 edition = "2021"
 
 [dependencies]
-tokio = { version = "1.7.1", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] }
+tokio = { version = "1.22.0", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] }
 hyper = "0.14"
 http = "0.2"
 tracing = "0.1.25"
-tracing-subscriber = "0.2.17"
-futures-core = "0.3.14"
-futures-util = "0.3.14"
+tracing-subscriber = "0.3.16"
+futures-util = "0.3.25"
 bytes = "1.0.1"
 serde = "1.0"
 serde_derive = "1.0"
@@ -24,6 +23,7 @@ err = { path = "../err" }
 taskrun = { path = "../taskrun" }
 netpod = { path = "../netpod" }
 httpret = { path = "../httpret" }
+httpclient = { path = "../httpclient" }
 disk = { path = "../disk" }
 items = { path = "../items" }
 streams = { path = "../streams" }
diff --git a/daqbufp2/src/client.rs b/daqbufp2/src/client.rs
index 2ca4471..2f17909 100644
--- a/daqbufp2/src/client.rs
+++ b/daqbufp2/src/client.rs
@@ -4,11 +4,13 @@ use disk::streamlog::Streamlog;
 use err::Error;
 use futures_util::TryStreamExt;
 use http::StatusCode;
+use httpclient::HttpBodyAsAsyncRead;
 use hyper::Body;
 use items::xbinnedwaveevents::XBinnedWaveEvents;
 use items::{Sitemty, StreamItem};
+use netpod::log::*;
 use netpod::query::{BinnedQuery, CacheUsage};
-use netpod::{log::*, AppendToUrl};
+use netpod::AppendToUrl;
 use netpod::{AggKind, ByteSize, Channel, HostPort, NanoRange, PerfOpts, APP_OCTET};
 use streams::frames::inmem::InMemoryFrameAsyncReadStream;
 use url::Url;
@@ -91,7 +93,7 @@ pub async fn get_binned(
         )));
     }
     let perf_opts = PerfOpts { inmem_bufcap: 512 };
-    let s1 = disk::cache::HttpBodyAsAsyncRead::new(res);
+    let s1 = HttpBodyAsAsyncRead::new(res);
     let s2 = InMemoryFrameAsyncReadStream::new(s1, perf_opts.inmem_bufcap);
     use futures_util::StreamExt;
     use std::future::ready;
diff --git a/daqbufp2/src/err.rs b/daqbufp2/src/err.rs
index f13c35d..8552403 100644
--- a/daqbufp2/src/err.rs
+++ b/daqbufp2/src/err.rs
@@ -5,7 +5,7 @@ pub trait ErrConv<T> {
 pub trait Convable: ToString {}
 
 impl<T, C: Convable> ErrConv<T> for Result<T, C> {
-    fn ec(self) -> Result<T, err::Error> {
+    fn ec(self) -> Result<T, ::err::Error> {
         match self {
             Ok(x) => Ok(x),
             Err(e) => Err(::err::Error::from_string(e.to_string())),
         }
     }
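A note on the `ErrConv` pattern this hunk adjusts (and that the new `httpclient` crate below duplicates): any error type that implements `ToString` is funneled into the crate-local `err::Error` through a single `.ec()` call, keeping call sites free of `map_err` noise. A minimal self-contained sketch of the same pattern; `MyError` stands in for `err::Error` and the `ParseIntError` impl is illustrative:

```rust
// Sketch of the ErrConv conversion shim. The real code converts
// http::Error and hyper::Error into err::Error the same way.
#[derive(Debug)]
struct MyError(String);

trait ErrConv<T> {
    fn ec(self) -> Result<T, MyError>;
}

// Marker trait: anything that can render itself as a string is convertible.
trait Convable: ToString {}
impl Convable for std::num::ParseIntError {}

impl<T, C: Convable> ErrConv<T> for Result<T, C> {
    fn ec(self) -> Result<T, MyError> {
        self.map_err(|e| MyError(e.to_string()))
    }
}

fn main() {
    // Any Result with a Convable error converts via .ec():
    let n: Result<u32, MyError> = "42".parse::<u32>().ec();
    assert_eq!(n.unwrap(), 42);
}
```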
diff --git a/daqbufp2/src/nodes.rs b/daqbufp2/src/nodes.rs
index 8a50810..8b3c98e 100644
--- a/daqbufp2/src/nodes.rs
+++ b/daqbufp2/src/nodes.rs
@@ -1,7 +1,10 @@
 use crate::spawn_test_hosts;
 use err::Error;
+use netpod::log::*;
 use netpod::Cluster;
 use std::sync::{Arc, Mutex};
+use std::thread;
+use std::time::Duration;
 use tokio::task::JoinHandle;
 
 pub struct RunningHosts {
@@ -47,7 +50,7 @@ pub fn require_test_hosts_running() -> Result<Arc<RunningHosts>, Error> {
     let mut g = HOSTS_RUNNING.lock().unwrap();
     match g.as_ref() {
         None => {
-            netpod::log::info!("\n\n+++++++++++++++++++ MAKE NEW RunningHosts\n\n");
+            info!("\n\n+++++++++++++++++++ MAKE NEW RunningHosts\n\n");
             let cluster = netpod::test_cluster();
             let jhs = spawn_test_hosts(cluster.clone());
             let ret = RunningHosts {
@@ -56,10 +59,12 @@ pub fn require_test_hosts_running() -> Result<Arc<RunningHosts>, Error> {
             };
             let a = Arc::new(ret);
             *g = Some(a.clone());
+            // TODO check in a different way that the test hosts are up, sockets connected, ready for testing
+            thread::sleep(Duration::from_millis(400));
             Ok(a)
         }
         Some(gg) => {
-            netpod::log::debug!("\n\n+++++++++++++++++++ REUSE RunningHost\n\n");
+            debug!("\n\n+++++++++++++++++++ REUSE RunningHost\n\n");
             Ok(gg.clone())
         }
     }
diff --git a/daqbufp2/src/test.rs b/daqbufp2/src/test.rs
index d212a70..0a07c9f 100644
--- a/daqbufp2/src/test.rs
+++ b/daqbufp2/src/test.rs
@@ -1,10 +1,16 @@
+#[cfg(test)]
+mod api1;
+#[cfg(test)]
+mod api4;
 pub mod archapp;
 pub mod binnedbinary;
 pub mod binnedjson;
-pub mod events;
+#[cfg(test)]
+mod events;
 #[cfg(test)]
 mod eventsjson;
-pub mod timeweightedjson;
+#[cfg(test)]
+mod timeweightedjson;
 
 use bytes::BytesMut;
 use err::Error;
diff --git a/daqbufp2/src/test/api1.rs b/daqbufp2/src/test/api1.rs
new file mode 100644
index 0000000..c63dcc9
--- /dev/null
+++ b/daqbufp2/src/test/api1.rs
@@ -0,0 +1,35 @@
+use crate::err::ErrConv;
+use crate::nodes::require_test_hosts_running;
+use err::Error;
+use futures_util::Future;
+use http::{header, Request, StatusCode};
+use httpclient::{http_get, http_post};
+use hyper::Body;
+use netpod::log::*;
+use netpod::query::api1::{Api1Query, Api1Range};
+use url::Url;
+
+fn testrun<T, F>(fut: F) -> Result<T, Error>
+where
+    F: Future<Output = Result<T, Error>>,
+{
+    taskrun::run(fut)
+}
+
+#[test]
+fn events_f64_plain() -> Result<(), Error> {
+    let fut = async {
+        let rh = require_test_hosts_running()?;
+        let cluster = &rh.cluster;
+        let node = &cluster.nodes[0];
+        let url: Url = format!("http://{}:{}/api/1/query", node.host, node.port).parse()?;
+        let accept = "application/json";
+        //let qu = Api1Query::new(Api1Range::new(), vec!["testbackend/scalar-i32-be"]);
+        let buf = http_post(url, accept, "{}".into()).await?;
+        let js = String::from_utf8_lossy(&buf);
+        eprintln!("string received: {js}");
+        Ok(())
+    };
+    testrun(fut)?;
+    Ok(())
+}
diff --git a/daqbufp2/src/test/api4.rs b/daqbufp2/src/test/api4.rs
new file mode 100644
index 0000000..e69de29
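The new test posts an empty JSON object for now; the commented-out line hints at the request it is meant to send eventually. Going by the `Api1Query`/`Api1Range` serde definitions added in `netpod/src/query/api1.rs` further down, the body would plausibly look like this sketch (the channel name is taken from the commented-out line; the exact shape is not yet pinned down by this test):

```rust
// Sketch of a request body matching the Api1Query field names and the
// "startDate"/"endDate" serde renames introduced later in this patch.
use serde_json::json;

fn main() {
    let body = json!({
        "channels": ["testbackend/scalar-i32-be"],
        "range": {
            "startDate": "2022-11-22T10:15:12.412Z",
            "endDate": "2022-11-22T10:15:12.413556Z"
        }
    });
    println!("{}", serde_json::to_string_pretty(&body).unwrap());
}
```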
diff --git a/daqbufp2/src/test/binnedbinary.rs b/daqbufp2/src/test/binnedbinary.rs
index c40dae3..5cfc504 100644
--- a/daqbufp2/src/test/binnedbinary.rs
+++ b/daqbufp2/src/test/binnedbinary.rs
@@ -5,6 +5,7 @@ use disk::streamlog::Streamlog;
 use err::Error;
 use futures_util::{StreamExt, TryStreamExt};
 use http::StatusCode;
+use httpclient::HttpBodyAsAsyncRead;
 use hyper::Body;
 use items::binsdim0::MinMaxAvgDim0Bins;
 use items::{RangeCompletableItem, Sitemty, StatsItem, StreamItem, SubFrId, WithLen};
@@ -125,7 +126,7 @@ where
     if res.status() != StatusCode::OK {
         error!("client response {:?}", res);
     }
-    let s1 = disk::cache::HttpBodyAsAsyncRead::new(res);
+    let s1 = HttpBodyAsAsyncRead::new(res);
     let s2 = InMemoryFrameAsyncReadStream::new(s1, perf_opts.inmem_bufcap);
     let res = consume_binned_response::<NTY>(s2).await?;
     let t2 = chrono::Utc::now();
diff --git a/daqbufp2/src/test/events.rs b/daqbufp2/src/test/events.rs
index cfd5549..1f718a2 100644
--- a/daqbufp2/src/test/events.rs
+++ b/daqbufp2/src/test/events.rs
@@ -5,6 +5,7 @@ use disk::streamlog::Streamlog;
 use err::Error;
 use futures_util::{StreamExt, TryStreamExt};
 use http::StatusCode;
+use httpclient::HttpBodyAsAsyncRead;
 use hyper::Body;
 use items::numops::NumOps;
 use items::scalarevents::ScalarEvents;
@@ -99,7 +100,7 @@ where
         error!("client response {res:?}");
         return Err(format!("get_plain_events_binary client response {res:?}").into());
     }
-    let s1 = disk::cache::HttpBodyAsAsyncRead::new(res);
+    let s1 = HttpBodyAsAsyncRead::new(res);
     let s2 = InMemoryFrameAsyncReadStream::new(s1, perf_opts.inmem_bufcap);
     let res = consume_plain_events_binary::<NTY>(s2).await?;
     let t2 = chrono::Utc::now();
diff --git a/disk/src/binned/binnedfrompbv.rs b/disk/src/binned/binnedfrompbv.rs
index 9368882..c39eea9 100644
--- a/disk/src/binned/binnedfrompbv.rs
+++ b/disk/src/binned/binnedfrompbv.rs
@@ -1,10 +1,11 @@
 use crate::agg::binnedt::TBinnerStream;
 use crate::binned::query::PreBinnedQuery;
-use crate::cache::{node_ix_for_patch, HttpBodyAsAsyncRead};
+use crate::cache::node_ix_for_patch;
 use err::Error;
 use futures_core::Stream;
 use futures_util::{FutureExt, StreamExt};
 use http::{StatusCode, Uri};
+use httpclient::HttpBodyAsAsyncRead;
 use items::frame::decode_frame;
 use items::{FrameDecodable, FrameType, FrameTypeInnerStatic, TimeBinnableType};
 use items::{RangeCompletableItem, Sitemty, StreamItem};
{:?}", e)), - ))), - Ready(None) => Ready(Ok(())), - Pending => Pending, - } - } - } -} // For file-based caching, this determined the node where the cache file is located. // No longer needed for scylla-based caching. diff --git a/httpclient/Cargo.toml b/httpclient/Cargo.toml index 77e9be5..58e446b 100644 --- a/httpclient/Cargo.toml +++ b/httpclient/Cargo.toml @@ -1,22 +1,25 @@ [package] name = "httpclient" -version = "0.0.1-a.0" +version = "0.0.2" authors = ["Dominik Werder "] edition = "2021" +[lib] +path = "src/httpclient.rs" + [dependencies] -serde = { version = "1.0", features = ["derive"] } -serde_json = "1.0" -http = "0.2" -url = "2.2" -tokio = { version = "1.11.1", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] } -hyper = { version = "0.14.3", features = ["http1", "http2", "client", "server", "tcp", "stream"] } +futures-util = "0.3.25" +serde = { version = "1.0.147", features = ["derive"] } +serde_json = "1.0.89" +rmp-serde = "1.1.1" +http = "0.2.8" +url = "2.3.1" +tokio = { version = "1.22.0", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] } +tracing = "0.1.37" +hyper = { version = "0.14.23", features = ["http1", "http2", "client", "server", "tcp", "stream"] } hyper-tls = { version = "0.5.0" } -bytes = "1.0.1" -futures-core = "0.3.14" -futures-util = "0.3.14" -tracing = "0.1.25" -async-channel = "1.6" +bytes = "1.3.0" +async-channel = "1.7.1" err = { path = "../err" } netpod = { path = "../netpod" } parse = { path = "../parse" } diff --git a/httpclient/src/httpclient.rs b/httpclient/src/httpclient.rs new file mode 100644 index 0000000..bb12099 --- /dev/null +++ b/httpclient/src/httpclient.rs @@ -0,0 +1,186 @@ +use bytes::Bytes; +use err::{Error, PublicError}; +use futures_util::pin_mut; +use http::{header, Request, Response, StatusCode}; +use hyper::body::HttpBody; +use hyper::{Body, Method}; +use netpod::log::*; +use netpod::{AppendToUrl, ChannelConfigQuery, ChannelConfigResponse, NodeConfigCached}; +use std::pin::Pin; +use std::task::{Context, Poll}; +use tokio::io::{self, AsyncRead, ReadBuf}; +use url::Url; + +pub trait ErrConv { + fn ec(self) -> Result; +} + +pub trait Convable: ToString {} + +impl ErrConv for Result { + fn ec(self) -> Result { + match self { + Ok(x) => Ok(x), + Err(e) => Err(::err::Error::from_string(e.to_string())), + } + } +} + +impl Convable for http::Error {} +impl Convable for hyper::Error {} + +pub async fn http_get(url: Url, accept: &str) -> Result { + let req = Request::builder() + .method(http::Method::GET) + .uri(url.to_string()) + .header(header::ACCEPT, accept) + .body(Body::empty()) + .ec()?; + let client = hyper::Client::new(); + let res = client.request(req).await.ec()?; + if res.status() != StatusCode::OK { + error!("Server error {:?}", res); + let (head, body) = res.into_parts(); + let buf = hyper::body::to_bytes(body).await.ec()?; + let s = String::from_utf8_lossy(&buf); + return Err(Error::with_msg(format!( + concat!( + "Server error {:?}\n", + "---------------------- message from http body:\n", + "{}\n", + "---------------------- end of http body", + ), + head, s + ))); + } + let body = hyper::body::to_bytes(res.into_body()).await.ec()?; + Ok(body) +} + +pub async fn http_post(url: Url, accept: &str, body: String) -> Result { + let req = Request::builder() + .method(http::Method::POST) + .uri(url.to_string()) + .header(header::ACCEPT, accept) + .body(Body::from(body)) + .ec()?; + let client = hyper::Client::new(); + let res = client.request(req).await.ec()?; + if res.status() != 
diff --git a/httpclient/src/httpclient.rs b/httpclient/src/httpclient.rs
new file mode 100644
index 0000000..bb12099
--- /dev/null
+++ b/httpclient/src/httpclient.rs
@@ -0,0 +1,186 @@
+use bytes::Bytes;
+use err::{Error, PublicError};
+use futures_util::pin_mut;
+use http::{header, Request, Response, StatusCode};
+use hyper::body::HttpBody;
+use hyper::{Body, Method};
+use netpod::log::*;
+use netpod::{AppendToUrl, ChannelConfigQuery, ChannelConfigResponse, NodeConfigCached};
+use std::pin::Pin;
+use std::task::{Context, Poll};
+use tokio::io::{self, AsyncRead, ReadBuf};
+use url::Url;
+
+pub trait ErrConv<T> {
+    fn ec(self) -> Result<T, Error>;
+}
+
+pub trait Convable: ToString {}
+
+impl<T, C: Convable> ErrConv<T> for Result<T, C> {
+    fn ec(self) -> Result<T, Error> {
+        match self {
+            Ok(x) => Ok(x),
+            Err(e) => Err(::err::Error::from_string(e.to_string())),
+        }
+    }
+}
+
+impl Convable for http::Error {}
+impl Convable for hyper::Error {}
+
+pub async fn http_get(url: Url, accept: &str) -> Result<Bytes, Error> {
+    let req = Request::builder()
+        .method(http::Method::GET)
+        .uri(url.to_string())
+        .header(header::ACCEPT, accept)
+        .body(Body::empty())
+        .ec()?;
+    let client = hyper::Client::new();
+    let res = client.request(req).await.ec()?;
+    if res.status() != StatusCode::OK {
+        error!("Server error {:?}", res);
+        let (head, body) = res.into_parts();
+        let buf = hyper::body::to_bytes(body).await.ec()?;
+        let s = String::from_utf8_lossy(&buf);
+        return Err(Error::with_msg(format!(
+            concat!(
+                "Server error {:?}\n",
+                "---------------------- message from http body:\n",
+                "{}\n",
+                "---------------------- end of http body",
+            ),
+            head, s
+        )));
+    }
+    let body = hyper::body::to_bytes(res.into_body()).await.ec()?;
+    Ok(body)
+}
+
+pub async fn http_post(url: Url, accept: &str, body: String) -> Result<Bytes, Error> {
+    let req = Request::builder()
+        .method(http::Method::POST)
+        .uri(url.to_string())
+        .header(header::ACCEPT, accept)
+        .body(Body::from(body))
+        .ec()?;
+    let client = hyper::Client::new();
+    let res = client.request(req).await.ec()?;
+    if res.status() != StatusCode::OK {
+        error!("Server error {:?}", res);
+        let (head, body) = res.into_parts();
+        let buf = hyper::body::to_bytes(body).await.ec()?;
+        let s = String::from_utf8_lossy(&buf);
+        return Err(Error::with_msg(format!(
+            concat!(
+                "Server error {:?}\n",
+                "---------------------- message from http body:\n",
+                "{}\n",
+                "---------------------- end of http body",
+            ),
+            head, s
+        )));
+    }
+    let body = hyper::body::to_bytes(res.into_body()).await.ec()?;
+    Ok(body)
+}
+
+// TODO move to a better fitting module:
+pub struct HttpBodyAsAsyncRead {
+    inp: Response<Body>,
+    left: Bytes,
+    rp: usize,
+}
+
+impl HttpBodyAsAsyncRead {
+    pub fn new(inp: Response<Body>) -> Self {
+        Self {
+            inp,
+            left: Bytes::new(),
+            rp: 0,
+        }
+    }
+}
+
+impl AsyncRead for HttpBodyAsAsyncRead {
+    fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context, buf: &mut ReadBuf) -> Poll<io::Result<()>> {
+        use Poll::*;
+        if self.left.len() != 0 {
+            let n1 = buf.remaining();
+            let n2 = self.left.len() - self.rp;
+            if n2 <= n1 {
+                buf.put_slice(self.left[self.rp..].as_ref());
+                self.left = Bytes::new();
+                self.rp = 0;
+                Ready(Ok(()))
+            } else {
+                buf.put_slice(self.left[self.rp..(self.rp + n2)].as_ref());
+                self.rp += n2;
+                Ready(Ok(()))
+            }
+        } else {
+            let f = &mut self.inp;
+            pin_mut!(f);
+            match f.poll_data(cx) {
+                Ready(Some(Ok(k))) => {
+                    let n1 = buf.remaining();
+                    if k.len() <= n1 {
+                        buf.put_slice(k.as_ref());
+                        Ready(Ok(()))
+                    } else {
+                        buf.put_slice(k[..n1].as_ref());
+                        self.left = k;
+                        self.rp = n1;
+                        Ready(Ok(()))
+                    }
+                }
+                Ready(Some(Err(e))) => Ready(Err(io::Error::new(
+                    io::ErrorKind::Other,
+                    Error::with_msg(format!("Received by HttpBodyAsAsyncRead: {:?}", e)),
+                ))),
+                Ready(None) => Ready(Ok(())),
+                Pending => Pending,
+            }
+        }
+    }
+}
+
+pub async fn get_channel_config(
+    q: &ChannelConfigQuery,
+    node_config: &NodeConfigCached,
+) -> Result<ChannelConfigResponse, Error> {
+    let mut url = Url::parse(&format!(
+        "http://{}:{}/api/4/channel/config",
+        node_config.node.host, node_config.node.port
+    ))?;
+    q.append_to_url(&mut url);
+    let req = hyper::Request::builder()
+        .method(Method::GET)
+        .uri(url.as_str())
+        .body(Body::empty())
+        .map_err(Error::from_string)?;
+    let client = hyper::Client::new();
+    let res = client
+        .request(req)
+        .await
+        .map_err(|e| Error::with_msg(format!("get_channel_config request error: {e:?}")))?;
+    if res.status().is_success() {
+        let buf = hyper::body::to_bytes(res.into_body())
+            .await
+            .map_err(|e| Error::with_msg(format!("can not read response: {e:?}")))?;
+        let ret: ChannelConfigResponse = serde_json::from_slice(&buf)
+            .map_err(|e| Error::with_msg(format!("can not parse the channel config response json: {e:?}")))?;
+        Ok(ret)
+    } else {
+        let buf = hyper::body::to_bytes(res.into_body())
+            .await
+            .map_err(|e| Error::with_msg(format!("can not read response: {e:?}")))?;
+        match serde_json::from_slice::<PublicError>(&buf) {
+            Ok(e) => Err(e.into()),
+            Err(_) => Err(Error::with_msg(format!(
+                "can not parse the http error body: {:?}",
+                String::from_utf8_lossy(&buf)
+            ))),
+        }
+    }
+}
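The crate now exposes two levels of client convenience: `http_get`/`http_post` collect a whole response body into `Bytes`, while `HttpBodyAsAsyncRead` adapts a streaming `Response<Body>` for `AsyncRead` consumers such as `InMemoryFrameAsyncReadStream` in the call sites updated above. A usage sketch; host and port are placeholders and not part of the patch:

```rust
// Sketch of how the new httpclient helpers compose; not part of the patch.
use httpclient::{http_get, HttpBodyAsAsyncRead};
use hyper::Client;
use tokio::io::AsyncReadExt;
use url::Url;

async fn demo() {
    // One-shot GET: the body arrives fully collected as Bytes.
    let url: Url = "http://localhost:8371/api/4/channel/config".parse().unwrap();
    let buf = http_get(url, "application/json").await.unwrap();
    println!("got {} bytes", buf.len());

    // Streaming: wrap the raw hyper response so any AsyncRead consumer
    // can drain it chunk by chunk without buffering the whole body.
    let res = Client::new()
        .get("http://localhost:8371/".parse().unwrap())
        .await
        .unwrap();
    let mut rd = HttpBodyAsAsyncRead::new(res);
    let mut chunk = vec![0u8; 4096];
    let n = rd.read(&mut chunk).await.unwrap();
    println!("first read returned {n} bytes");
}
```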
diff --git a/httpclient/src/lib.rs b/httpclient/src/lib.rs
deleted file mode 100644
index 758ce53..0000000
--- a/httpclient/src/lib.rs
+++ /dev/null
@@ -1,44 +0,0 @@
-use err::{Error, PublicError};
-use hyper::{Body, Method};
-use netpod::{AppendToUrl, ChannelConfigQuery, ChannelConfigResponse, NodeConfigCached};
-use url::Url;
-
-pub async fn get_channel_config(
-    q: &ChannelConfigQuery,
-    node_config: &NodeConfigCached,
-) -> Result<ChannelConfigResponse, Error> {
-    let mut url = Url::parse(&format!(
-        "http://{}:{}/api/4/channel/config",
-        node_config.node.host, node_config.node.port
-    ))?;
-    q.append_to_url(&mut url);
-    let req = hyper::Request::builder()
-        .method(Method::GET)
-        .uri(url.as_str())
-        .body(Body::empty())
-        .map_err(Error::from_string)?;
-    let client = hyper::Client::new();
-    let res = client
-        .request(req)
-        .await
-        .map_err(|e| Error::with_msg(format!("get_channel_config request error: {e:?}")))?;
-    if res.status().is_success() {
-        let buf = hyper::body::to_bytes(res.into_body())
-            .await
-            .map_err(|e| Error::with_msg(format!("can not read response: {e:?}")))?;
-        let ret: ChannelConfigResponse = serde_json::from_slice(&buf)
-            .map_err(|e| Error::with_msg(format!("can not parse the channel config response json: {e:?}")))?;
-        Ok(ret)
-    } else {
-        let buf = hyper::body::to_bytes(res.into_body())
-            .await
-            .map_err(|e| Error::with_msg(format!("can not read response: {e:?}")))?;
-        match serde_json::from_slice::<PublicError>(&buf) {
-            Ok(e) => Err(e.into()),
-            Err(_) => Err(Error::with_msg(format!(
-                "can not parse the http error body: {:?}",
-                String::from_utf8_lossy(&buf)
-            ))),
-        }
-    }
-}
"trace" { tracing::span!(tracing::Level::TRACE, "log_span_t") - } else if qu.log_level == "debug" { + } else if qu.log_level() == "debug" { tracing::span!(tracing::Level::DEBUG, "log_span_d") } else { tracing::Span::none() @@ -900,14 +852,12 @@ impl Api1EventsBinaryHandler { // TODO this should go to usage statistics: info!( "Handle Api1Query {:?} {} {:?}", - qu.range, - qu.channels.len(), - qu.channels.first() + qu.range(), + qu.channels().len(), + qu.channels().first() ); - let beg_date = chrono::DateTime::parse_from_rfc3339(&qu.range.start_date); - let end_date = chrono::DateTime::parse_from_rfc3339(&qu.range.end_date); - let beg_date = beg_date?; - let end_date = end_date?; + let beg_date = qu.range().beg().clone(); + let end_date = qu.range().end().clone(); trace!("Api1Query beg_date {:?} end_date {:?}", beg_date, end_date); //let url = Url::parse(&format!("dummy:{}", req.uri()))?; //let query = PlainEventsBinaryQuery::from_url(&url)?; @@ -923,7 +873,7 @@ impl Api1EventsBinaryHandler { // TODO check for valid given backend name: let backend = &node_config.node_config.cluster.backend; let chans = qu - .channels + .channels() .iter() .map(|x| Channel { backend: backend.into(), @@ -937,8 +887,8 @@ impl Api1EventsBinaryHandler { range.clone(), chans, qu.disk_io_tune().clone(), - qu.decompress, - qu.events_max, + qu.decompress(), + qu.events_max(), status_id.clone(), node_config.clone(), ); diff --git a/netpod/src/api1.rs b/netpod/src/api1.rs deleted file mode 100644 index d1787b3..0000000 --- a/netpod/src/api1.rs +++ /dev/null @@ -1,22 +0,0 @@ -use serde::{Deserialize, Serialize}; - -#[derive(Debug, Serialize, Deserialize)] -pub struct Range { - #[serde(rename = "type")] - ty: String, - #[serde(rename = "startDate")] - beg: String, - #[serde(rename = "endDate")] - end: String, -} - -// TODO implement Deserialize such that I recognize the different possible formats... -// I guess, when serializing, it's ok to use the fully qualified format throughout. 
-#[derive(Debug, Serialize, Deserialize)]
-pub struct ChannelList {}
-
-#[derive(Debug, Serialize, Deserialize)]
-pub struct Query {
-    range: Range,
-    channels: ChannelList,
-}
diff --git a/netpod/src/netpod.rs b/netpod/src/netpod.rs
index 33a87c5..f3e6241 100644
--- a/netpod/src/netpod.rs
+++ b/netpod/src/netpod.rs
@@ -1,4 +1,3 @@
-pub mod api1;
 pub mod histo;
 pub mod query;
 pub mod status;
diff --git a/netpod/src/query.rs b/netpod/src/query.rs
index 12edc1d..03c8d2b 100644
--- a/netpod/src/query.rs
+++ b/netpod/src/query.rs
@@ -1,7 +1,8 @@
-use crate::{
-    get_url_query_pairs, AggKind, AppendToUrl, ByteSize, Channel, FromUrl, HasBackend, HasTimeout, NanoRange, ToNanos,
-};
+pub mod api1;
+
+use crate::get_url_query_pairs;
 use crate::{log::*, DiskIoTune};
+use crate::{AggKind, AppendToUrl, ByteSize, Channel, FromUrl, HasBackend, HasTimeout, NanoRange, ToNanos};
 use chrono::{DateTime, TimeZone, Utc};
 use err::Error;
 use serde::{Deserialize, Serialize};
"2022-11-22T10:15:12.412Z", "endDate": "2022-11-22T10:15:12.413556Z"}"#; + let range: Api1Range = serde_json::from_str(s).unwrap(); + assert_eq!(range.beg().offset().local_minus_utc(), 0); + assert_eq!(range.end().offset().local_minus_utc(), 0); + assert_eq!(range.beg().timestamp_subsec_micros(), 412000); + assert_eq!(range.end().timestamp_subsec_micros(), 413556); +} + +#[test] +fn serde_ser_range_offset() { + let beg = FixedOffset::east_opt(60 * 60 * 3) + .unwrap() + .from_local_datetime( + &NaiveDate::from_ymd_opt(2022, 11, 22) + .unwrap() + .and_hms_milli_opt(13, 14, 15, 16) + .unwrap(), + ) + .earliest() + .unwrap(); + let end = FixedOffset::east_opt(-60 * 60 * 1) + .unwrap() + .from_local_datetime( + &NaiveDate::from_ymd_opt(2022, 11, 22) + .unwrap() + .and_hms_milli_opt(13, 14, 15, 800) + .unwrap(), + ) + .earliest() + .unwrap(); + let range = Api1Range::new(beg, end); + let js = serde_json::to_string(&range).unwrap(); + let exp = r#"{"startDate":"2022-11-22T13:14:15.016000+03:00","endDate":"2022-11-22T13:14:15.800000-01:00"}"#; + assert_eq!(js, exp); +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct Api1Query { + channels: Vec, + range: Api1Range, + // All following parameters are private and not to be used + #[serde(default)] + file_io_buffer_size: Option, + #[serde(default)] + decompress: bool, + #[serde(default = "u64_max", skip_serializing_if = "is_u64_max")] + events_max: u64, + #[serde(default)] + io_queue_len: u64, + #[serde(default)] + log_level: String, + #[serde(default)] + read_sys: String, +} + +impl Api1Query { + pub fn disk_io_tune(&self) -> DiskIoTune { + let mut k = DiskIoTune::default(); + if let Some(x) = &self.file_io_buffer_size { + k.read_buffer_len = x.0; + } + if self.io_queue_len != 0 { + k.read_queue_len = self.io_queue_len as usize; + } + let read_sys: ReadSys = self.read_sys.as_str().into(); + k.read_sys = read_sys; + k + } + + pub fn range(&self) -> &Api1Range { + &self.range + } + + pub fn channels(&self) -> &[String] { + &self.channels + } + + pub fn log_level(&self) -> &str { + &self.log_level + } + + pub fn decompress(&self) -> bool { + self.decompress + } + + pub fn events_max(&self) -> u64 { + self.events_max + } +} diff --git a/taskrun/src/taskrun.rs b/taskrun/src/taskrun.rs index 21f2fdc..e6529cc 100644 --- a/taskrun/src/taskrun.rs +++ b/taskrun/src/taskrun.rs @@ -70,9 +70,9 @@ pub fn get_runtime_opts(nworkers: usize, nblocking: usize) -> Arc { } } -pub fn run(f: F) -> Result +pub fn run(fut: F) -> Result where - F: std::future::Future>, + F: Future>, { let runtime = get_runtime(); match tracing_init() { @@ -81,7 +81,7 @@ where eprintln!("TRACING: {e:?}"); } } - let res = runtime.block_on(async { f.await }); + let res = runtime.block_on(async { fut.await }); match res { Ok(k) => Ok(k), Err(e) => {