factor out netpod
This commit is contained in:
@@ -18,7 +18,7 @@ async-channel = "1.9.0"
|
||||
parking_lot = "0.12"
|
||||
crc32fast = "1.2"
|
||||
daqbuf-err = { path = "../../../daqbuf-err" }
|
||||
netpod = { path = "../../../daqbuf-netpod", package = "daqbuf-netpod" }
|
||||
taskrun = { path = "../taskrun" }
|
||||
netpod = { path = "../netpod" }
|
||||
items_0 = { path = "../items_0" }
|
||||
items_proc = { path = "../items_proc" }
|
||||
|
||||
@@ -16,8 +16,11 @@ url = "2.5.0"
|
||||
clap = { version = "4.5.7", features = ["derive", "cargo"] }
|
||||
daqbuf-err = { path = "../../../daqbuf-err" }
|
||||
taskrun = { path = "../taskrun" }
|
||||
netpod = { path = "../netpod" }
|
||||
netpod = { path = "../../../daqbuf-netpod", package = "daqbuf-netpod" }
|
||||
disk = { path = "../disk" }
|
||||
httpclient = { path = "../httpclient" }
|
||||
streams = { path = "../streams" }
|
||||
daqbufp2 = { path = "../daqbufp2" }
|
||||
|
||||
[features]
|
||||
DISABLED = []
|
||||
|
||||
@@ -169,7 +169,7 @@ async fn test_log() {
|
||||
// TODO use httpclient for the request: need to add binary POST.
|
||||
//#[test]
|
||||
#[allow(unused)]
|
||||
#[cfg(DISABLED)]
|
||||
#[cfg(feature = "DISABLED")]
|
||||
fn simple_fetch() {
|
||||
use daqbuffer::err::ErrConv;
|
||||
use netpod::timeunits::*;
|
||||
|
||||
@@ -23,7 +23,7 @@ url = "2.2.2"
|
||||
lazy_static = "1.4.0"
|
||||
daqbuf-err = { path = "../../../daqbuf-err" }
|
||||
taskrun = { path = "../taskrun" }
|
||||
netpod = { path = "../netpod" }
|
||||
netpod = { path = "../../../daqbuf-netpod", package = "daqbuf-netpod" }
|
||||
query = { path = "../query" }
|
||||
httpret = { path = "../httpret" }
|
||||
httpclient = { path = "../httpclient" }
|
||||
|
||||
@@ -21,6 +21,6 @@ async-channel = "1.9.0"
|
||||
chrono = "0.4.38"
|
||||
regex = "1.10.4"
|
||||
daqbuf-err = { path = "../../../daqbuf-err" }
|
||||
netpod = { path = "../netpod" }
|
||||
netpod = { path = "../../../daqbuf-netpod", package = "daqbuf-netpod" }
|
||||
parse = { path = "../parse" }
|
||||
taskrun = { path = "../taskrun" }
|
||||
|
||||
@@ -32,7 +32,7 @@ url = "2.5.0"
|
||||
tiny-keccak = { version = "2.0", features = ["sha3"] }
|
||||
daqbuf-err = { path = "../../../daqbuf-err" }
|
||||
taskrun = { path = "../taskrun" }
|
||||
netpod = { path = "../netpod" }
|
||||
netpod = { path = "../../../daqbuf-netpod", package = "daqbuf-netpod" }
|
||||
query = { path = "../query" }
|
||||
dbconn = { path = "../dbconn" }
|
||||
parse = { path = "../parse" }
|
||||
|
||||
@@ -15,7 +15,7 @@ chrono = "0.4.19"
|
||||
bytes = "1.7"
|
||||
daqbuf-err = { path = "../../../daqbuf-err" }
|
||||
taskrun = { path = "../taskrun" }
|
||||
netpod = { path = "../netpod" }
|
||||
netpod = { path = "../../../daqbuf-netpod", package = "daqbuf-netpod" }
|
||||
parse = { path = "../parse" }
|
||||
disk = { path = "../disk" }
|
||||
streams = { path = "../streams" }
|
||||
|
||||
@@ -20,7 +20,7 @@ hyper-util = { version = "0.1.1", features = ["full"] }
|
||||
bytes = "1.5.0"
|
||||
async-channel = "1.9.0"
|
||||
daqbuf-err = { path = "../../../daqbuf-err" }
|
||||
netpod = { path = "../netpod" }
|
||||
netpod = { path = "../../../daqbuf-netpod", package = "daqbuf-netpod" }
|
||||
parse = { path = "../parse" }
|
||||
streams = { path = "../streams" }
|
||||
thiserror = "0.0.1"
|
||||
|
||||
@@ -29,7 +29,7 @@ ciborium = "0.2.1"
|
||||
flate2 = "1"
|
||||
brotli = "3.4.0"
|
||||
daqbuf-err = { path = "../../../daqbuf-err" }
|
||||
netpod = { path = "../netpod" }
|
||||
netpod = { path = "../../../daqbuf-netpod", package = "daqbuf-netpod" }
|
||||
query = { path = "../query" }
|
||||
dbconn = { path = "../dbconn" }
|
||||
disk = { path = "../disk" }
|
||||
|
||||
@@ -16,5 +16,5 @@ bincode = "1.3.3"
|
||||
bytes = "1.2.1"
|
||||
futures-util = "0.3.24"
|
||||
chrono = { version = "0.4.19", features = ["serde"] }
|
||||
netpod = { path = "../netpod" }
|
||||
netpod = { path = "../../../daqbuf-netpod", package = "daqbuf-netpod" }
|
||||
daqbuf-err = { path = "../../../daqbuf-err" }
|
||||
|
||||
@@ -26,7 +26,7 @@ thiserror = "0.0.1"
|
||||
daqbuf-err = { path = "../../../daqbuf-err" }
|
||||
items_0 = { path = "../items_0" }
|
||||
items_proc = { path = "../items_proc" }
|
||||
netpod = { path = "../netpod" }
|
||||
netpod = { path = "../../../daqbuf-netpod", package = "daqbuf-netpod" }
|
||||
parse = { path = "../parse" }
|
||||
bitshuffle = { path = "../bitshuffle" }
|
||||
|
||||
|
||||
@@ -1,28 +0,0 @@
|
||||
[package]
|
||||
name = "netpod"
|
||||
version = "0.0.2"
|
||||
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
|
||||
edition = "2021"
|
||||
|
||||
[lib]
|
||||
path = "src/netpod.rs"
|
||||
|
||||
[dependencies]
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
serde_json = "1.0"
|
||||
http = "1.0.0"
|
||||
humantime = "2.1.0"
|
||||
humantime-serde = "1.1.1"
|
||||
bytes = "1.4.0"
|
||||
chrono = { version = "0.4.19", features = ["serde"] }
|
||||
futures-util = "0.3.14"
|
||||
tracing = "0.1.37"
|
||||
url = "2.5.0"
|
||||
num-traits = "0.2.16"
|
||||
hex = "0.4.3"
|
||||
rand = "0.8.5"
|
||||
thiserror = "0.0.1"
|
||||
daqbuf-err = { path = "../../../daqbuf-err" }
|
||||
|
||||
[patch.crates-io]
|
||||
thiserror = { git = "https://github.com/dominikwerder/thiserror.git", branch = "cstm" }
|
||||
@@ -1,111 +0,0 @@
|
||||
use daqbuf_err as err;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum ChannelStatusClosedReason {
|
||||
ShutdownCommand,
|
||||
ChannelRemove,
|
||||
ProtocolError,
|
||||
FrequencyQuota,
|
||||
BandwidthQuota,
|
||||
InternalError,
|
||||
IocTimeout,
|
||||
NoProtocol,
|
||||
ProtocolDone,
|
||||
ConnectFail,
|
||||
IoError,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum ChannelStatus {
|
||||
AssignedToAddress,
|
||||
Opened,
|
||||
Closed(ChannelStatusClosedReason),
|
||||
Pong,
|
||||
MonitoringSilenceReadStart,
|
||||
MonitoringSilenceReadTimeout,
|
||||
MonitoringSilenceReadUnchanged,
|
||||
HaveStatusId,
|
||||
HaveAddress,
|
||||
}
|
||||
|
||||
impl ChannelStatus {
|
||||
pub fn to_kind(&self) -> u32 {
|
||||
use ChannelStatus::*;
|
||||
use ChannelStatusClosedReason::*;
|
||||
match self {
|
||||
AssignedToAddress => 24,
|
||||
Opened => 1,
|
||||
Closed(x) => match x {
|
||||
ShutdownCommand => 2,
|
||||
ChannelRemove => 3,
|
||||
ProtocolError => 4,
|
||||
FrequencyQuota => 5,
|
||||
BandwidthQuota => 6,
|
||||
InternalError => 7,
|
||||
IocTimeout => 8,
|
||||
NoProtocol => 9,
|
||||
ProtocolDone => 10,
|
||||
ConnectFail => 11,
|
||||
IoError => 12,
|
||||
},
|
||||
Pong => 25,
|
||||
MonitoringSilenceReadStart => 26,
|
||||
MonitoringSilenceReadTimeout => 27,
|
||||
MonitoringSilenceReadUnchanged => 28,
|
||||
HaveStatusId => 29,
|
||||
HaveAddress => 30,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn from_kind(kind: u32) -> Result<Self, err::Error> {
|
||||
use ChannelStatus::*;
|
||||
use ChannelStatusClosedReason::*;
|
||||
let ret = match kind {
|
||||
1 => Opened,
|
||||
2 => Closed(ShutdownCommand),
|
||||
3 => Closed(ChannelRemove),
|
||||
4 => Closed(ProtocolError),
|
||||
5 => Closed(FrequencyQuota),
|
||||
6 => Closed(BandwidthQuota),
|
||||
7 => Closed(InternalError),
|
||||
8 => Closed(IocTimeout),
|
||||
9 => Closed(NoProtocol),
|
||||
10 => Closed(ProtocolDone),
|
||||
11 => Closed(ConnectFail),
|
||||
12 => Closed(IoError),
|
||||
24 => AssignedToAddress,
|
||||
25 => Pong,
|
||||
26 => MonitoringSilenceReadStart,
|
||||
27 => MonitoringSilenceReadTimeout,
|
||||
28 => MonitoringSilenceReadUnchanged,
|
||||
29 => HaveStatusId,
|
||||
30 => HaveAddress,
|
||||
_ => {
|
||||
return Err(err::Error::with_msg_no_trace(format!(
|
||||
"unknown ChannelStatus kind {kind}"
|
||||
)));
|
||||
}
|
||||
};
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
pub fn to_u64(&self) -> u64 {
|
||||
self.to_kind() as u64
|
||||
}
|
||||
|
||||
pub fn to_user_variant_string(&self) -> String {
|
||||
use ChannelStatus::*;
|
||||
let ret = match self {
|
||||
AssignedToAddress => "Located",
|
||||
Opened => "Opened",
|
||||
Closed(_) => "Closed",
|
||||
Pong => "Pongg",
|
||||
MonitoringSilenceReadStart => "MSRS",
|
||||
MonitoringSilenceReadTimeout => "MSRT",
|
||||
MonitoringSilenceReadUnchanged => "MSRU",
|
||||
HaveStatusId => "HaveStatusId",
|
||||
HaveAddress => "HaveAddress",
|
||||
};
|
||||
ret.into()
|
||||
}
|
||||
}
|
||||
@@ -1,10 +0,0 @@
|
||||
/// Input may also contain whitespace.
|
||||
pub fn decode_hex<INP: AsRef<str>>(inp: INP) -> Result<Vec<u8>, ()> {
|
||||
let a: Vec<_> = inp
|
||||
.as_ref()
|
||||
.bytes()
|
||||
.filter(|&x| (x >= b'0' && x <= b'9') || (x >= b'a' && x <= b'f'))
|
||||
.collect();
|
||||
let ret = hex::decode(a).map_err(|_| ())?;
|
||||
Ok(ret)
|
||||
}
|
||||
@@ -1,30 +0,0 @@
|
||||
#[derive(Debug)]
|
||||
pub struct HistoLog2 {
|
||||
histo: [u64; 16],
|
||||
sub: usize,
|
||||
}
|
||||
|
||||
impl HistoLog2 {
|
||||
pub fn new(sub: usize) -> Self {
|
||||
Self { histo: [0; 16], sub }
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn ingest(&mut self, mut v: u32) {
|
||||
let mut po = 0;
|
||||
while v != 0 && po < 15 {
|
||||
v = v >> 1;
|
||||
po += 1;
|
||||
}
|
||||
let po = if po >= self.histo.len() + self.sub {
|
||||
self.histo.len() - 1
|
||||
} else {
|
||||
if po > self.sub {
|
||||
po - self.sub
|
||||
} else {
|
||||
0
|
||||
}
|
||||
};
|
||||
self.histo[po] += 1;
|
||||
}
|
||||
}
|
||||
File diff suppressed because it is too large
Load Diff
@@ -1,376 +0,0 @@
|
||||
pub mod api1;
|
||||
pub mod datetime;
|
||||
pub mod prebinned;
|
||||
|
||||
use daqbuf_err as err;
|
||||
|
||||
use crate::get_url_query_pairs;
|
||||
use crate::log::*;
|
||||
use crate::AggKind;
|
||||
use crate::AppendToUrl;
|
||||
use crate::FromUrl;
|
||||
use crate::HasBackend;
|
||||
use crate::HasTimeout;
|
||||
use crate::NanoRange;
|
||||
use crate::NetpodError;
|
||||
use crate::PulseRange;
|
||||
use crate::SfDbChannel;
|
||||
use crate::ToNanos;
|
||||
use crate::DATETIME_FMT_6MS;
|
||||
use chrono::DateTime;
|
||||
use chrono::TimeZone;
|
||||
use chrono::Utc;
|
||||
use err::Error;
|
||||
use serde::Deserialize;
|
||||
use serde::Serialize;
|
||||
use std::collections::BTreeMap;
|
||||
use std::fmt;
|
||||
use std::time::Duration;
|
||||
use url::Url;
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
pub enum CacheUsage {
|
||||
Use,
|
||||
Ignore,
|
||||
Recreate,
|
||||
V0NoCache,
|
||||
}
|
||||
|
||||
impl CacheUsage {
|
||||
pub fn query_param_value(&self) -> String {
|
||||
match self {
|
||||
CacheUsage::Use => "use",
|
||||
CacheUsage::Ignore => "ignore",
|
||||
CacheUsage::Recreate => "recreate",
|
||||
CacheUsage::V0NoCache => "v0nocache",
|
||||
}
|
||||
.into()
|
||||
}
|
||||
|
||||
// Missing query parameter is not an error
|
||||
pub fn from_pairs(pairs: &BTreeMap<String, String>) -> Result<Option<Self>, NetpodError> {
|
||||
pairs
|
||||
.get("cacheUsage")
|
||||
.map(|k| {
|
||||
if k == "use" {
|
||||
Ok(Some(CacheUsage::Use))
|
||||
} else if k == "ignore" {
|
||||
Ok(Some(CacheUsage::Ignore))
|
||||
} else if k == "recreate" {
|
||||
Ok(Some(CacheUsage::Recreate))
|
||||
} else if k == "v0nocache" {
|
||||
Ok(Some(CacheUsage::V0NoCache))
|
||||
} else {
|
||||
Err(NetpodError::BadCacheUsage(k.clone()))?
|
||||
}
|
||||
})
|
||||
.unwrap_or(Ok(None))
|
||||
}
|
||||
|
||||
pub fn from_string(s: &str) -> Result<Self, Error> {
|
||||
let ret = if s == "ignore" {
|
||||
CacheUsage::Ignore
|
||||
} else if s == "recreate" {
|
||||
CacheUsage::Recreate
|
||||
} else if s == "use" {
|
||||
CacheUsage::Use
|
||||
} else if s == "v0nocache" {
|
||||
CacheUsage::V0NoCache
|
||||
} else {
|
||||
return Err(Error::with_msg(format!("can not interpret cache usage string: {}", s)));
|
||||
};
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
pub fn is_cache_write(&self) -> bool {
|
||||
match self {
|
||||
CacheUsage::Use => true,
|
||||
CacheUsage::Ignore => false,
|
||||
CacheUsage::Recreate => true,
|
||||
CacheUsage::V0NoCache => false,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn is_cache_read(&self) -> bool {
|
||||
match self {
|
||||
CacheUsage::Use => true,
|
||||
CacheUsage::Ignore => false,
|
||||
CacheUsage::Recreate => false,
|
||||
CacheUsage::V0NoCache => false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for CacheUsage {
|
||||
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
|
||||
write!(fmt, "{}", self.query_param_value())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
pub struct TimeRangeQuery {
|
||||
range: NanoRange,
|
||||
}
|
||||
|
||||
fn parse_time(v: &str) -> Result<DateTime<Utc>, NetpodError> {
|
||||
if let Ok(x) = v.parse() {
|
||||
Ok(x)
|
||||
} else {
|
||||
if v.ends_with("ago") {
|
||||
let d = humantime::parse_duration(&v[..v.len() - 3]).map_err(|_| NetpodError::BadTimerange)?;
|
||||
Ok(Utc::now() - d)
|
||||
} else {
|
||||
Err(NetpodError::BadTimerange)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl FromUrl for TimeRangeQuery {
|
||||
type Error = NetpodError;
|
||||
|
||||
fn from_url(url: &Url) -> Result<Self, Self::Error> {
|
||||
let pairs = get_url_query_pairs(url);
|
||||
Self::from_pairs(&pairs)
|
||||
}
|
||||
|
||||
fn from_pairs(pairs: &BTreeMap<String, String>) -> Result<Self, Self::Error> {
|
||||
if let (Some(beg), Some(end)) = (pairs.get("begDate"), pairs.get("endDate")) {
|
||||
let ret = Self {
|
||||
range: NanoRange {
|
||||
beg: parse_time(beg)?.to_nanos(),
|
||||
end: parse_time(end)?.to_nanos(),
|
||||
},
|
||||
};
|
||||
Ok(ret)
|
||||
} else if let (Some(beg), Some(end)) = (pairs.get("begNs"), pairs.get("endNs")) {
|
||||
let ret = Self {
|
||||
range: NanoRange {
|
||||
beg: beg.parse()?,
|
||||
end: end.parse()?,
|
||||
},
|
||||
};
|
||||
Ok(ret)
|
||||
} else {
|
||||
Err(NetpodError::MissingTimerange)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl AppendToUrl for TimeRangeQuery {
|
||||
fn append_to_url(&self, url: &mut Url) {
|
||||
let date_fmt = DATETIME_FMT_6MS;
|
||||
let mut g = url.query_pairs_mut();
|
||||
g.append_pair(
|
||||
"begDate",
|
||||
&Utc.timestamp_nanos(self.range.beg as i64).format(date_fmt).to_string(),
|
||||
);
|
||||
g.append_pair(
|
||||
"endDate",
|
||||
&Utc.timestamp_nanos(self.range.end as i64).format(date_fmt).to_string(),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
impl From<TimeRangeQuery> for NanoRange {
|
||||
fn from(k: TimeRangeQuery) -> Self {
|
||||
Self {
|
||||
beg: k.range.beg,
|
||||
end: k.range.end,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<&NanoRange> for TimeRangeQuery {
|
||||
fn from(k: &NanoRange) -> Self {
|
||||
Self {
|
||||
range: NanoRange { beg: k.beg, end: k.end },
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<&PulseRange> for PulseRangeQuery {
|
||||
fn from(k: &PulseRange) -> Self {
|
||||
Self {
|
||||
range: PulseRange { beg: k.beg, end: k.end },
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
pub struct PulseRangeQuery {
|
||||
range: PulseRange,
|
||||
}
|
||||
|
||||
impl FromUrl for PulseRangeQuery {
|
||||
type Error = NetpodError;
|
||||
|
||||
fn from_url(url: &Url) -> Result<Self, Self::Error> {
|
||||
let pairs = get_url_query_pairs(url);
|
||||
Self::from_pairs(&pairs)
|
||||
}
|
||||
|
||||
fn from_pairs(pairs: &BTreeMap<String, String>) -> Result<Self, Self::Error> {
|
||||
if let (Some(beg), Some(end)) = (pairs.get("begPulse"), pairs.get("endPulse")) {
|
||||
let ret = Self {
|
||||
range: PulseRange {
|
||||
beg: beg.parse()?,
|
||||
end: end.parse()?,
|
||||
},
|
||||
};
|
||||
Ok(ret)
|
||||
} else {
|
||||
Err(NetpodError::MissingQueryParameters)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl AppendToUrl for PulseRangeQuery {
|
||||
fn append_to_url(&self, url: &mut Url) {
|
||||
let mut g = url.query_pairs_mut();
|
||||
g.append_pair("begPulse", &self.range.beg.to_string());
|
||||
g.append_pair("endPulse", &self.range.end.to_string());
|
||||
}
|
||||
}
|
||||
|
||||
impl From<PulseRangeQuery> for PulseRange {
|
||||
fn from(k: PulseRangeQuery) -> Self {
|
||||
Self {
|
||||
beg: k.range.beg,
|
||||
end: k.range.end,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn binning_scheme_append_to_url(agg_kind: &AggKind, url: &mut Url) {
|
||||
let mut g = url.query_pairs_mut();
|
||||
match agg_kind {
|
||||
AggKind::EventBlobs => {
|
||||
g.append_pair("binningScheme", "eventBlobs");
|
||||
}
|
||||
AggKind::TimeWeightedScalar => {
|
||||
g.append_pair("binningScheme", "timeWeightedScalar");
|
||||
}
|
||||
AggKind::Plain => {
|
||||
g.append_pair("binningScheme", "fullValue");
|
||||
}
|
||||
AggKind::DimXBins1 => {
|
||||
g.append_pair("binningScheme", "unweightedScalar");
|
||||
}
|
||||
AggKind::DimXBinsN(n) => {
|
||||
g.append_pair("binningScheme", "binnedX");
|
||||
g.append_pair("binnedXcount", &format!("{}", n));
|
||||
}
|
||||
AggKind::PulseIdDiff => {
|
||||
g.append_pair("binningScheme", "pulseIdDiff");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Absent AggKind is not considered an error.
|
||||
pub fn agg_kind_from_binning_scheme(pairs: &BTreeMap<String, String>) -> Result<Option<AggKind>, NetpodError> {
|
||||
let key = "binningScheme";
|
||||
if let Some(s) = pairs.get(key) {
|
||||
let ret = if s == "eventBlobs" {
|
||||
AggKind::EventBlobs
|
||||
} else if s == "fullValue" {
|
||||
AggKind::Plain
|
||||
} else if s == "timeWeightedScalar" {
|
||||
AggKind::TimeWeightedScalar
|
||||
} else if s == "unweightedScalar" {
|
||||
AggKind::DimXBins1
|
||||
} else if s == "binnedX" {
|
||||
let u = pairs.get("binnedXcount").map_or("1", |k| k).parse()?;
|
||||
AggKind::DimXBinsN(u)
|
||||
} else if s == "pulseIdDiff" {
|
||||
AggKind::PulseIdDiff
|
||||
} else {
|
||||
return Err(NetpodError::MissingBinningScheme);
|
||||
};
|
||||
Ok(Some(ret))
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct ChannelStateEventsQuery {
|
||||
channel: SfDbChannel,
|
||||
range: NanoRange,
|
||||
}
|
||||
|
||||
impl ChannelStateEventsQuery {
|
||||
pub fn new(channel: SfDbChannel, range: NanoRange) -> Self {
|
||||
Self { channel, range }
|
||||
}
|
||||
|
||||
pub fn range(&self) -> &NanoRange {
|
||||
&self.range
|
||||
}
|
||||
|
||||
pub fn channel(&self) -> &SfDbChannel {
|
||||
&self.channel
|
||||
}
|
||||
|
||||
pub fn set_series_id(&mut self, series: u64) {
|
||||
self.channel.series = Some(series);
|
||||
}
|
||||
|
||||
pub fn channel_mut(&mut self) -> &mut SfDbChannel {
|
||||
&mut self.channel
|
||||
}
|
||||
}
|
||||
|
||||
impl HasBackend for ChannelStateEventsQuery {
|
||||
fn backend(&self) -> &str {
|
||||
&self.channel.backend
|
||||
}
|
||||
}
|
||||
|
||||
impl HasTimeout for ChannelStateEventsQuery {
|
||||
fn timeout(&self) -> Option<Duration> {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
impl FromUrl for ChannelStateEventsQuery {
|
||||
type Error = NetpodError;
|
||||
|
||||
fn from_url(url: &Url) -> Result<Self, Self::Error> {
|
||||
let pairs = get_url_query_pairs(url);
|
||||
Self::from_pairs(&pairs)
|
||||
}
|
||||
|
||||
fn from_pairs(pairs: &BTreeMap<String, String>) -> Result<Self, Self::Error> {
|
||||
let beg_date = pairs.get("begDate").ok_or_else(|| NetpodError::MissingTimerange)?;
|
||||
let end_date = pairs.get("endDate").ok_or_else(|| NetpodError::MissingTimerange)?;
|
||||
let ret = Self {
|
||||
channel: SfDbChannel::from_pairs(&pairs)?,
|
||||
range: NanoRange {
|
||||
beg: beg_date.parse::<DateTime<Utc>>()?.to_nanos(),
|
||||
end: end_date.parse::<DateTime<Utc>>()?.to_nanos(),
|
||||
},
|
||||
};
|
||||
let self_name = std::any::type_name::<Self>();
|
||||
debug!("{self_name}::from_url {ret:?}");
|
||||
Ok(ret)
|
||||
}
|
||||
}
|
||||
|
||||
impl AppendToUrl for ChannelStateEventsQuery {
|
||||
fn append_to_url(&self, url: &mut Url) {
|
||||
self.channel.append_to_url(url);
|
||||
let mut g = url.query_pairs_mut();
|
||||
g.append_pair(
|
||||
"begDate",
|
||||
&Utc.timestamp_nanos(self.range.beg as i64)
|
||||
.format(DATETIME_FMT_6MS)
|
||||
.to_string(),
|
||||
);
|
||||
g.append_pair(
|
||||
"endDate",
|
||||
&Utc.timestamp_nanos(self.range.end as i64)
|
||||
.format(DATETIME_FMT_6MS)
|
||||
.to_string(),
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -1,317 +0,0 @@
|
||||
use crate::query::datetime::Datetime;
|
||||
use crate::DiskIoTune;
|
||||
use crate::FileIoBufferSize;
|
||||
use crate::ReadSys;
|
||||
use daqbuf_err as err;
|
||||
use err::Error;
|
||||
use serde::Deserialize;
|
||||
use serde::Serialize;
|
||||
use std::fmt;
|
||||
use std::time::Duration;
|
||||
|
||||
#[derive(Debug, PartialEq, Serialize, Deserialize)]
|
||||
pub struct Api1Range {
|
||||
#[serde(rename = "type", default, skip_serializing_if = "String::is_empty")]
|
||||
ty: String,
|
||||
#[serde(rename = "startDate")]
|
||||
beg: Datetime,
|
||||
#[serde(rename = "endDate")]
|
||||
end: Datetime,
|
||||
}
|
||||
|
||||
impl Api1Range {
|
||||
pub fn new(beg: Datetime, end: Datetime) -> Result<Self, Error> {
|
||||
let ret = Self {
|
||||
ty: String::new(),
|
||||
beg,
|
||||
end,
|
||||
};
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
pub fn beg(&self) -> &Datetime {
|
||||
&self.beg
|
||||
}
|
||||
|
||||
pub fn end(&self) -> &Datetime {
|
||||
&self.end
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn serde_de_range_zulu() {
|
||||
let s = r#"{"startDate": "2022-11-22T10:15:12.412Z", "endDate": "2022-11-22T10:15:12.413556Z"}"#;
|
||||
let range: Api1Range = serde_json::from_str(s).unwrap();
|
||||
assert_eq!(range.beg().offset().local_minus_utc(), 0);
|
||||
assert_eq!(range.end().offset().local_minus_utc(), 0);
|
||||
assert_eq!(range.beg().timestamp_subsec_micros(), 412000);
|
||||
assert_eq!(range.end().timestamp_subsec_micros(), 413556);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn serde_de_range_offset() {
|
||||
let s = r#"{"startDate": "2022-11-22T10:15:12.412Z", "endDate": "2022-11-22T10:15:12.413556Z"}"#;
|
||||
let range: Api1Range = serde_json::from_str(s).unwrap();
|
||||
assert_eq!(range.beg().offset().local_minus_utc(), 0);
|
||||
assert_eq!(range.end().offset().local_minus_utc(), 0);
|
||||
assert_eq!(range.beg().timestamp_subsec_micros(), 412000);
|
||||
assert_eq!(range.end().timestamp_subsec_micros(), 413556);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn serde_ser_range_offset() {
|
||||
use chrono::{FixedOffset, NaiveDate, TimeZone};
|
||||
let beg = FixedOffset::east_opt(60 * 60 * 3)
|
||||
.unwrap()
|
||||
.from_local_datetime(
|
||||
&NaiveDate::from_ymd_opt(2022, 11, 22)
|
||||
.unwrap()
|
||||
.and_hms_milli_opt(13, 14, 15, 16)
|
||||
.unwrap(),
|
||||
)
|
||||
.earliest()
|
||||
.unwrap();
|
||||
let end = FixedOffset::east_opt(-60 * 60 * 1)
|
||||
.unwrap()
|
||||
.from_local_datetime(
|
||||
&NaiveDate::from_ymd_opt(2022, 11, 22)
|
||||
.unwrap()
|
||||
.and_hms_milli_opt(13, 14, 15, 800)
|
||||
.unwrap(),
|
||||
)
|
||||
.earliest()
|
||||
.unwrap();
|
||||
let range = Api1Range::new(beg.into(), end.into()).unwrap();
|
||||
let js = serde_json::to_string(&range).unwrap();
|
||||
let exp = r#"{"startDate":"2022-11-22T13:14:15.016+03:00","endDate":"2022-11-22T13:14:15.800-01:00"}"#;
|
||||
assert_eq!(js, exp);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn serde_ser_range_01() -> Result<(), Error> {
|
||||
let beg = Datetime::try_from("2022-11-22T02:03:04Z")?;
|
||||
let end = Datetime::try_from("2022-11-22T02:03:04.123Z")?;
|
||||
let range = Api1Range::new(beg, end).unwrap();
|
||||
let js = serde_json::to_string(&range).unwrap();
|
||||
let exp = r#"{"startDate":"2022-11-22T02:03:04Z","endDate":"2022-11-22T02:03:04.123Z"}"#;
|
||||
assert_eq!(js, exp);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn serde_ser_range_02() -> Result<(), Error> {
|
||||
let beg = Datetime::try_from("2022-11-22T02:03:04.987654Z")?;
|
||||
let end = Datetime::try_from("2022-11-22T02:03:04.777000Z")?;
|
||||
let range = Api1Range::new(beg, end).unwrap();
|
||||
let js = serde_json::to_string(&range).unwrap();
|
||||
let exp = r#"{"startDate":"2022-11-22T02:03:04.987654Z","endDate":"2022-11-22T02:03:04.777Z"}"#;
|
||||
assert_eq!(js, exp);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// In Api1, the list of channels consists of either `BACKEND/CHANNELNAME`
|
||||
/// or just `CHANNELNAME`.
|
||||
#[derive(Debug, PartialEq)]
|
||||
pub struct ChannelTuple {
|
||||
backend: Option<String>,
|
||||
name: String,
|
||||
}
|
||||
|
||||
impl ChannelTuple {
|
||||
pub fn new(backend: String, name: String) -> Self {
|
||||
Self {
|
||||
backend: Some(backend),
|
||||
name,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn from_name(name: String) -> Self {
|
||||
Self { backend: None, name }
|
||||
}
|
||||
|
||||
pub fn backend(&self) -> Option<&String> {
|
||||
self.backend.as_ref()
|
||||
}
|
||||
|
||||
pub fn name(&self) -> &str {
|
||||
&self.name
|
||||
}
|
||||
}
|
||||
|
||||
mod serde_channel_tuple {
|
||||
use super::*;
|
||||
use serde::de::{Deserialize, Deserializer, Visitor};
|
||||
use serde::ser::{Serialize, Serializer};
|
||||
|
||||
impl Serialize for ChannelTuple {
|
||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: Serializer,
|
||||
{
|
||||
if let Some(backend) = self.backend.as_ref() {
|
||||
serializer.serialize_str(&format!("{}/{}", backend, self.name))
|
||||
} else {
|
||||
serializer.serialize_str(&self.name)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct Vis;
|
||||
|
||||
impl<'de> Visitor<'de> for Vis {
|
||||
type Value = ChannelTuple;
|
||||
|
||||
fn expecting(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
|
||||
write!(fmt, "[Backendname/]Channelname")
|
||||
}
|
||||
|
||||
fn visit_str<E>(self, val: &str) -> Result<Self::Value, E>
|
||||
where
|
||||
E: serde::de::Error,
|
||||
{
|
||||
let mut it = val.split("/");
|
||||
// Even empty string splits into one element of empty string
|
||||
let s0 = it.next().unwrap();
|
||||
if let Some(s1) = it.next() {
|
||||
let ret = ChannelTuple {
|
||||
backend: Some(s0.into()),
|
||||
name: s1.into(),
|
||||
};
|
||||
Ok(ret)
|
||||
} else {
|
||||
let ret = ChannelTuple {
|
||||
backend: None,
|
||||
name: s0.into(),
|
||||
};
|
||||
Ok(ret)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'de> Deserialize<'de> for ChannelTuple {
|
||||
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
|
||||
where
|
||||
D: Deserializer<'de>,
|
||||
{
|
||||
deserializer.deserialize_str(Vis)
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn ser_name() {
|
||||
let x = ChannelTuple {
|
||||
backend: None,
|
||||
name: "temperature".into(),
|
||||
};
|
||||
let js = serde_json::to_string(&x).unwrap();
|
||||
assert_eq!(js, r#""temperature""#);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn ser_backend_name() {
|
||||
let x = ChannelTuple {
|
||||
backend: Some("beach".into()),
|
||||
name: "temperature".into(),
|
||||
};
|
||||
let js = serde_json::to_string(&x).unwrap();
|
||||
assert_eq!(js, r#""beach/temperature""#);
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Serialize, Deserialize)]
|
||||
pub struct Api1Query {
|
||||
range: Api1Range,
|
||||
channels: Vec<ChannelTuple>,
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
timeout: Option<Duration>,
|
||||
// All following parameters are private and not to be used
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
file_io_buffer_size: Option<FileIoBufferSize>,
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
decompress: Option<bool>,
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
events_max: Option<u64>,
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
io_queue_len: Option<u32>,
|
||||
#[serde(default, skip_serializing_if = "String::is_empty")]
|
||||
log_level: String,
|
||||
#[serde(default, skip_serializing_if = "String::is_empty")]
|
||||
read_sys: String,
|
||||
}
|
||||
|
||||
impl Api1Query {
|
||||
pub fn new(range: Api1Range, channels: Vec<ChannelTuple>) -> Self {
|
||||
Self {
|
||||
range,
|
||||
channels,
|
||||
timeout: None,
|
||||
decompress: None,
|
||||
events_max: None,
|
||||
file_io_buffer_size: None,
|
||||
io_queue_len: None,
|
||||
log_level: String::new(),
|
||||
read_sys: String::new(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn disk_io_tune(&self) -> DiskIoTune {
|
||||
let mut k = DiskIoTune::default();
|
||||
if let Some(x) = &self.file_io_buffer_size {
|
||||
k.read_buffer_len = x.0;
|
||||
}
|
||||
if let Some(x) = self.io_queue_len {
|
||||
k.read_queue_len = x as usize;
|
||||
}
|
||||
let read_sys: ReadSys = self.read_sys.as_str().into();
|
||||
k.read_sys = read_sys;
|
||||
k
|
||||
}
|
||||
|
||||
pub fn range(&self) -> &Api1Range {
|
||||
&self.range
|
||||
}
|
||||
|
||||
pub fn channels(&self) -> &[ChannelTuple] {
|
||||
&self.channels
|
||||
}
|
||||
|
||||
pub fn timeout(&self) -> Option<Duration> {
|
||||
self.timeout
|
||||
}
|
||||
|
||||
pub fn timeout_or_default(&self) -> Duration {
|
||||
Duration::from_secs(60 * 30)
|
||||
}
|
||||
|
||||
pub fn log_level(&self) -> &str {
|
||||
&self.log_level
|
||||
}
|
||||
|
||||
pub fn decompress(&self) -> Option<bool> {
|
||||
self.decompress
|
||||
}
|
||||
|
||||
pub fn events_max(&self) -> Option<u64> {
|
||||
self.events_max
|
||||
}
|
||||
|
||||
pub fn set_decompress(&mut self, v: Option<bool>) {
|
||||
self.decompress = v;
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn serde_api1_query() -> Result<(), Error> {
|
||||
let beg = Datetime::try_from("2022-11-22T08:09:10Z")?;
|
||||
let end = Datetime::try_from("2022-11-23T08:11:05.455009+02:00")?;
|
||||
let range = Api1Range::new(beg, end).unwrap();
|
||||
let ch0 = ChannelTuple::from_name("nameonly".into());
|
||||
let ch1 = ChannelTuple::new("somebackend".into(), "somechan".into());
|
||||
let qu = Api1Query::new(range, vec![ch0, ch1]);
|
||||
let js = serde_json::to_string(&qu).unwrap();
|
||||
assert_eq!(
|
||||
js,
|
||||
r#"{"range":{"startDate":"2022-11-22T08:09:10Z","endDate":"2022-11-23T08:11:05.455009+02:00"},"channels":["nameonly","somebackend/somechan"]}"#
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
@@ -1,192 +0,0 @@
|
||||
use chrono::DateTime;
|
||||
use chrono::FixedOffset;
|
||||
use daqbuf_err as err;
|
||||
use err::Error;
|
||||
use serde::de::Visitor;
|
||||
use serde::Deserialize;
|
||||
use serde::Serialize;
|
||||
use std::fmt;
|
||||
use std::ops;
|
||||
|
||||
#[derive(Clone, Debug, PartialEq)]
|
||||
pub struct Datetime(DateTime<FixedOffset>);
|
||||
|
||||
impl From<DateTime<FixedOffset>> for Datetime {
|
||||
fn from(x: DateTime<FixedOffset>) -> Self {
|
||||
Datetime(x)
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<&str> for Datetime {
|
||||
type Error = Error;
|
||||
|
||||
fn try_from(val: &str) -> Result<Self, Self::Error> {
|
||||
let dt =
|
||||
DateTime::<FixedOffset>::parse_from_rfc3339(val).map_err(|e| Error::with_msg_no_trace(format!("{e}")))?;
|
||||
Ok(Datetime(dt))
|
||||
}
|
||||
}
|
||||
|
||||
impl ops::Deref for Datetime {
|
||||
type Target = DateTime<FixedOffset>;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.0
|
||||
}
|
||||
}
|
||||
|
||||
// RFC 3339 (subset of ISO 8601)
|
||||
|
||||
impl Serialize for Datetime {
|
||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: serde::Serializer,
|
||||
{
|
||||
use fmt::Write;
|
||||
use serde::ser::Error;
|
||||
let val = &self.0;
|
||||
let mut s = String::with_capacity(64);
|
||||
write!(&mut s, "{}", val.format("%Y-%m-%dT%H:%M:%S")).map_err(|_| Error::custom("fmt"))?;
|
||||
let ns = val.timestamp_subsec_nanos();
|
||||
let mus = val.timestamp_subsec_micros();
|
||||
if ns % 1000 != 0 {
|
||||
write!(&mut s, "{}", val.format(".%9f")).map_err(|_| Error::custom("fmt"))?;
|
||||
} else if mus % 1000 != 0 {
|
||||
write!(&mut s, "{}", val.format(".%6f")).map_err(|_| Error::custom("fmt"))?;
|
||||
} else if mus != 0 {
|
||||
write!(&mut s, "{}", val.format(".%3f")).map_err(|_| Error::custom("fmt"))?;
|
||||
}
|
||||
if val.offset().local_minus_utc() == 0 {
|
||||
write!(&mut s, "Z").map_err(|_| Error::custom("fmt"))?;
|
||||
} else {
|
||||
write!(&mut s, "{}", val.format("%:z")).map_err(|_| Error::custom("fmt"))?;
|
||||
}
|
||||
serializer.collect_str(&s)
|
||||
}
|
||||
}
|
||||
|
||||
mod ser_impl_2 {
|
||||
use super::Datetime;
|
||||
use crate::DATETIME_FMT_0MS;
|
||||
use crate::DATETIME_FMT_3MS;
|
||||
use crate::DATETIME_FMT_6MS;
|
||||
use crate::DATETIME_FMT_9MS;
|
||||
use fmt::Write;
|
||||
use serde::ser::Error;
|
||||
use std::fmt;
|
||||
|
||||
#[allow(unused)]
|
||||
fn serialize<S>(obj: &Datetime, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: serde::Serializer,
|
||||
{
|
||||
let val = &obj.0;
|
||||
let mut s = String::with_capacity(64);
|
||||
write!(&mut s, "{}", val.format("%Y-%m-%dT%H:%M:%S")).map_err(|_| Error::custom("fmt"))?;
|
||||
let ns = val.timestamp_subsec_nanos();
|
||||
let s = if ns % 1000 != 0 {
|
||||
val.format(DATETIME_FMT_9MS)
|
||||
} else {
|
||||
let mus = val.timestamp_subsec_micros();
|
||||
if mus % 1000 != 0 {
|
||||
val.format(DATETIME_FMT_6MS)
|
||||
} else {
|
||||
let ms = val.timestamp_subsec_millis();
|
||||
if ms != 0 {
|
||||
val.format(DATETIME_FMT_3MS)
|
||||
} else {
|
||||
val.format(DATETIME_FMT_0MS)
|
||||
}
|
||||
}
|
||||
};
|
||||
serializer.collect_str(&s)
|
||||
}
|
||||
}
|
||||
|
||||
/// Serde visitor that accepts a string and parses it via `Datetime::try_from`.
struct Vis1;
|
||||
impl<'de> Visitor<'de> for Vis1 {
|
||||
type Value = Datetime;
|
||||
|
||||
fn expecting(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
|
||||
write!(fmt, "Datetime")
|
||||
}
|
||||
|
||||
fn visit_str<E>(self, val: &str) -> Result<Self::Value, E>
|
||||
where
|
||||
E: serde::de::Error,
|
||||
{
|
||||
Datetime::try_from(val).map_err(|e| serde::de::Error::custom(format!("{e}")))
|
||||
}
|
||||
}
|
||||
|
||||
impl<'de> Deserialize<'de> for Datetime {
    /// Deserialize from an RFC 3339 string (see `Vis1`).
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        deserializer.deserialize_str(Vis1)
    }
}
|
||||
|
||||
#[test]
fn ser_00() {
    use chrono::TimeZone;
    // Whole-second UTC timestamp: no fractional digits, offset rendered as Z.
    let dt = FixedOffset::east_opt(0)
        .unwrap()
        .with_ymd_and_hms(2023, 2, 3, 15, 12, 40)
        .earliest()
        .unwrap();
    let json = serde_json::to_string(&Datetime(dt)).unwrap();
    assert_eq!(json, r#""2023-02-03T15:12:40Z""#);
}
|
||||
|
||||
/// 876 ms of sub-second precision: expect exactly three fractional digits.
#[test]
fn ser_01() {
    use chrono::TimeZone;
    let x = FixedOffset::east_opt(0)
        .unwrap()
        .with_ymd_and_hms(2023, 2, 3, 15, 12, 40)
        .earliest()
        .unwrap()
        .checked_add_signed(chrono::Duration::milliseconds(876))
        .unwrap();
    let x = Datetime(x);
    let s = serde_json::to_string(&x).unwrap();

    assert_eq!(s, r#""2023-02-03T15:12:40.876Z""#);
}
|
||||
|
||||
/// Sub-second part is a whole number of microseconds (nanos divisible by
/// 1000): expect exactly six fractional digits.
#[test]
fn ser_02() {
    use chrono::TimeZone;
    let x = FixedOffset::east_opt(0)
        .unwrap()
        .with_ymd_and_hms(2023, 2, 3, 15, 12, 40)
        .earliest()
        .unwrap()
        .checked_add_signed(chrono::Duration::nanoseconds(543430000))
        .unwrap();
    let x = Datetime(x);
    let s = serde_json::to_string(&x).unwrap();

    assert_eq!(s, r#""2023-02-03T15:12:40.543430Z""#);
}
|
||||
|
||||
/// Full nanosecond precision: expect exactly nine fractional digits.
#[test]
fn ser_03() {
    use chrono::TimeZone;
    let x = FixedOffset::east_opt(0)
        .unwrap()
        .with_ymd_and_hms(2023, 2, 3, 15, 12, 40)
        .earliest()
        .unwrap()
        .checked_add_signed(chrono::Duration::nanoseconds(543432321))
        .unwrap();
    let x = Datetime(x);
    let s = serde_json::to_string(&x).unwrap();

    assert_eq!(s, r#""2023-02-03T15:12:40.543432321Z""#);
}
|
||||
@@ -1,144 +0,0 @@
|
||||
use super::agg_kind_from_binning_scheme;
|
||||
use super::binning_scheme_append_to_url;
|
||||
use super::CacheUsage;
|
||||
use crate::AggKind;
|
||||
use crate::AppendToUrl;
|
||||
use crate::ByteSize;
|
||||
use crate::FromUrl;
|
||||
use crate::NetpodError;
|
||||
use crate::PreBinnedPatchCoordEnum;
|
||||
use crate::ScalarType;
|
||||
use crate::SfDbChannel;
|
||||
use crate::Shape;
|
||||
use std::collections::BTreeMap;
|
||||
use url::Url;
|
||||
|
||||
/// Query parameters for fetching one pre-binned patch of a channel.
#[derive(Clone, Debug)]
pub struct PreBinnedQuery {
    // Which patch (bin size / slot) is requested.
    patch: PreBinnedPatchCoordEnum,
    channel: SfDbChannel,
    scalar_type: ScalarType,
    shape: Shape,
    // Optional overrides; the accessors below fall back to defaults on `None`.
    agg_kind: Option<AggKind>,
    cache_usage: Option<CacheUsage>,
    buf_len_disk_io: Option<usize>,
    disk_stats_every: Option<ByteSize>,
}
|
||||
|
||||
impl PreBinnedQuery {
|
||||
pub fn new(
|
||||
patch: PreBinnedPatchCoordEnum,
|
||||
channel: SfDbChannel,
|
||||
scalar_type: ScalarType,
|
||||
shape: Shape,
|
||||
agg_kind: Option<AggKind>,
|
||||
cache_usage: Option<CacheUsage>,
|
||||
buf_len_disk_io: Option<usize>,
|
||||
disk_stats_every: Option<ByteSize>,
|
||||
) -> Self {
|
||||
Self {
|
||||
patch,
|
||||
channel,
|
||||
scalar_type,
|
||||
shape,
|
||||
agg_kind,
|
||||
cache_usage,
|
||||
buf_len_disk_io,
|
||||
disk_stats_every,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn from_url(url: &Url) -> Result<Self, NetpodError> {
|
||||
let mut pairs = BTreeMap::new();
|
||||
for (j, k) in url.query_pairs() {
|
||||
pairs.insert(j.to_string(), k.to_string());
|
||||
}
|
||||
let pairs = pairs;
|
||||
let scalar_type = pairs
|
||||
.get("scalarType")
|
||||
.ok_or_else(|| NetpodError::MissingScalarType)
|
||||
.map(|x| ScalarType::from_url_str(&x))??;
|
||||
let shape = pairs
|
||||
.get("shape")
|
||||
.ok_or_else(|| NetpodError::MissingShape)
|
||||
.map(|x| Shape::from_url_str(&x))??;
|
||||
let ret = Self {
|
||||
patch: PreBinnedPatchCoordEnum::from_pairs(&pairs)?,
|
||||
channel: SfDbChannel::from_pairs(&pairs)?,
|
||||
scalar_type,
|
||||
shape,
|
||||
agg_kind: agg_kind_from_binning_scheme(&pairs)?,
|
||||
cache_usage: CacheUsage::from_pairs(&pairs)?,
|
||||
buf_len_disk_io: pairs
|
||||
.get("bufLenDiskIo")
|
||||
.map_or(Ok(None), |k| k.parse().map(|k| Some(k)))?,
|
||||
disk_stats_every: pairs
|
||||
.get("diskStatsEveryKb")
|
||||
.map(|k| k.parse().ok())
|
||||
.unwrap_or(None)
|
||||
.map(ByteSize::from_kb),
|
||||
};
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
pub fn patch(&self) -> &PreBinnedPatchCoordEnum {
|
||||
&self.patch
|
||||
}
|
||||
|
||||
pub fn channel(&self) -> &SfDbChannel {
|
||||
&self.channel
|
||||
}
|
||||
|
||||
pub fn scalar_type(&self) -> &ScalarType {
|
||||
&self.scalar_type
|
||||
}
|
||||
|
||||
pub fn shape(&self) -> &Shape {
|
||||
&self.shape
|
||||
}
|
||||
|
||||
pub fn agg_kind(&self) -> &Option<AggKind> {
|
||||
&self.agg_kind
|
||||
}
|
||||
|
||||
pub fn disk_stats_every(&self) -> ByteSize {
|
||||
match &self.disk_stats_every {
|
||||
Some(x) => x.clone(),
|
||||
None => ByteSize(1024 * 1024 * 4),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn cache_usage(&self) -> CacheUsage {
|
||||
self.cache_usage.as_ref().map_or(CacheUsage::Use, |x| x.clone())
|
||||
}
|
||||
|
||||
pub fn buf_len_disk_io(&self) -> usize {
|
||||
self.buf_len_disk_io.unwrap_or(1024 * 8)
|
||||
}
|
||||
}
|
||||
|
||||
impl AppendToUrl for PreBinnedQuery {
|
||||
fn append_to_url(&self, url: &mut Url) {
|
||||
if false {
|
||||
panic!("remove, not in use");
|
||||
}
|
||||
self.patch.append_to_url(url);
|
||||
self.channel.append_to_url(url);
|
||||
self.shape.append_to_url(url);
|
||||
self.scalar_type.append_to_url(url);
|
||||
if let Some(x) = &self.agg_kind {
|
||||
binning_scheme_append_to_url(x, url);
|
||||
}
|
||||
let mut g = url.query_pairs_mut();
|
||||
// TODO add also impl AppendToUrl for these if applicable:
|
||||
if let Some(x) = &self.cache_usage {
|
||||
g.append_pair("cacheUsage", &x.query_param_value());
|
||||
}
|
||||
if let Some(x) = self.buf_len_disk_io {
|
||||
g.append_pair("bufLenDiskIo", &format!("{}", x));
|
||||
}
|
||||
if let Some(x) = &self.disk_stats_every {
|
||||
g.append_pair("diskStatsEveryKb", &format!("{}", x.bytes() / 1024));
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,2 +0,0 @@
|
||||
// Range submodules: bin ranges (axis partitions) and event ranges (time/pulse).
pub mod binrange;
pub mod evrange;
|
||||
@@ -1,86 +0,0 @@
|
||||
use super::evrange::NanoRange;
|
||||
use super::evrange::SeriesRange;
|
||||
use crate::timeunits::SEC;
|
||||
use crate::BinnedRangeEnum;
|
||||
use crate::Dim0Kind;
|
||||
use crate::TsNano;
|
||||
use chrono::DateTime;
|
||||
use chrono::Utc;
|
||||
|
||||
/// 10-minute range with 9 requested bins: expect ten 1-minute bins.
#[test]
fn test_binned_range_covering_00() {
    let range = SeriesRange::TimeRange(NanoRange::from_date_time(
        DateTime::parse_from_rfc3339("1970-01-01T10:10:00Z").unwrap().into(),
        DateTime::parse_from_rfc3339("1970-01-01T10:20:00Z").unwrap().into(),
    ));
    let r = BinnedRangeEnum::covering_range(range, 9).unwrap();
    assert_eq!(r.bin_count(), 10);
    // Covering a time range must yield a time-dimensioned result.
    if let Dim0Kind::Time = r.dim0kind() {
    } else {
        panic!()
    }
    let r2 = r.binned_range_time();
    let a = r2.edges();
    // N bins have N + 1 edges.
    assert_eq!(a.len(), 1 + r.bin_count() as usize);
    assert_eq!(a[0], TsNano((((10 * 60) + 10) * 60 + 0) * SEC));
    assert_eq!(a[1], TsNano((((10 * 60) + 11) * 60 + 0) * SEC));
    assert_eq!(a[10], TsNano((((10 * 60) + 20) * 60 + 0) * SEC));
    // The third bin spans 10:12 .. 10:13.
    let x = r.range_at(2).unwrap();
    let y = SeriesRange::TimeRange(NanoRange {
        beg: (((10 * 60) + 12) * 60 + 0) * SEC,
        end: (((10 * 60) + 13) * 60 + 0) * SEC,
    });
    assert_eq!(x, y);
}
|
||||
|
||||
/// 66-second range with 9 requested bins: expect fourteen 5-second bins
/// aligned to the containing minute boundary (00:20:00 .. 00:21:10).
#[test]
fn test_binned_range_covering_01() {
    let range = SeriesRange::TimeRange(NanoRange::from_date_time(
        DateTime::parse_from_rfc3339("1970-01-01T00:20:04Z").unwrap().into(),
        DateTime::parse_from_rfc3339("1970-01-01T00:21:10Z").unwrap().into(),
    ));
    let r = BinnedRangeEnum::covering_range(range, 9).unwrap();
    assert_eq!(r.bin_count(), 14);
    if let Dim0Kind::Time = r.dim0kind() {
    } else {
        panic!()
    }
    let r2 = r.binned_range_time();
    let a = r2.edges();
    // N bins have N + 1 edges.
    assert_eq!(a.len(), 1 + r.bin_count() as usize);
    assert_eq!(a[0], TsNano((((0 * 60) + 20) * 60 + 0) * SEC));
    assert_eq!(a[1], TsNano((((0 * 60) + 20) * 60 + 5) * SEC));
    assert_eq!(a[14], TsNano((((0 * 60) + 21) * 60 + 10) * SEC));
    // First bin covers the aligned start, before the requested beg.
    let x = r.range_at(0).unwrap();
    let y = SeriesRange::TimeRange(NanoRange {
        beg: (((0 * 60) + 20) * 60 + 0) * SEC,
        end: (((0 * 60) + 20) * 60 + 5) * SEC,
    });
    assert_eq!(x, y);
}
|
||||
|
||||
/// 126-second range with 25 requested bins: expect twenty-six 5-second bins.
#[test]
fn test_binned_range_covering_02() {
    let range = SeriesRange::TimeRange(NanoRange::from_date_time(
        DateTime::parse_from_rfc3339("1970-01-01T00:20:04Z").unwrap().into(),
        DateTime::parse_from_rfc3339("1970-01-01T00:22:10Z").unwrap().into(),
    ));
    let r = BinnedRangeEnum::covering_range(range, 25).unwrap();
    assert_eq!(r.bin_count(), 26);
    if let Dim0Kind::Time = r.dim0kind() {
    } else {
        panic!()
    }
    let r2 = r.binned_range_time();
    let a = r2.edges();
    assert_eq!(a.len(), 1 + r.bin_count() as usize);
    assert_eq!(a[0], TsNano((((0 * 60) + 20) * 60 + 0) * SEC));
    assert_eq!(a[1], TsNano((((0 * 60) + 20) * 60 + 5) * SEC));
    // NOTE(review): only edge 14 is checked (apparently copied from the
    // previous test); the final edge a[26] at 00:22:10 is never asserted —
    // consider adding that check.
    assert_eq!(a[14], TsNano((((0 * 60) + 21) * 60 + 10) * SEC));
    let x = r.range_at(0).unwrap();
    let y = SeriesRange::TimeRange(NanoRange {
        beg: (((0 * 60) + 20) * 60 + 0) * SEC,
        end: (((0 * 60) + 20) * 60 + 5) * SEC,
    });
    assert_eq!(x, y);
}
|
||||
@@ -1,220 +0,0 @@
|
||||
use crate::query::PulseRangeQuery;
|
||||
use crate::query::TimeRangeQuery;
|
||||
use crate::timeunits::SEC;
|
||||
use crate::AppendToUrl;
|
||||
use crate::Dim0Kind;
|
||||
use crate::FromUrl;
|
||||
use crate::NetpodError;
|
||||
use crate::TsNano;
|
||||
use chrono::DateTime;
|
||||
use chrono::TimeZone;
|
||||
use chrono::Utc;
|
||||
use daqbuf_err as err;
|
||||
use err::Error;
|
||||
use serde::Deserialize;
|
||||
use serde::Serialize;
|
||||
use std::collections::BTreeMap;
|
||||
use std::fmt;
|
||||
use url::Url;
|
||||
|
||||
/// User-facing range selection: wall-clock time, pulse ids, or raw epoch
/// nanoseconds.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum TimeRange {
    Time { beg: DateTime<Utc>, end: DateTime<Utc> },
    Pulse { beg: u64, end: u64 },
    Nano { beg: u64, end: u64 },
}
|
||||
|
||||
/// Time range in nanoseconds since the Unix epoch.
// NOTE(review): usage elsewhere suggests this is half-open [beg, end) —
// confirm before relying on it.
#[derive(Clone, Serialize, Deserialize, PartialEq)]
pub struct NanoRange {
    pub beg: u64,
    pub end: u64,
}
|
||||
|
||||
impl fmt::Debug for NanoRange {
|
||||
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
|
||||
if true {
|
||||
let beg = TsNano(self.beg);
|
||||
let end = TsNano(self.end);
|
||||
write!(fmt, "NanoRange {{ beg: {}, end: {} }}", beg.fmt(), end.fmt())
|
||||
} else if false {
|
||||
let beg = TsNano(self.beg);
|
||||
let end = TsNano(self.end);
|
||||
fmt.debug_struct("NanoRange")
|
||||
.field("beg", &beg)
|
||||
.field("end", &end)
|
||||
.finish()
|
||||
} else {
|
||||
let beg = chrono::Utc
|
||||
.timestamp_opt((self.beg / SEC) as i64, (self.beg % SEC) as u32)
|
||||
.earliest();
|
||||
let end = chrono::Utc
|
||||
.timestamp_opt((self.end / SEC) as i64, (self.end % SEC) as u32)
|
||||
.earliest();
|
||||
if let (Some(a), Some(b)) = (beg, end) {
|
||||
fmt.debug_struct("NanoRange").field("beg", &a).field("end", &b).finish()
|
||||
} else {
|
||||
fmt.debug_struct("NanoRange")
|
||||
.field("beg", &beg)
|
||||
.field("end", &end)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for NanoRange {
|
||||
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
|
||||
fmt::Debug::fmt(self, fmt)
|
||||
}
|
||||
}
|
||||
|
||||
impl NanoRange {
    /// Convert UTC datetimes to epoch nanoseconds.
    // NOTE(review): `timestamp_nanos_opt()` returns None for datetimes
    // outside the representable i64-nanosecond range; those silently
    // collapse to 0 here — confirm this is intended.
    pub fn from_date_time(beg: DateTime<Utc>, end: DateTime<Utc>) -> Self {
        Self {
            beg: beg.timestamp_nanos_opt().unwrap_or(0) as u64,
            end: end.timestamp_nanos_opt().unwrap_or(0) as u64,
        }
    }

    pub fn from_ns_u64(beg: u64, end: u64) -> Self {
        Self { beg, end }
    }

    /// Range width in ns; panics in debug / wraps in release if `end < beg`.
    pub fn delta(&self) -> u64 {
        self.end - self.beg
    }

    pub fn beg(&self) -> u64 {
        self.beg
    }

    pub fn end(&self) -> u64 {
        self.end
    }
}
|
||||
|
||||
impl From<(u64, u64)> for NanoRange {
|
||||
fn from(value: (u64, u64)) -> Self {
|
||||
Self {
|
||||
beg: value.0,
|
||||
end: value.1,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<&SeriesRange> for NanoRange {
|
||||
type Error = NetpodError;
|
||||
|
||||
fn try_from(val: &SeriesRange) -> Result<NanoRange, Self::Error> {
|
||||
match val {
|
||||
SeriesRange::TimeRange(x) => Ok(x.clone()),
|
||||
SeriesRange::PulseRange(_) => Err(NetpodError::NotTimerange),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Range over machine pulse ids.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct PulseRange {
    pub beg: u64,
    pub end: u64,
}
|
||||
|
||||
/// A range over either time (epoch nanoseconds) or pulse ids.
#[derive(Clone, Serialize, Deserialize, PartialEq)]
pub enum SeriesRange {
    TimeRange(NanoRange),
    PulseRange(PulseRange),
}
|
||||
|
||||
impl SeriesRange {
|
||||
pub fn dim0kind(&self) -> Dim0Kind {
|
||||
match self {
|
||||
SeriesRange::TimeRange(_) => Dim0Kind::Time,
|
||||
SeriesRange::PulseRange(_) => Dim0Kind::Pulse,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn is_time(&self) -> bool {
|
||||
match self {
|
||||
SeriesRange::TimeRange(_) => true,
|
||||
SeriesRange::PulseRange(_) => false,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn is_pulse(&self) -> bool {
|
||||
match self {
|
||||
SeriesRange::TimeRange(_) => false,
|
||||
SeriesRange::PulseRange(_) => true,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn beg_u64(&self) -> u64 {
|
||||
match self {
|
||||
SeriesRange::TimeRange(x) => x.beg,
|
||||
SeriesRange::PulseRange(x) => x.beg,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn end_u64(&self) -> u64 {
|
||||
match self {
|
||||
SeriesRange::TimeRange(x) => x.end,
|
||||
SeriesRange::PulseRange(x) => x.end,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn delta_u64(&self) -> u64 {
|
||||
match self {
|
||||
SeriesRange::TimeRange(x) => x.end - x.beg,
|
||||
SeriesRange::PulseRange(x) => x.end - x.beg,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Debug for SeriesRange {
    /// Compact debug form; pulse ranges elide their payload.
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        match self {
            SeriesRange::TimeRange(range) => write!(fmt, "SeriesRange::TimeRange {{ {} }}", range),
            SeriesRange::PulseRange(_) => write!(fmt, "SeriesRange::PulseRange {{ .. }}"),
        }
    }
}
|
||||
|
||||
impl From<NanoRange> for SeriesRange {
    // Wrap a time range.
    fn from(k: NanoRange) -> Self {
        Self::TimeRange(k)
    }
}
|
||||
|
||||
impl From<PulseRange> for SeriesRange {
    // Wrap a pulse range.
    fn from(k: PulseRange) -> Self {
        Self::PulseRange(k)
    }
}
|
||||
|
||||
impl FromUrl for SeriesRange {
    type Error = NetpodError;

    fn from_url(url: &url::Url) -> Result<Self, Self::Error> {
        let pairs = crate::get_url_query_pairs(url);
        Self::from_pairs(&pairs)
    }

    /// Try a time-range query first, then a pulse-range query; error if
    /// neither set of query parameters is present.
    fn from_pairs(pairs: &BTreeMap<String, String>) -> Result<Self, Self::Error> {
        let ret = if let Ok(x) = TimeRangeQuery::from_pairs(pairs) {
            SeriesRange::TimeRange(x.into())
        } else if let Ok(x) = PulseRangeQuery::from_pairs(pairs) {
            SeriesRange::PulseRange(x.into())
        } else {
            return Err(NetpodError::MissingTimerange);
        };
        Ok(ret)
    }
}
|
||||
|
||||
impl AppendToUrl for SeriesRange {
|
||||
fn append_to_url(&self, url: &mut Url) {
|
||||
match self {
|
||||
SeriesRange::TimeRange(k) => TimeRangeQuery::from(k).append_to_url(url),
|
||||
SeriesRange::PulseRange(k) => PulseRangeQuery::from(k).append_to_url(url),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,4 +0,0 @@
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
/// Placeholder for system statistics; currently carries no fields.
#[derive(Debug, Serialize, Deserialize)]
pub struct SystemStats {}
|
||||
@@ -1,43 +0,0 @@
|
||||
use crate::log::*;
|
||||
|
||||
/// Debug helper that counts `poll` calls and loop iterations of a stream
/// implementation and reports via trace logs when the configured budgets
/// are reached.
pub struct StreamImplTracer {
    // Label used in every trace line.
    name: String,
    npoll_cnt: usize,
    // poll_enter returns true once npoll_cnt reaches this.
    npoll_max: usize,
    loop_cnt: usize,
    // loop_enter returns true once loop_cnt reaches this.
    loop_max: usize,
}
|
||||
|
||||
impl StreamImplTracer {
|
||||
pub fn new(name: String, npoll_max: usize, loop_max: usize) -> Self {
|
||||
Self {
|
||||
name,
|
||||
npoll_cnt: 0,
|
||||
npoll_max,
|
||||
loop_cnt: 0,
|
||||
loop_max,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn poll_enter(&mut self) -> bool {
|
||||
self.npoll_cnt += 1;
|
||||
if self.npoll_cnt >= self.npoll_max {
|
||||
trace!("{} poll {} reached limit", self.name, self.npoll_cnt);
|
||||
true
|
||||
} else {
|
||||
trace!("{} poll {}", self.name, self.npoll_cnt);
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
pub fn loop_enter(&mut self) -> bool {
|
||||
self.loop_cnt += 1;
|
||||
if self.loop_cnt >= self.loop_max {
|
||||
trace!("{} loop {} reached limit", self.name, self.loop_cnt);
|
||||
true
|
||||
} else {
|
||||
trace!("{} loop {}", self.name, self.loop_cnt);
|
||||
false
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,72 +0,0 @@
|
||||
use daqbuf_err as err;
|
||||
use err::Error;
|
||||
use futures_util::{Stream, StreamExt};
|
||||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
|
||||
/// Wraps a fallible stream and fuses it: after the first `Err` item or the
/// natural end, the stream terminates, and polling it again afterwards is
/// treated as a caller bug (panics).
// NOTE(review): the meaning of the acronym "SCC" is not stated here — confirm.
pub struct SCC<S>
where
    S: Stream,
{
    inp: S,
    // Set after yielding an Err item; the next poll yields None.
    errored: bool,
    // Set after yielding None; polling again panics.
    completed: bool,
}
|
||||
|
||||
impl<S> SCC<S>
where
    S: Stream,
{
    /// Wrap `inp`; the error/completion flags start cleared.
    pub fn new(inp: S) -> Self {
        Self {
            inp,
            errored: false,
            completed: false,
        }
    }
}
|
||||
|
||||
impl<S, I> Stream for SCC<S>
where
    S: Stream<Item = Result<I, Error>> + Unpin,
{
    type Item = <S as Stream>::Item;

    /// Fused polling with strict usage checking:
    /// - polling after completion is a bug and panics;
    /// - after the first `Err` item has been yielded, the next poll yields
    ///   `None` and the stream counts as completed;
    /// - otherwise items and errors from the inner stream pass through
    ///   unchanged.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        use Poll::*;
        if self.completed {
            panic!("SCC poll_next on completed");
        } else if self.errored {
            self.completed = true;
            Ready(None)
        } else {
            match self.inp.poll_next_unpin(cx) {
                Ready(Some(Ok(k))) => Ready(Some(Ok(k))),
                Ready(Some(Err(e))) => {
                    // Remember the error; the next poll terminates the stream.
                    self.errored = true;
                    Ready(Some(Err(e)))
                }
                Ready(None) => {
                    self.completed = true;
                    Ready(None)
                }
                Pending => Pending,
            }
        }
    }
}
|
||||
|
||||
/// Extension trait: `.into_scc()` sugar for `SCC::new`.
pub trait IntoSCC<S>
where
    S: Stream,
{
    fn into_scc(self) -> SCC<S>;
}
|
||||
|
||||
// Blanket impl: any stream can be wrapped.
impl<S> IntoSCC<S> for S
where
    S: Stream,
{
    fn into_scc(self) -> SCC<S> {
        SCC::new(self)
    }
}
|
||||
@@ -1,111 +0,0 @@
|
||||
use core::fmt;
|
||||
use daqbuf_err as err;
|
||||
use err::thiserror;
|
||||
use err::ThisError;
|
||||
use serde::Deserialize;
|
||||
use serde::Serialize;
|
||||
use std::str::FromStr;
|
||||
use std::time::Duration;
|
||||
|
||||
/// Data retention class; selects TTLs and database table-name prefixes.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum RetentionTime {
    Short,
    Medium,
    Long,
}
|
||||
|
||||
impl RetentionTime {
|
||||
pub fn debug_tag(&self) -> &'static str {
|
||||
use RetentionTime::*;
|
||||
match self {
|
||||
Short => "ST",
|
||||
Medium => "MT",
|
||||
Long => "LT",
|
||||
}
|
||||
}
|
||||
|
||||
pub fn table_prefix(&self) -> &'static str {
|
||||
use RetentionTime::*;
|
||||
match self {
|
||||
Short => "st_",
|
||||
Medium => "mt_",
|
||||
Long => "lt_",
|
||||
}
|
||||
}
|
||||
|
||||
pub fn ttl_events_d0(&self) -> Duration {
|
||||
let day = 60 * 60 * 24;
|
||||
let margin_max = Duration::from_secs(day * 2);
|
||||
let ttl = self.ttl_ts_msp();
|
||||
let margin = ttl / 10;
|
||||
let margin = if margin >= margin_max { margin_max } else { margin };
|
||||
ttl + margin
|
||||
}
|
||||
|
||||
pub fn ttl_events_d1(&self) -> Duration {
|
||||
// TTL now depends only on RetentionTime, not on data type or shape.
|
||||
self.ttl_events_d0()
|
||||
}
|
||||
|
||||
pub fn ttl_ts_msp(&self) -> Duration {
|
||||
let day = 60 * 60 * 24;
|
||||
match self {
|
||||
RetentionTime::Short => Duration::from_secs(day * 7),
|
||||
RetentionTime::Medium => Duration::from_secs(day * 31 * 13),
|
||||
RetentionTime::Long => Duration::from_secs(day * 31 * 12 * 17),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn ttl_binned(&self) -> Duration {
|
||||
// Current choice is to keep the TTL the same as for events
|
||||
self.ttl_events_d0()
|
||||
}
|
||||
|
||||
pub fn ttl_channel_status(&self) -> Duration {
|
||||
// Current choice is to keep the TTL the same as for events
|
||||
self.ttl_events_d0()
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for RetentionTime {
|
||||
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
|
||||
let s = match self {
|
||||
RetentionTime::Short => "short",
|
||||
RetentionTime::Medium => "medium",
|
||||
RetentionTime::Long => "long",
|
||||
};
|
||||
fmt.write_str(s)
|
||||
}
|
||||
}
|
||||
|
||||
/// Error for parsing a `RetentionTime` from text.
#[derive(Debug, ThisError)]
#[cstm(name = "TTL")]
pub enum Error {
    // Input was not one of "short", "medium", "long".
    Parse,
}
|
||||
|
||||
// err::err_dbg_dis!(Error, "ttl::Error::");
|
||||
|
||||
impl FromStr for RetentionTime {
|
||||
type Err = Error;
|
||||
|
||||
fn from_str(s: &str) -> Result<Self, Self::Err> {
|
||||
let ret = match s {
|
||||
"short" => Self::Short,
|
||||
"medium" => Self::Medium,
|
||||
"long" => Self::Long,
|
||||
_ => return Err(Error::Parse),
|
||||
};
|
||||
Ok(ret)
|
||||
}
|
||||
}
|
||||
|
||||
// impl ToString for RetentionTime {
|
||||
// fn to_string(&self) -> String {
|
||||
// match self {
|
||||
// RetentionTime::Short => "short".into(),
|
||||
// RetentionTime::Medium => "medium".into(),
|
||||
// RetentionTime::Long => "long".into(),
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
@@ -18,7 +18,7 @@ futures-util = "0.3.14"
|
||||
tracing = "0.1.25"
|
||||
hex = "0.4.3"
|
||||
daqbuf-err = { path = "../../../daqbuf-err" }
|
||||
netpod = { path = "../netpod" }
|
||||
netpod = { path = "../../../daqbuf-netpod", package = "daqbuf-netpod" }
|
||||
query = { path = "../query" }
|
||||
disk = { path = "../disk" }
|
||||
#parse = { path = "../parse" }
|
||||
|
||||
@@ -14,4 +14,4 @@ byteorder = "1.4"
|
||||
hex = "0.4.3"
|
||||
nom = "7.1.3"
|
||||
daqbuf-err = { path = "../../../daqbuf-err" }
|
||||
netpod = { path = "../netpod" }
|
||||
netpod = { path = "../../../daqbuf-netpod", package = "daqbuf-netpod" }
|
||||
|
||||
@@ -13,7 +13,7 @@ url = "2.2"
|
||||
humantime = "2.1.0"
|
||||
humantime-serde = "1.1.1"
|
||||
thiserror = "0.0.1"
|
||||
netpod = { path = "../netpod" }
|
||||
netpod = { path = "../../../daqbuf-netpod", package = "daqbuf-netpod" }
|
||||
items_0 = { path = "../items_0" }
|
||||
items_2 = { path = "../items_2" }
|
||||
|
||||
|
||||
@@ -13,7 +13,7 @@ pin-project = "1"
|
||||
async-channel = "2.3.1"
|
||||
scylla = "0.13.0"
|
||||
daqbuf-err = { path = "../../../daqbuf-err" }
|
||||
netpod = { path = "../netpod" }
|
||||
netpod = { path = "../../../daqbuf-netpod", package = "daqbuf-netpod" }
|
||||
query = { path = "../query" }
|
||||
items_0 = { path = "../items_0" }
|
||||
items_2 = { path = "../items_2" }
|
||||
|
||||
@@ -23,7 +23,7 @@ rand_xoshiro = "0.6.0"
|
||||
thiserror = "0.0.1"
|
||||
chrono = { version = "0.4.19", features = ["serde"] }
|
||||
wasmer = { version = "4.1.0", default-features = false, features = ["sys", "cranelift"], optional = true }
|
||||
netpod = { path = "../netpod" }
|
||||
netpod = { path = "../../../daqbuf-netpod", package = "daqbuf-netpod" }
|
||||
query = { path = "../query" }
|
||||
items_0 = { path = "../items_0" }
|
||||
items_2 = { path = "../items_2" }
|
||||
|
||||
@@ -21,7 +21,7 @@ rand_xoshiro = "0.6.0"
|
||||
thiserror = "0.0.1"
|
||||
chrono = { version = "0.4.19", features = ["serde"] }
|
||||
wasmer = { version = "4.1.0", default-features = false, features = ["sys", "cranelift"], optional = true }
|
||||
netpod = { path = "../netpod" }
|
||||
netpod = { path = "../../../daqbuf-netpod", package = "daqbuf-netpod" }
|
||||
query = { path = "../query" }
|
||||
items_0 = { path = "../items_0" }
|
||||
items_2 = { path = "../items_2" }
|
||||
|
||||
Reference in New Issue
Block a user