Add test case for Api1Range serde
This commit is contained in:
+4
-4
@@ -5,13 +5,12 @@ authors = ["Dominik Werder <dominik.werder@gmail.com>"]
|
|||||||
edition = "2021"
|
edition = "2021"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
tokio = { version = "1.7.1", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] }
|
tokio = { version = "1.22.0", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] }
|
||||||
hyper = "0.14"
|
hyper = "0.14"
|
||||||
http = "0.2"
|
http = "0.2"
|
||||||
tracing = "0.1.25"
|
tracing = "0.1.25"
|
||||||
tracing-subscriber = "0.2.17"
|
tracing-subscriber = "0.3.16"
|
||||||
futures-core = "0.3.14"
|
futures-util = "0.3.25"
|
||||||
futures-util = "0.3.14"
|
|
||||||
bytes = "1.0.1"
|
bytes = "1.0.1"
|
||||||
serde = "1.0"
|
serde = "1.0"
|
||||||
serde_derive = "1.0"
|
serde_derive = "1.0"
|
||||||
@@ -24,6 +23,7 @@ err = { path = "../err" }
|
|||||||
taskrun = { path = "../taskrun" }
|
taskrun = { path = "../taskrun" }
|
||||||
netpod = { path = "../netpod" }
|
netpod = { path = "../netpod" }
|
||||||
httpret = { path = "../httpret" }
|
httpret = { path = "../httpret" }
|
||||||
|
httpclient = { path = "../httpclient" }
|
||||||
disk = { path = "../disk" }
|
disk = { path = "../disk" }
|
||||||
items = { path = "../items" }
|
items = { path = "../items" }
|
||||||
streams = { path = "../streams" }
|
streams = { path = "../streams" }
|
||||||
|
|||||||
@@ -4,11 +4,13 @@ use disk::streamlog::Streamlog;
|
|||||||
use err::Error;
|
use err::Error;
|
||||||
use futures_util::TryStreamExt;
|
use futures_util::TryStreamExt;
|
||||||
use http::StatusCode;
|
use http::StatusCode;
|
||||||
|
use httpclient::HttpBodyAsAsyncRead;
|
||||||
use hyper::Body;
|
use hyper::Body;
|
||||||
use items::xbinnedwaveevents::XBinnedWaveEvents;
|
use items::xbinnedwaveevents::XBinnedWaveEvents;
|
||||||
use items::{Sitemty, StreamItem};
|
use items::{Sitemty, StreamItem};
|
||||||
|
use netpod::log::*;
|
||||||
use netpod::query::{BinnedQuery, CacheUsage};
|
use netpod::query::{BinnedQuery, CacheUsage};
|
||||||
use netpod::{log::*, AppendToUrl};
|
use netpod::AppendToUrl;
|
||||||
use netpod::{AggKind, ByteSize, Channel, HostPort, NanoRange, PerfOpts, APP_OCTET};
|
use netpod::{AggKind, ByteSize, Channel, HostPort, NanoRange, PerfOpts, APP_OCTET};
|
||||||
use streams::frames::inmem::InMemoryFrameAsyncReadStream;
|
use streams::frames::inmem::InMemoryFrameAsyncReadStream;
|
||||||
use url::Url;
|
use url::Url;
|
||||||
@@ -91,7 +93,7 @@ pub async fn get_binned(
|
|||||||
)));
|
)));
|
||||||
}
|
}
|
||||||
let perf_opts = PerfOpts { inmem_bufcap: 512 };
|
let perf_opts = PerfOpts { inmem_bufcap: 512 };
|
||||||
let s1 = disk::cache::HttpBodyAsAsyncRead::new(res);
|
let s1 = HttpBodyAsAsyncRead::new(res);
|
||||||
let s2 = InMemoryFrameAsyncReadStream::new(s1, perf_opts.inmem_bufcap);
|
let s2 = InMemoryFrameAsyncReadStream::new(s1, perf_opts.inmem_bufcap);
|
||||||
use futures_util::StreamExt;
|
use futures_util::StreamExt;
|
||||||
use std::future::ready;
|
use std::future::ready;
|
||||||
|
|||||||
+1
-1
@@ -5,7 +5,7 @@ pub trait ErrConv<T> {
|
|||||||
pub trait Convable: ToString {}
|
pub trait Convable: ToString {}
|
||||||
|
|
||||||
impl<T, E: Convable> ErrConv<T> for Result<T, E> {
|
impl<T, E: Convable> ErrConv<T> for Result<T, E> {
|
||||||
fn ec(self) -> Result<T, err::Error> {
|
fn ec(self) -> Result<T, ::err::Error> {
|
||||||
match self {
|
match self {
|
||||||
Ok(x) => Ok(x),
|
Ok(x) => Ok(x),
|
||||||
Err(e) => Err(::err::Error::from_string(e.to_string())),
|
Err(e) => Err(::err::Error::from_string(e.to_string())),
|
||||||
|
|||||||
@@ -1,7 +1,10 @@
|
|||||||
use crate::spawn_test_hosts;
|
use crate::spawn_test_hosts;
|
||||||
use err::Error;
|
use err::Error;
|
||||||
|
use netpod::log::*;
|
||||||
use netpod::Cluster;
|
use netpod::Cluster;
|
||||||
use std::sync::{Arc, Mutex};
|
use std::sync::{Arc, Mutex};
|
||||||
|
use std::{thread};
|
||||||
|
use std::time::Duration;
|
||||||
use tokio::task::JoinHandle;
|
use tokio::task::JoinHandle;
|
||||||
|
|
||||||
pub struct RunningHosts {
|
pub struct RunningHosts {
|
||||||
@@ -47,7 +50,7 @@ pub fn require_test_hosts_running() -> Result<Arc<RunningHosts>, Error> {
|
|||||||
let mut g = HOSTS_RUNNING.lock().unwrap();
|
let mut g = HOSTS_RUNNING.lock().unwrap();
|
||||||
match g.as_ref() {
|
match g.as_ref() {
|
||||||
None => {
|
None => {
|
||||||
netpod::log::info!("\n\n+++++++++++++++++++ MAKE NEW RunningHosts\n\n");
|
info!("\n\n+++++++++++++++++++ MAKE NEW RunningHosts\n\n");
|
||||||
let cluster = netpod::test_cluster();
|
let cluster = netpod::test_cluster();
|
||||||
let jhs = spawn_test_hosts(cluster.clone());
|
let jhs = spawn_test_hosts(cluster.clone());
|
||||||
let ret = RunningHosts {
|
let ret = RunningHosts {
|
||||||
@@ -56,10 +59,12 @@ pub fn require_test_hosts_running() -> Result<Arc<RunningHosts>, Error> {
|
|||||||
};
|
};
|
||||||
let a = Arc::new(ret);
|
let a = Arc::new(ret);
|
||||||
*g = Some(a.clone());
|
*g = Some(a.clone());
|
||||||
|
// TODO check in different way that test hosts are up, sockets connected, ready for testing
|
||||||
|
thread::sleep(Duration::from_millis(400));
|
||||||
Ok(a)
|
Ok(a)
|
||||||
}
|
}
|
||||||
Some(gg) => {
|
Some(gg) => {
|
||||||
netpod::log::debug!("\n\n+++++++++++++++++++ REUSE RunningHost\n\n");
|
debug!("\n\n+++++++++++++++++++ REUSE RunningHost\n\n");
|
||||||
Ok(gg.clone())
|
Ok(gg.clone())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,10 +1,16 @@
|
|||||||
|
#[cfg(test)]
|
||||||
|
mod api1;
|
||||||
|
#[cfg(test)]
|
||||||
|
mod api4;
|
||||||
pub mod archapp;
|
pub mod archapp;
|
||||||
pub mod binnedbinary;
|
pub mod binnedbinary;
|
||||||
pub mod binnedjson;
|
pub mod binnedjson;
|
||||||
pub mod events;
|
#[cfg(test)]
|
||||||
|
mod events;
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod eventsjson;
|
mod eventsjson;
|
||||||
pub mod timeweightedjson;
|
#[cfg(test)]
|
||||||
|
mod timeweightedjson;
|
||||||
|
|
||||||
use bytes::BytesMut;
|
use bytes::BytesMut;
|
||||||
use err::Error;
|
use err::Error;
|
||||||
|
|||||||
@@ -0,0 +1,35 @@
|
|||||||
|
use crate::err::ErrConv;
|
||||||
|
use crate::nodes::require_test_hosts_running;
|
||||||
|
use err::Error;
|
||||||
|
use futures_util::Future;
|
||||||
|
use http::{header, Request, StatusCode};
|
||||||
|
use httpclient::{http_get, http_post};
|
||||||
|
use hyper::Body;
|
||||||
|
use netpod::log::*;
|
||||||
|
use netpod::query::api1::{Api1Query, Api1Range};
|
||||||
|
use url::Url;
|
||||||
|
|
||||||
|
fn testrun<T, F>(fut: F) -> Result<T, Error>
|
||||||
|
where
|
||||||
|
F: Future<Output = Result<T, Error>>,
|
||||||
|
{
|
||||||
|
taskrun::run(fut)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn events_f64_plain() -> Result<(), Error> {
|
||||||
|
let fut = async {
|
||||||
|
let rh = require_test_hosts_running()?;
|
||||||
|
let cluster = &rh.cluster;
|
||||||
|
let node = &cluster.nodes[0];
|
||||||
|
let url: Url = format!("http://{}:{}/api/1/query", node.host, node.port).parse()?;
|
||||||
|
let accept = "application/json";
|
||||||
|
//let qu = Api1Query::new(Api1Range::new(), vec!["testbackend/scalar-i32-be"]);
|
||||||
|
let buf = http_post(url, accept, "{}".into()).await?;
|
||||||
|
let js = String::from_utf8_lossy(&buf);
|
||||||
|
eprintln!("string received: {js}");
|
||||||
|
Ok(())
|
||||||
|
};
|
||||||
|
testrun(fut)?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
@@ -5,6 +5,7 @@ use disk::streamlog::Streamlog;
|
|||||||
use err::Error;
|
use err::Error;
|
||||||
use futures_util::{StreamExt, TryStreamExt};
|
use futures_util::{StreamExt, TryStreamExt};
|
||||||
use http::StatusCode;
|
use http::StatusCode;
|
||||||
|
use httpclient::HttpBodyAsAsyncRead;
|
||||||
use hyper::Body;
|
use hyper::Body;
|
||||||
use items::binsdim0::MinMaxAvgDim0Bins;
|
use items::binsdim0::MinMaxAvgDim0Bins;
|
||||||
use items::{RangeCompletableItem, Sitemty, StatsItem, StreamItem, SubFrId, WithLen};
|
use items::{RangeCompletableItem, Sitemty, StatsItem, StreamItem, SubFrId, WithLen};
|
||||||
@@ -125,7 +126,7 @@ where
|
|||||||
if res.status() != StatusCode::OK {
|
if res.status() != StatusCode::OK {
|
||||||
error!("client response {:?}", res);
|
error!("client response {:?}", res);
|
||||||
}
|
}
|
||||||
let s1 = disk::cache::HttpBodyAsAsyncRead::new(res);
|
let s1 = HttpBodyAsAsyncRead::new(res);
|
||||||
let s2 = InMemoryFrameAsyncReadStream::new(s1, perf_opts.inmem_bufcap);
|
let s2 = InMemoryFrameAsyncReadStream::new(s1, perf_opts.inmem_bufcap);
|
||||||
let res = consume_binned_response::<NTY, _>(s2).await?;
|
let res = consume_binned_response::<NTY, _>(s2).await?;
|
||||||
let t2 = chrono::Utc::now();
|
let t2 = chrono::Utc::now();
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ use disk::streamlog::Streamlog;
|
|||||||
use err::Error;
|
use err::Error;
|
||||||
use futures_util::{StreamExt, TryStreamExt};
|
use futures_util::{StreamExt, TryStreamExt};
|
||||||
use http::StatusCode;
|
use http::StatusCode;
|
||||||
|
use httpclient::HttpBodyAsAsyncRead;
|
||||||
use hyper::Body;
|
use hyper::Body;
|
||||||
use items::numops::NumOps;
|
use items::numops::NumOps;
|
||||||
use items::scalarevents::ScalarEvents;
|
use items::scalarevents::ScalarEvents;
|
||||||
@@ -99,7 +100,7 @@ where
|
|||||||
error!("client response {res:?}");
|
error!("client response {res:?}");
|
||||||
return Err(format!("get_plain_events_binary client response {res:?}").into());
|
return Err(format!("get_plain_events_binary client response {res:?}").into());
|
||||||
}
|
}
|
||||||
let s1 = disk::cache::HttpBodyAsAsyncRead::new(res);
|
let s1 = HttpBodyAsAsyncRead::new(res);
|
||||||
let s2 = InMemoryFrameAsyncReadStream::new(s1, perf_opts.inmem_bufcap);
|
let s2 = InMemoryFrameAsyncReadStream::new(s1, perf_opts.inmem_bufcap);
|
||||||
let res = consume_plain_events_binary::<NTY, _>(s2).await?;
|
let res = consume_plain_events_binary::<NTY, _>(s2).await?;
|
||||||
let t2 = chrono::Utc::now();
|
let t2 = chrono::Utc::now();
|
||||||
|
|||||||
@@ -1,10 +1,11 @@
|
|||||||
use crate::agg::binnedt::TBinnerStream;
|
use crate::agg::binnedt::TBinnerStream;
|
||||||
use crate::binned::query::PreBinnedQuery;
|
use crate::binned::query::PreBinnedQuery;
|
||||||
use crate::cache::{node_ix_for_patch, HttpBodyAsAsyncRead};
|
use crate::cache::node_ix_for_patch;
|
||||||
use err::Error;
|
use err::Error;
|
||||||
use futures_core::Stream;
|
use futures_core::Stream;
|
||||||
use futures_util::{FutureExt, StreamExt};
|
use futures_util::{FutureExt, StreamExt};
|
||||||
use http::{StatusCode, Uri};
|
use http::{StatusCode, Uri};
|
||||||
|
use httpclient::HttpBodyAsAsyncRead;
|
||||||
use items::frame::decode_frame;
|
use items::frame::decode_frame;
|
||||||
use items::{FrameDecodable, FrameType, FrameTypeInnerStatic, TimeBinnableType};
|
use items::{FrameDecodable, FrameType, FrameTypeInnerStatic, TimeBinnableType};
|
||||||
use items::{RangeCompletableItem, Sitemty, StreamItem};
|
use items::{RangeCompletableItem, Sitemty, StreamItem};
|
||||||
|
|||||||
@@ -1,8 +1,5 @@
|
|||||||
use bytes::Bytes;
|
|
||||||
use chrono::Utc;
|
use chrono::Utc;
|
||||||
use err::Error;
|
use err::Error;
|
||||||
use futures_util::pin_mut;
|
|
||||||
use hyper::{Body, Response};
|
|
||||||
use netpod::log::*;
|
use netpod::log::*;
|
||||||
use netpod::timeunits::SEC;
|
use netpod::timeunits::SEC;
|
||||||
use netpod::{AggKind, Channel, Cluster, NodeConfigCached, PreBinnedPatchCoord};
|
use netpod::{AggKind, Channel, Cluster, NodeConfigCached, PreBinnedPatchCoord};
|
||||||
@@ -10,72 +7,8 @@ use serde::{Deserialize, Serialize};
|
|||||||
use std::collections::VecDeque;
|
use std::collections::VecDeque;
|
||||||
use std::io;
|
use std::io;
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
use std::pin::Pin;
|
|
||||||
use std::task::{Context, Poll};
|
|
||||||
use std::time::{Duration, Instant};
|
use std::time::{Duration, Instant};
|
||||||
use tiny_keccak::Hasher;
|
use tiny_keccak::Hasher;
|
||||||
use tokio::io::{AsyncRead, ReadBuf};
|
|
||||||
|
|
||||||
// TODO move to a better fitting module:
|
|
||||||
pub struct HttpBodyAsAsyncRead {
|
|
||||||
inp: Response<Body>,
|
|
||||||
left: Bytes,
|
|
||||||
rp: usize,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl HttpBodyAsAsyncRead {
|
|
||||||
pub fn new(inp: Response<Body>) -> Self {
|
|
||||||
Self {
|
|
||||||
inp,
|
|
||||||
left: Bytes::new(),
|
|
||||||
rp: 0,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl AsyncRead for HttpBodyAsAsyncRead {
|
|
||||||
fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context, buf: &mut ReadBuf) -> Poll<io::Result<()>> {
|
|
||||||
use hyper::body::HttpBody;
|
|
||||||
use Poll::*;
|
|
||||||
if self.left.len() != 0 {
|
|
||||||
let n1 = buf.remaining();
|
|
||||||
let n2 = self.left.len() - self.rp;
|
|
||||||
if n2 <= n1 {
|
|
||||||
buf.put_slice(self.left[self.rp..].as_ref());
|
|
||||||
self.left = Bytes::new();
|
|
||||||
self.rp = 0;
|
|
||||||
Ready(Ok(()))
|
|
||||||
} else {
|
|
||||||
buf.put_slice(self.left[self.rp..(self.rp + n2)].as_ref());
|
|
||||||
self.rp += n2;
|
|
||||||
Ready(Ok(()))
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
let f = &mut self.inp;
|
|
||||||
pin_mut!(f);
|
|
||||||
match f.poll_data(cx) {
|
|
||||||
Ready(Some(Ok(k))) => {
|
|
||||||
let n1 = buf.remaining();
|
|
||||||
if k.len() <= n1 {
|
|
||||||
buf.put_slice(k.as_ref());
|
|
||||||
Ready(Ok(()))
|
|
||||||
} else {
|
|
||||||
buf.put_slice(k[..n1].as_ref());
|
|
||||||
self.left = k;
|
|
||||||
self.rp = n1;
|
|
||||||
Ready(Ok(()))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Ready(Some(Err(e))) => Ready(Err(io::Error::new(
|
|
||||||
io::ErrorKind::Other,
|
|
||||||
Error::with_msg(format!("Received by HttpBodyAsAsyncRead: {:?}", e)),
|
|
||||||
))),
|
|
||||||
Ready(None) => Ready(Ok(())),
|
|
||||||
Pending => Pending,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// For file-based caching, this determined the node where the cache file is located.
|
// For file-based caching, this determined the node where the cache file is located.
|
||||||
// No longer needed for scylla-based caching.
|
// No longer needed for scylla-based caching.
|
||||||
|
|||||||
+15
-12
@@ -1,22 +1,25 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "httpclient"
|
name = "httpclient"
|
||||||
version = "0.0.1-a.0"
|
version = "0.0.2"
|
||||||
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
|
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
|
||||||
edition = "2021"
|
edition = "2021"
|
||||||
|
|
||||||
|
[lib]
|
||||||
|
path = "src/httpclient.rs"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
serde = { version = "1.0", features = ["derive"] }
|
futures-util = "0.3.25"
|
||||||
serde_json = "1.0"
|
serde = { version = "1.0.147", features = ["derive"] }
|
||||||
http = "0.2"
|
serde_json = "1.0.89"
|
||||||
url = "2.2"
|
rmp-serde = "1.1.1"
|
||||||
tokio = { version = "1.11.1", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] }
|
http = "0.2.8"
|
||||||
hyper = { version = "0.14.3", features = ["http1", "http2", "client", "server", "tcp", "stream"] }
|
url = "2.3.1"
|
||||||
|
tokio = { version = "1.22.0", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] }
|
||||||
|
tracing = "0.1.37"
|
||||||
|
hyper = { version = "0.14.23", features = ["http1", "http2", "client", "server", "tcp", "stream"] }
|
||||||
hyper-tls = { version = "0.5.0" }
|
hyper-tls = { version = "0.5.0" }
|
||||||
bytes = "1.0.1"
|
bytes = "1.3.0"
|
||||||
futures-core = "0.3.14"
|
async-channel = "1.7.1"
|
||||||
futures-util = "0.3.14"
|
|
||||||
tracing = "0.1.25"
|
|
||||||
async-channel = "1.6"
|
|
||||||
err = { path = "../err" }
|
err = { path = "../err" }
|
||||||
netpod = { path = "../netpod" }
|
netpod = { path = "../netpod" }
|
||||||
parse = { path = "../parse" }
|
parse = { path = "../parse" }
|
||||||
|
|||||||
@@ -0,0 +1,186 @@
|
|||||||
|
use bytes::Bytes;
|
||||||
|
use err::{Error, PublicError};
|
||||||
|
use futures_util::pin_mut;
|
||||||
|
use http::{header, Request, Response, StatusCode};
|
||||||
|
use hyper::body::HttpBody;
|
||||||
|
use hyper::{Body, Method};
|
||||||
|
use netpod::log::*;
|
||||||
|
use netpod::{AppendToUrl, ChannelConfigQuery, ChannelConfigResponse, NodeConfigCached};
|
||||||
|
use std::pin::Pin;
|
||||||
|
use std::task::{Context, Poll};
|
||||||
|
use tokio::io::{self, AsyncRead, ReadBuf};
|
||||||
|
use url::Url;
|
||||||
|
|
||||||
|
pub trait ErrConv<T> {
|
||||||
|
fn ec(self) -> Result<T, ::err::Error>;
|
||||||
|
}
|
||||||
|
|
||||||
|
pub trait Convable: ToString {}
|
||||||
|
|
||||||
|
impl<T, E: Convable> ErrConv<T> for Result<T, E> {
|
||||||
|
fn ec(self) -> Result<T, ::err::Error> {
|
||||||
|
match self {
|
||||||
|
Ok(x) => Ok(x),
|
||||||
|
Err(e) => Err(::err::Error::from_string(e.to_string())),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Convable for http::Error {}
|
||||||
|
impl Convable for hyper::Error {}
|
||||||
|
|
||||||
|
pub async fn http_get(url: Url, accept: &str) -> Result<Bytes, Error> {
|
||||||
|
let req = Request::builder()
|
||||||
|
.method(http::Method::GET)
|
||||||
|
.uri(url.to_string())
|
||||||
|
.header(header::ACCEPT, accept)
|
||||||
|
.body(Body::empty())
|
||||||
|
.ec()?;
|
||||||
|
let client = hyper::Client::new();
|
||||||
|
let res = client.request(req).await.ec()?;
|
||||||
|
if res.status() != StatusCode::OK {
|
||||||
|
error!("Server error {:?}", res);
|
||||||
|
let (head, body) = res.into_parts();
|
||||||
|
let buf = hyper::body::to_bytes(body).await.ec()?;
|
||||||
|
let s = String::from_utf8_lossy(&buf);
|
||||||
|
return Err(Error::with_msg(format!(
|
||||||
|
concat!(
|
||||||
|
"Server error {:?}\n",
|
||||||
|
"---------------------- message from http body:\n",
|
||||||
|
"{}\n",
|
||||||
|
"---------------------- end of http body",
|
||||||
|
),
|
||||||
|
head, s
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
let body = hyper::body::to_bytes(res.into_body()).await.ec()?;
|
||||||
|
Ok(body)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn http_post(url: Url, accept: &str, body: String) -> Result<Bytes, Error> {
|
||||||
|
let req = Request::builder()
|
||||||
|
.method(http::Method::POST)
|
||||||
|
.uri(url.to_string())
|
||||||
|
.header(header::ACCEPT, accept)
|
||||||
|
.body(Body::from(body))
|
||||||
|
.ec()?;
|
||||||
|
let client = hyper::Client::new();
|
||||||
|
let res = client.request(req).await.ec()?;
|
||||||
|
if res.status() != StatusCode::OK {
|
||||||
|
error!("Server error {:?}", res);
|
||||||
|
let (head, body) = res.into_parts();
|
||||||
|
let buf = hyper::body::to_bytes(body).await.ec()?;
|
||||||
|
let s = String::from_utf8_lossy(&buf);
|
||||||
|
return Err(Error::with_msg(format!(
|
||||||
|
concat!(
|
||||||
|
"Server error {:?}\n",
|
||||||
|
"---------------------- message from http body:\n",
|
||||||
|
"{}\n",
|
||||||
|
"---------------------- end of http body",
|
||||||
|
),
|
||||||
|
head, s
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
let body = hyper::body::to_bytes(res.into_body()).await.ec()?;
|
||||||
|
Ok(body)
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO move to a better fitting module:
|
||||||
|
pub struct HttpBodyAsAsyncRead {
|
||||||
|
inp: Response<Body>,
|
||||||
|
left: Bytes,
|
||||||
|
rp: usize,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl HttpBodyAsAsyncRead {
|
||||||
|
pub fn new(inp: Response<Body>) -> Self {
|
||||||
|
Self {
|
||||||
|
inp,
|
||||||
|
left: Bytes::new(),
|
||||||
|
rp: 0,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl AsyncRead for HttpBodyAsAsyncRead {
|
||||||
|
fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context, buf: &mut ReadBuf) -> Poll<io::Result<()>> {
|
||||||
|
use Poll::*;
|
||||||
|
if self.left.len() != 0 {
|
||||||
|
let n1 = buf.remaining();
|
||||||
|
let n2 = self.left.len() - self.rp;
|
||||||
|
if n2 <= n1 {
|
||||||
|
buf.put_slice(self.left[self.rp..].as_ref());
|
||||||
|
self.left = Bytes::new();
|
||||||
|
self.rp = 0;
|
||||||
|
Ready(Ok(()))
|
||||||
|
} else {
|
||||||
|
buf.put_slice(self.left[self.rp..(self.rp + n2)].as_ref());
|
||||||
|
self.rp += n2;
|
||||||
|
Ready(Ok(()))
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
let f = &mut self.inp;
|
||||||
|
pin_mut!(f);
|
||||||
|
match f.poll_data(cx) {
|
||||||
|
Ready(Some(Ok(k))) => {
|
||||||
|
let n1 = buf.remaining();
|
||||||
|
if k.len() <= n1 {
|
||||||
|
buf.put_slice(k.as_ref());
|
||||||
|
Ready(Ok(()))
|
||||||
|
} else {
|
||||||
|
buf.put_slice(k[..n1].as_ref());
|
||||||
|
self.left = k;
|
||||||
|
self.rp = n1;
|
||||||
|
Ready(Ok(()))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ready(Some(Err(e))) => Ready(Err(io::Error::new(
|
||||||
|
io::ErrorKind::Other,
|
||||||
|
Error::with_msg(format!("Received by HttpBodyAsAsyncRead: {:?}", e)),
|
||||||
|
))),
|
||||||
|
Ready(None) => Ready(Ok(())),
|
||||||
|
Pending => Pending,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn get_channel_config(
|
||||||
|
q: &ChannelConfigQuery,
|
||||||
|
node_config: &NodeConfigCached,
|
||||||
|
) -> Result<ChannelConfigResponse, Error> {
|
||||||
|
let mut url = Url::parse(&format!(
|
||||||
|
"http://{}:{}/api/4/channel/config",
|
||||||
|
node_config.node.host, node_config.node.port
|
||||||
|
))?;
|
||||||
|
q.append_to_url(&mut url);
|
||||||
|
let req = hyper::Request::builder()
|
||||||
|
.method(Method::GET)
|
||||||
|
.uri(url.as_str())
|
||||||
|
.body(Body::empty())
|
||||||
|
.map_err(Error::from_string)?;
|
||||||
|
let client = hyper::Client::new();
|
||||||
|
let res = client
|
||||||
|
.request(req)
|
||||||
|
.await
|
||||||
|
.map_err(|e| Error::with_msg(format!("get_channel_config request error: {e:?}")))?;
|
||||||
|
if res.status().is_success() {
|
||||||
|
let buf = hyper::body::to_bytes(res.into_body())
|
||||||
|
.await
|
||||||
|
.map_err(|e| Error::with_msg(format!("can not read response: {e:?}")))?;
|
||||||
|
let ret: ChannelConfigResponse = serde_json::from_slice(&buf)
|
||||||
|
.map_err(|e| Error::with_msg(format!("can not parse the channel config response json: {e:?}")))?;
|
||||||
|
Ok(ret)
|
||||||
|
} else {
|
||||||
|
let buf = hyper::body::to_bytes(res.into_body())
|
||||||
|
.await
|
||||||
|
.map_err(|e| Error::with_msg(format!("can not read response: {e:?}")))?;
|
||||||
|
match serde_json::from_slice::<PublicError>(&buf) {
|
||||||
|
Ok(e) => Err(e.into()),
|
||||||
|
Err(_) => Err(Error::with_msg(format!(
|
||||||
|
"can not parse the http error body: {:?}",
|
||||||
|
String::from_utf8_lossy(&buf)
|
||||||
|
))),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,44 +0,0 @@
|
|||||||
use err::{Error, PublicError};
|
|
||||||
use hyper::{Body, Method};
|
|
||||||
use netpod::{AppendToUrl, ChannelConfigQuery, ChannelConfigResponse, NodeConfigCached};
|
|
||||||
use url::Url;
|
|
||||||
|
|
||||||
pub async fn get_channel_config(
|
|
||||||
q: &ChannelConfigQuery,
|
|
||||||
node_config: &NodeConfigCached,
|
|
||||||
) -> Result<ChannelConfigResponse, Error> {
|
|
||||||
let mut url = Url::parse(&format!(
|
|
||||||
"http://{}:{}/api/4/channel/config",
|
|
||||||
node_config.node.host, node_config.node.port
|
|
||||||
))?;
|
|
||||||
q.append_to_url(&mut url);
|
|
||||||
let req = hyper::Request::builder()
|
|
||||||
.method(Method::GET)
|
|
||||||
.uri(url.as_str())
|
|
||||||
.body(Body::empty())
|
|
||||||
.map_err(Error::from_string)?;
|
|
||||||
let client = hyper::Client::new();
|
|
||||||
let res = client
|
|
||||||
.request(req)
|
|
||||||
.await
|
|
||||||
.map_err(|e| Error::with_msg(format!("get_channel_config request error: {e:?}")))?;
|
|
||||||
if res.status().is_success() {
|
|
||||||
let buf = hyper::body::to_bytes(res.into_body())
|
|
||||||
.await
|
|
||||||
.map_err(|e| Error::with_msg(format!("can not read response: {e:?}")))?;
|
|
||||||
let ret: ChannelConfigResponse = serde_json::from_slice(&buf)
|
|
||||||
.map_err(|e| Error::with_msg(format!("can not parse the channel config response json: {e:?}")))?;
|
|
||||||
Ok(ret)
|
|
||||||
} else {
|
|
||||||
let buf = hyper::body::to_bytes(res.into_body())
|
|
||||||
.await
|
|
||||||
.map_err(|e| Error::with_msg(format!("can not read response: {e:?}")))?;
|
|
||||||
match serde_json::from_slice::<PublicError>(&buf) {
|
|
||||||
Ok(e) => Err(e.into()),
|
|
||||||
Err(_) => Err(Error::with_msg(format!(
|
|
||||||
"can not parse the http error body: {:?}",
|
|
||||||
String::from_utf8_lossy(&buf)
|
|
||||||
))),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
+15
-65
@@ -9,11 +9,13 @@ use hyper::{Body, Client, Request, Response};
|
|||||||
use items::eventfull::EventFull;
|
use items::eventfull::EventFull;
|
||||||
use items::{RangeCompletableItem, Sitemty, StreamItem};
|
use items::{RangeCompletableItem, Sitemty, StreamItem};
|
||||||
use itertools::Itertools;
|
use itertools::Itertools;
|
||||||
|
use netpod::log::*;
|
||||||
|
use netpod::query::api1::Api1Query;
|
||||||
use netpod::query::RawEventsQuery;
|
use netpod::query::RawEventsQuery;
|
||||||
use netpod::timeunits::SEC;
|
use netpod::timeunits::SEC;
|
||||||
use netpod::{log::*, DiskIoTune, ReadSys, ACCEPT_ALL};
|
use netpod::{ByteSize, Channel, DiskIoTune, NanoRange, NodeConfigCached, PerfOpts, Shape};
|
||||||
use netpod::{ByteSize, Channel, FileIoBufferSize, NanoRange, NodeConfigCached, PerfOpts, Shape, APP_OCTET};
|
use netpod::{ChannelSearchQuery, ChannelSearchResult, ProxyConfig};
|
||||||
use netpod::{ChannelSearchQuery, ChannelSearchResult, ProxyConfig, APP_JSON};
|
use netpod::{ACCEPT_ALL, APP_JSON, APP_OCTET};
|
||||||
use parse::channelconfig::extract_matching_config_entry;
|
use parse::channelconfig::extract_matching_config_entry;
|
||||||
use parse::channelconfig::read_local_config;
|
use parse::channelconfig::read_local_config;
|
||||||
use parse::channelconfig::{Config, ConfigEntry, MatchingConfigEntry};
|
use parse::channelconfig::{Config, ConfigEntry, MatchingConfigEntry};
|
||||||
@@ -466,56 +468,6 @@ async fn process_answer(res: Response<Body>) -> Result<JsonValue, Error> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Serialize, Deserialize)]
|
|
||||||
pub struct Api1Range {
|
|
||||||
#[serde(rename = "startDate")]
|
|
||||||
start_date: String,
|
|
||||||
#[serde(rename = "endDate")]
|
|
||||||
end_date: String,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Serialize, Deserialize)]
|
|
||||||
pub struct Api1Query {
|
|
||||||
channels: Vec<String>,
|
|
||||||
range: Api1Range,
|
|
||||||
// All following parameters are private and not to be used
|
|
||||||
#[serde(default)]
|
|
||||||
file_io_buffer_size: Option<FileIoBufferSize>,
|
|
||||||
#[serde(default)]
|
|
||||||
decompress: bool,
|
|
||||||
#[serde(default = "u64_max", skip_serializing_if = "is_u64_max")]
|
|
||||||
events_max: u64,
|
|
||||||
#[serde(default)]
|
|
||||||
io_queue_len: u64,
|
|
||||||
#[serde(default)]
|
|
||||||
log_level: String,
|
|
||||||
#[serde(default)]
|
|
||||||
read_sys: String,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Api1Query {
|
|
||||||
pub fn disk_io_tune(&self) -> DiskIoTune {
|
|
||||||
let mut k = DiskIoTune::default();
|
|
||||||
if let Some(x) = &self.file_io_buffer_size {
|
|
||||||
k.read_buffer_len = x.0;
|
|
||||||
}
|
|
||||||
if self.io_queue_len != 0 {
|
|
||||||
k.read_queue_len = self.io_queue_len as usize;
|
|
||||||
}
|
|
||||||
let read_sys: ReadSys = self.read_sys.as_str().into();
|
|
||||||
k.read_sys = read_sys;
|
|
||||||
k
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn u64_max() -> u64 {
|
|
||||||
u64::MAX
|
|
||||||
}
|
|
||||||
|
|
||||||
fn is_u64_max(x: &u64) -> bool {
|
|
||||||
*x == u64::MAX
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||||
pub struct Api1ChannelHeader {
|
pub struct Api1ChannelHeader {
|
||||||
name: String,
|
name: String,
|
||||||
@@ -878,9 +830,9 @@ impl Api1EventsBinaryHandler {
|
|||||||
return Err(Error::with_msg_no_trace("can not parse query"));
|
return Err(Error::with_msg_no_trace("can not parse query"));
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
let span = if qu.log_level == "trace" {
|
let span = if qu.log_level() == "trace" {
|
||||||
tracing::span!(tracing::Level::TRACE, "log_span_t")
|
tracing::span!(tracing::Level::TRACE, "log_span_t")
|
||||||
} else if qu.log_level == "debug" {
|
} else if qu.log_level() == "debug" {
|
||||||
tracing::span!(tracing::Level::DEBUG, "log_span_d")
|
tracing::span!(tracing::Level::DEBUG, "log_span_d")
|
||||||
} else {
|
} else {
|
||||||
tracing::Span::none()
|
tracing::Span::none()
|
||||||
@@ -900,14 +852,12 @@ impl Api1EventsBinaryHandler {
|
|||||||
// TODO this should go to usage statistics:
|
// TODO this should go to usage statistics:
|
||||||
info!(
|
info!(
|
||||||
"Handle Api1Query {:?} {} {:?}",
|
"Handle Api1Query {:?} {} {:?}",
|
||||||
qu.range,
|
qu.range(),
|
||||||
qu.channels.len(),
|
qu.channels().len(),
|
||||||
qu.channels.first()
|
qu.channels().first()
|
||||||
);
|
);
|
||||||
let beg_date = chrono::DateTime::parse_from_rfc3339(&qu.range.start_date);
|
let beg_date = qu.range().beg().clone();
|
||||||
let end_date = chrono::DateTime::parse_from_rfc3339(&qu.range.end_date);
|
let end_date = qu.range().end().clone();
|
||||||
let beg_date = beg_date?;
|
|
||||||
let end_date = end_date?;
|
|
||||||
trace!("Api1Query beg_date {:?} end_date {:?}", beg_date, end_date);
|
trace!("Api1Query beg_date {:?} end_date {:?}", beg_date, end_date);
|
||||||
//let url = Url::parse(&format!("dummy:{}", req.uri()))?;
|
//let url = Url::parse(&format!("dummy:{}", req.uri()))?;
|
||||||
//let query = PlainEventsBinaryQuery::from_url(&url)?;
|
//let query = PlainEventsBinaryQuery::from_url(&url)?;
|
||||||
@@ -923,7 +873,7 @@ impl Api1EventsBinaryHandler {
|
|||||||
// TODO check for valid given backend name:
|
// TODO check for valid given backend name:
|
||||||
let backend = &node_config.node_config.cluster.backend;
|
let backend = &node_config.node_config.cluster.backend;
|
||||||
let chans = qu
|
let chans = qu
|
||||||
.channels
|
.channels()
|
||||||
.iter()
|
.iter()
|
||||||
.map(|x| Channel {
|
.map(|x| Channel {
|
||||||
backend: backend.into(),
|
backend: backend.into(),
|
||||||
@@ -937,8 +887,8 @@ impl Api1EventsBinaryHandler {
|
|||||||
range.clone(),
|
range.clone(),
|
||||||
chans,
|
chans,
|
||||||
qu.disk_io_tune().clone(),
|
qu.disk_io_tune().clone(),
|
||||||
qu.decompress,
|
qu.decompress(),
|
||||||
qu.events_max,
|
qu.events_max(),
|
||||||
status_id.clone(),
|
status_id.clone(),
|
||||||
node_config.clone(),
|
node_config.clone(),
|
||||||
);
|
);
|
||||||
|
|||||||
@@ -1,22 +0,0 @@
|
|||||||
use serde::{Deserialize, Serialize};
|
|
||||||
|
|
||||||
#[derive(Debug, Serialize, Deserialize)]
|
|
||||||
pub struct Range {
|
|
||||||
#[serde(rename = "type")]
|
|
||||||
ty: String,
|
|
||||||
#[serde(rename = "startDate")]
|
|
||||||
beg: String,
|
|
||||||
#[serde(rename = "endDate")]
|
|
||||||
end: String,
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO implement Deserialize such that I recognize the different possible formats...
|
|
||||||
// I guess, when serializing, it's ok to use the fully qualified format throughout.
|
|
||||||
#[derive(Debug, Serialize, Deserialize)]
|
|
||||||
pub struct ChannelList {}
|
|
||||||
|
|
||||||
#[derive(Debug, Serialize, Deserialize)]
|
|
||||||
pub struct Query {
|
|
||||||
range: Range,
|
|
||||||
channels: ChannelList,
|
|
||||||
}
|
|
||||||
@@ -1,4 +1,3 @@
|
|||||||
pub mod api1;
|
|
||||||
pub mod histo;
|
pub mod histo;
|
||||||
pub mod query;
|
pub mod query;
|
||||||
pub mod status;
|
pub mod status;
|
||||||
|
|||||||
+4
-3
@@ -1,7 +1,8 @@
|
|||||||
use crate::{
|
pub mod api1;
|
||||||
get_url_query_pairs, AggKind, AppendToUrl, ByteSize, Channel, FromUrl, HasBackend, HasTimeout, NanoRange, ToNanos,
|
|
||||||
};
|
use crate::get_url_query_pairs;
|
||||||
use crate::{log::*, DiskIoTune};
|
use crate::{log::*, DiskIoTune};
|
||||||
|
use crate::{AggKind, AppendToUrl, ByteSize, Channel, FromUrl, HasBackend, HasTimeout, NanoRange, ToNanos};
|
||||||
use chrono::{DateTime, TimeZone, Utc};
|
use chrono::{DateTime, TimeZone, Utc};
|
||||||
use err::Error;
|
use err::Error;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
|||||||
@@ -0,0 +1,196 @@
|
|||||||
|
use crate::{DiskIoTune, FileIoBufferSize, ReadSys};
|
||||||
|
use chrono::{DateTime, FixedOffset, NaiveDate, TimeZone};
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
use std::fmt;
|
||||||
|
|
||||||
|
fn u64_max() -> u64 {
|
||||||
|
u64::MAX
|
||||||
|
}
|
||||||
|
|
||||||
|
fn is_u64_max(x: &u64) -> bool {
|
||||||
|
*x == u64::MAX
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Serialize, Deserialize)]
|
||||||
|
pub struct Api1Range {
|
||||||
|
#[serde(rename = "type", default, skip_serializing_if = "String::is_empty")]
|
||||||
|
ty: String,
|
||||||
|
#[serde(
|
||||||
|
rename = "startDate",
|
||||||
|
serialize_with = "datetime_serde::ser",
|
||||||
|
deserialize_with = "datetime_serde::de"
|
||||||
|
)]
|
||||||
|
beg: DateTime<FixedOffset>,
|
||||||
|
#[serde(
|
||||||
|
rename = "endDate",
|
||||||
|
serialize_with = "datetime_serde::ser",
|
||||||
|
deserialize_with = "datetime_serde::de"
|
||||||
|
)]
|
||||||
|
end: DateTime<FixedOffset>,
|
||||||
|
}
|
||||||
|
|
||||||
|
mod datetime_serde {
|
||||||
|
// RFC 3339 / ISO 8601
|
||||||
|
|
||||||
|
use super::*;
|
||||||
|
use serde::de::Visitor;
|
||||||
|
use serde::{Deserializer, Serializer};
|
||||||
|
|
||||||
|
pub fn ser<S: Serializer>(val: &DateTime<FixedOffset>, ser: S) -> Result<S::Ok, S::Error> {
|
||||||
|
let s = val.format("%Y-%m-%dT%H:%M:%S.%6f%:z").to_string();
|
||||||
|
ser.serialize_str(&s)
|
||||||
|
}
|
||||||
|
|
||||||
|
struct DateTimeVisitor {}
|
||||||
|
|
||||||
|
impl<'de> Visitor<'de> for DateTimeVisitor {
|
||||||
|
type Value = DateTime<FixedOffset>;
|
||||||
|
|
||||||
|
fn expecting(&self, fmt: &mut fmt::Formatter) -> std::fmt::Result {
|
||||||
|
write!(fmt, "DateTimeWithOffset")
|
||||||
|
}
|
||||||
|
|
||||||
|
fn visit_str<E>(self, val: &str) -> Result<Self::Value, E>
|
||||||
|
where
|
||||||
|
E: serde::de::Error,
|
||||||
|
{
|
||||||
|
let res = DateTime::<FixedOffset>::parse_from_rfc3339(val);
|
||||||
|
match res {
|
||||||
|
Ok(res) => Ok(res),
|
||||||
|
// TODO deliver better fine grained error
|
||||||
|
Err(e) => Err(serde::de::Error::custom(format!("{e}"))),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn de<'de, D>(de: D) -> Result<DateTime<FixedOffset>, D::Error>
|
||||||
|
where
|
||||||
|
D: Deserializer<'de>,
|
||||||
|
{
|
||||||
|
de.deserialize_str(DateTimeVisitor {})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Api1Range {
|
||||||
|
pub fn new<A, B>(beg: A, end: B) -> Self
|
||||||
|
where
|
||||||
|
A: Into<DateTime<FixedOffset>>,
|
||||||
|
B: Into<DateTime<FixedOffset>>,
|
||||||
|
{
|
||||||
|
Self {
|
||||||
|
ty: String::new(),
|
||||||
|
beg: beg.into(),
|
||||||
|
end: end.into(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn beg(&self) -> &DateTime<FixedOffset> {
|
||||||
|
&self.beg
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn end(&self) -> &DateTime<FixedOffset> {
|
||||||
|
&self.end
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn serde_de_range_zulu() {
|
||||||
|
let s = r#"{"startDate": "2022-11-22T10:15:12.412Z", "endDate": "2022-11-22T10:15:12.413556Z"}"#;
|
||||||
|
let range: Api1Range = serde_json::from_str(s).unwrap();
|
||||||
|
assert_eq!(range.beg().offset().local_minus_utc(), 0);
|
||||||
|
assert_eq!(range.end().offset().local_minus_utc(), 0);
|
||||||
|
assert_eq!(range.beg().timestamp_subsec_micros(), 412000);
|
||||||
|
assert_eq!(range.end().timestamp_subsec_micros(), 413556);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn serde_de_range_offset() {
|
||||||
|
let s = r#"{"startDate": "2022-11-22T10:15:12.412Z", "endDate": "2022-11-22T10:15:12.413556Z"}"#;
|
||||||
|
let range: Api1Range = serde_json::from_str(s).unwrap();
|
||||||
|
assert_eq!(range.beg().offset().local_minus_utc(), 0);
|
||||||
|
assert_eq!(range.end().offset().local_minus_utc(), 0);
|
||||||
|
assert_eq!(range.beg().timestamp_subsec_micros(), 412000);
|
||||||
|
assert_eq!(range.end().timestamp_subsec_micros(), 413556);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn serde_ser_range_offset() {
|
||||||
|
let beg = FixedOffset::east_opt(60 * 60 * 3)
|
||||||
|
.unwrap()
|
||||||
|
.from_local_datetime(
|
||||||
|
&NaiveDate::from_ymd_opt(2022, 11, 22)
|
||||||
|
.unwrap()
|
||||||
|
.and_hms_milli_opt(13, 14, 15, 16)
|
||||||
|
.unwrap(),
|
||||||
|
)
|
||||||
|
.earliest()
|
||||||
|
.unwrap();
|
||||||
|
let end = FixedOffset::east_opt(-60 * 60 * 1)
|
||||||
|
.unwrap()
|
||||||
|
.from_local_datetime(
|
||||||
|
&NaiveDate::from_ymd_opt(2022, 11, 22)
|
||||||
|
.unwrap()
|
||||||
|
.and_hms_milli_opt(13, 14, 15, 800)
|
||||||
|
.unwrap(),
|
||||||
|
)
|
||||||
|
.earliest()
|
||||||
|
.unwrap();
|
||||||
|
let range = Api1Range::new(beg, end);
|
||||||
|
let js = serde_json::to_string(&range).unwrap();
|
||||||
|
let exp = r#"{"startDate":"2022-11-22T13:14:15.016000+03:00","endDate":"2022-11-22T13:14:15.800000-01:00"}"#;
|
||||||
|
assert_eq!(js, exp);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Serialize, Deserialize)]
|
||||||
|
pub struct Api1Query {
|
||||||
|
channels: Vec<String>,
|
||||||
|
range: Api1Range,
|
||||||
|
// All following parameters are private and not to be used
|
||||||
|
#[serde(default)]
|
||||||
|
file_io_buffer_size: Option<FileIoBufferSize>,
|
||||||
|
#[serde(default)]
|
||||||
|
decompress: bool,
|
||||||
|
#[serde(default = "u64_max", skip_serializing_if = "is_u64_max")]
|
||||||
|
events_max: u64,
|
||||||
|
#[serde(default)]
|
||||||
|
io_queue_len: u64,
|
||||||
|
#[serde(default)]
|
||||||
|
log_level: String,
|
||||||
|
#[serde(default)]
|
||||||
|
read_sys: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Api1Query {
|
||||||
|
pub fn disk_io_tune(&self) -> DiskIoTune {
|
||||||
|
let mut k = DiskIoTune::default();
|
||||||
|
if let Some(x) = &self.file_io_buffer_size {
|
||||||
|
k.read_buffer_len = x.0;
|
||||||
|
}
|
||||||
|
if self.io_queue_len != 0 {
|
||||||
|
k.read_queue_len = self.io_queue_len as usize;
|
||||||
|
}
|
||||||
|
let read_sys: ReadSys = self.read_sys.as_str().into();
|
||||||
|
k.read_sys = read_sys;
|
||||||
|
k
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn range(&self) -> &Api1Range {
|
||||||
|
&self.range
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn channels(&self) -> &[String] {
|
||||||
|
&self.channels
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn log_level(&self) -> &str {
|
||||||
|
&self.log_level
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn decompress(&self) -> bool {
|
||||||
|
self.decompress
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn events_max(&self) -> u64 {
|
||||||
|
self.events_max
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -70,9 +70,9 @@ pub fn get_runtime_opts(nworkers: usize, nblocking: usize) -> Arc<Runtime> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn run<T, F>(f: F) -> Result<T, Error>
|
pub fn run<T, F>(fut: F) -> Result<T, Error>
|
||||||
where
|
where
|
||||||
F: std::future::Future<Output = Result<T, Error>>,
|
F: Future<Output = Result<T, Error>>,
|
||||||
{
|
{
|
||||||
let runtime = get_runtime();
|
let runtime = get_runtime();
|
||||||
match tracing_init() {
|
match tracing_init() {
|
||||||
@@ -81,7 +81,7 @@ where
|
|||||||
eprintln!("TRACING: {e:?}");
|
eprintln!("TRACING: {e:?}");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
let res = runtime.block_on(async { f.await });
|
let res = runtime.block_on(async { fut.await });
|
||||||
match res {
|
match res {
|
||||||
Ok(k) => Ok(k),
|
Ok(k) => Ok(k),
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
|
|||||||
Reference in New Issue
Block a user