Format defaults
@@ -6,7 +6,10 @@ pub struct HistoLog2 {
 impl HistoLog2 {
     pub fn new(sub: usize) -> Self {
-        Self { histo: [0; 16], sub }
+        Self {
+            histo: [0; 16],
+            sub,
+        }
     }

     #[inline]
src/netpod.rs
@@ -55,7 +55,7 @@ pub mod log {
     pub use tracing::{debug, error, info, trace, warn};
 }

-pub mod log_ {
+pub mod log_direct {
     pub use crate::{debug, error, info, trace, warn};
     pub use tracing::{self, event, span, Level};
 }
@@ -651,24 +651,26 @@ mod string_fix_impl_serde {
     use std::fmt;

     impl<const N: usize> Serialize for StringFix<N> {
-        fn serialize<S>(&self, ser: S) -> Result<S::Ok, S::Error>
+        fn serialize<S>(&self, _ser: S) -> Result<S::Ok, S::Error>
         where
             S: serde::Serializer,
         {
-            ser.serialize_str(todo!("StringFix Serialize"))
+            // ser.serialize_str()
+            todo!("StringFix Serialize")
         }
     }

     impl<'de, const N: usize> Deserialize<'de> for StringFix<N> {
-        fn deserialize<D>(de: D) -> Result<Self, D::Error>
+        fn deserialize<D>(_de: D) -> Result<Self, D::Error>
         where
             D: serde::Deserializer<'de>,
         {
-            todo!("StringFix Deserialize")
+            // de.deserialize_unit(Vis::<N>)
+            todo!("StringFix Deserialize")
         }
     }

     #[allow(dead_code)]
     struct Vis<const N: usize>;

     impl<'de, const N: usize> Visitor<'de> for Vis<N> {
@@ -688,21 +690,43 @@ mod string_fix_impl_serde {
     }
 }

-#[derive(Debug, Clone, Serialize, Deserialize, PartialOrd, PartialEq)]
+#[derive(Debug, Clone, PartialOrd, PartialEq, Serialize, Deserialize)]
 pub struct EnumVariant {
     ix: u16,
     name: String,
 }

+#[derive(Debug, Clone, PartialOrd, PartialEq)]
+pub struct EnumVariantRef<'a> {
+    pub ix: u16,
+    pub name: &'a str,
+}
+
+impl<'a> From<EnumVariantRef<'a>> for EnumVariant {
+    fn from(value: EnumVariantRef<'a>) -> Self {
+        Self {
+            ix: value.ix,
+            name: value.name.to_string(),
+        }
+    }
+}
+
 impl EnumVariant {
     pub fn new(ix: u16, name: impl Into<String>) -> Self {
-        Self { ix, name: name.into() }
+        Self {
+            ix,
+            name: name.into(),
+        }
     }

     pub fn ix(&self) -> u16 {
         self.ix
     }

     pub fn name(&self) -> &str {
         &self.name
     }

     pub fn name_string(&self) -> String {
         self.name.clone()
     }
@@ -811,7 +835,10 @@ mod serde_port {
             E: serde::de::Error,
         {
             match val.parse::<u16>() {
-                Err(_) => Err(serde::de::Error::invalid_type(serde::de::Unexpected::Str(val), &self)),
+                Err(_) => Err(serde::de::Error::invalid_type(
+                    serde::de::Unexpected::Str(val),
+                    &self,
+                )),
                 Ok(v) => Ok(v),
             }
         }
@@ -864,7 +891,9 @@ impl Node {
         // TODO should be able to decide whether we are reachable via tls.
         // So far this does not matter because this `baseurl` is used for internal communication
        // and is always non-tls.
-        format!("http://{}:{}", self.host, self.port).parse().unwrap()
+        format!("http://{}:{}", self.host, self.port)
+            .parse()
+            .unwrap()
     }

     pub fn listen(&self) -> String {
@@ -922,7 +951,9 @@ impl Cluster {
     }

     pub fn scylla_st(&self) -> Option<&ScyllaConfig> {
-        self.scylla_st.as_ref().map_or_else(|| self.scylla.as_ref(), Some)
+        self.scylla_st
+            .as_ref()
+            .map_or_else(|| self.scylla.as_ref(), Some)
     }

     pub fn scylla_mt(&self) -> Option<&ScyllaConfig> {
@@ -1142,7 +1173,10 @@ impl FromUrl for SfDbChannel {

     fn from_pairs(pairs: &BTreeMap<String, String>) -> Result<Self, Self::Error> {
         let ret = SfDbChannel {
-            backend: pairs.get("backend").ok_or_else(|| NetpodError::MissingBackend)?.into(),
+            backend: pairs
+                .get("backend")
+                .ok_or_else(|| NetpodError::MissingBackend)?
+                .into(),
             name: pairs
                 .get("channelName")
                 .map(String::from)
@@ -1230,7 +1264,10 @@ impl FromUrl for DaqbufSeries {
                 .get("seriesId")
                 .ok_or_else(|| NetpodError::MissingSeries)
                 .map(|x| x.parse::<u64>())??,
-            backend: pairs.get("backend").ok_or_else(|| NetpodError::MissingBackend)?.into(),
+            backend: pairs
+                .get("backend")
+                .ok_or_else(|| NetpodError::MissingBackend)?
+                .into(),
             name: pairs
                 .get("channelName")
                 .map(String::from)
@@ -1453,7 +1490,11 @@ impl Shape {
                 0 => Shape::Scalar,
                 1 => Shape::Wave(a[0]),
                 2 => Shape::Image(a[0], a[1]),
-                _ => return Err(Error::with_msg_no_trace("can not understand sf databuffer shape spec")),
+                _ => {
+                    return Err(Error::with_msg_no_trace(
+                        "can not understand sf databuffer shape spec",
+                    ))
+                }
             },
             None => Shape::Scalar,
         };
@@ -1505,7 +1546,10 @@ impl Shape {
             }
             JsVal::Object(j) => match j.get("Wave") {
                 Some(JsVal::Number(j)) => Ok(Shape::Wave(j.as_u64().ok_or_else(|| {
-                    Error::with_msg_no_trace(format!("Shape from_db_jsval can not understand {:?}", v))
+                    Error::with_msg_no_trace(format!(
+                        "Shape from_db_jsval can not understand {:?}",
+                        v
+                    ))
                 })? as u32)),
                 _ => Err(Error::with_msg_no_trace(format!(
                     "Shape from_db_jsval can not understand {:?}",
@@ -1528,7 +1572,9 @@ impl Shape {
         } else if a.len() == 2 {
             Ok(Shape::Image(a[0], a[1]))
         } else {
-            Err(Error::with_public_msg_no_trace("only scalar, 1d and 2d supported"))
+            Err(Error::with_public_msg_no_trace(
+                "only scalar, 1d and 2d supported",
+            ))
         }
     }

@@ -1540,7 +1586,9 @@ impl Shape {
         } else if v.len() == 2 {
             Shape::Image(v[0] as u32, v[1] as u32)
         } else {
-            return Err(Error::with_public_msg_no_trace(format!("bad shape_dims {v:?}")));
+            return Err(Error::with_public_msg_no_trace(format!(
+                "bad shape_dims {v:?}"
+            )));
         };
         Ok(res)
     }
@@ -1598,7 +1646,10 @@ impl Shape {
         match self {
             Shape::Scalar => JsVal::Array(Vec::new()),
             Shape::Wave(n) => JsVal::Array(vec![JsVal::Number(Number::from(*n))]),
-            Shape::Image(n, m) => JsVal::Array(vec![JsVal::Number(Number::from(*n)), JsVal::Number(Number::from(*m))]),
+            Shape::Image(n, m) => JsVal::Array(vec![
+                JsVal::Number(Number::from(*n)),
+                JsVal::Number(Number::from(*m)),
+            ]),
         }
     }

@@ -2187,8 +2238,8 @@ pub fn time_bin_len_cache_opts() -> &'static [DtMs] {
 }

 const PULSE_BIN_THRESHOLDS: [u64; 25] = [
-    10, 20, 40, 80, 100, 200, 400, 800, 1000, 2000, 4000, 8000, 10000, 20000, 40000, 80000, 100000, 200000, 400000,
-    800000, 1000000, 2000000, 4000000, 8000000, 10000000,
+    10, 20, 40, 80, 100, 200, 400, 800, 1000, 2000, 4000, 8000, 10000, 20000, 40000, 80000, 100000,
+    200000, 400000, 800000, 1000000, 2000000, 4000000, 8000000, 10000000,
 ];

 #[allow(unused)]
@@ -2234,11 +2285,15 @@ where
     }

     pub fn patch_beg(&self) -> T {
-        self.bin_len().times(self.bin_count).times(self.patch_offset)
+        self.bin_len()
+            .times(self.bin_count)
+            .times(self.patch_offset)
     }

     pub fn patch_end(&self) -> T {
-        self.bin_len().times(self.bin_count).times(1 + self.patch_offset)
+        self.bin_len()
+            .times(self.bin_count)
+            .times(1 + self.patch_offset)
     }

     pub fn series_range(&self) -> SeriesRange {
@@ -2303,10 +2358,18 @@ impl PreBinnedPatchCoordEnum {
     pub fn span_desc(&self) -> String {
         match self {
             PreBinnedPatchCoordEnum::Time(k) => {
-                format!("pre-W-{}-B-{}", k.bin_len.0 * k.bin_count / SEC, k.patch_offset / SEC)
+                format!(
+                    "pre-W-{}-B-{}",
+                    k.bin_len.0 * k.bin_count / SEC,
+                    k.patch_offset / SEC
+                )
             }
             PreBinnedPatchCoordEnum::Pulse(k) => {
-                format!("pre-W-{}-B-{}", k.bin_len.0 * k.bin_count / SEC, k.patch_offset / SEC)
+                format!(
+                    "pre-W-{}-B-{}",
+                    k.bin_len.0 * k.bin_count / SEC,
+                    k.patch_offset / SEC
+                )
             }
         }
     }
@@ -2378,7 +2441,10 @@ impl PreBinnedPatchRangeEnum {
             Err(Error::with_msg("min_bin_count < 1"))?;
         }
         if min_bin_count > 20000 {
-            Err(Error::with_msg(format!("min_bin_count > 20000: {}", min_bin_count)))?;
+            Err(Error::with_msg(format!(
+                "min_bin_count > 20000: {}",
+                min_bin_count
+            )))?;
         }
         let du = b.sub(&a);
         let max_bin_len = du.div_n(min_bin_count as u64);
@@ -2389,18 +2455,25 @@ impl PreBinnedPatchRangeEnum {
                 let patch_off_1 = a.div_v(&patch_len);
                 let patch_off_2 = (b.add(&patch_len).sub_n(1)).div_v(&patch_len);
                 let patch_count = patch_off_2 - patch_off_1;
-                let ret = T::to_pre_binned_patch_range_enum(&bl, bin_count, patch_off_1, patch_count);
+                let ret =
+                    T::to_pre_binned_patch_range_enum(&bl, bin_count, patch_off_1, patch_count);
                 return Ok(ret);
             }
         }
-        Err(Error::with_msg_no_trace("can not find matching pre-binned grid"))
+        Err(Error::with_msg_no_trace(
+            "can not find matching pre-binned grid",
+        ))
     }

     /// Cover at least the given range with at least as many as the requested number of bins.
     pub fn covering_range(range: SeriesRange, min_bin_count: u32) -> Result<Self, Error> {
         match range {
-            SeriesRange::TimeRange(k) => Self::covering_range_ty(TsNano(k.beg), TsNano(k.end), min_bin_count),
-            SeriesRange::PulseRange(k) => Self::covering_range_ty(PulseId(k.beg), PulseId(k.end), min_bin_count),
+            SeriesRange::TimeRange(k) => {
+                Self::covering_range_ty(TsNano(k.beg), TsNano(k.end), min_bin_count)
+            }
+            SeriesRange::PulseRange(k) => {
+                Self::covering_range_ty(PulseId(k.beg), PulseId(k.end), min_bin_count)
+            }
         }
     }
 }
@@ -2420,7 +2493,13 @@ impl fmt::Debug for BinnedRange<TsNano> {
     fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
         let beg = self.bin_len.times(self.bin_off);
         let end = self.bin_len.times(self.bin_off + self.bin_cnt);
-        write!(fmt, "BinnedRange {{ {}, {}, {} }}", beg, end, self.bin_len.to_dt_ms())
+        write!(
+            fmt,
+            "BinnedRange {{ {}, {}, {} }}",
+            beg,
+            end,
+            self.bin_len.to_dt_ms()
+        )
     }
 }

@@ -2518,6 +2597,10 @@ impl BinnedRange<TsNano> {
     pub fn bin_len_dt_ms(&self) -> DtMs {
         self.bin_len.to_dt_ms()
     }
+
+    pub fn bin_len_dt_ns(&self) -> DtNano {
+        self.bin_len.to_dt_nano()
+    }
 }

 impl<T> BinnedRange<T>
@@ -2528,23 +2611,9 @@ where
         self.bin_cnt
     }

-    pub fn get_range(&self, ix: u32) -> NanoRange {
-        let _ = ix;
-        /*NanoRange {
-            beg: (self.offset + ix as u64) * self.grid_spec.bin_t_len,
-            end: (self.offset + ix as u64 + 1) * self.grid_spec.bin_t_len,
-        }*/
-        err::todoval()
-    }
-
     pub fn full_range(&self) -> NanoRange {
-        /*NanoRange {
-            beg: self.offset * self.grid_spec.bin_t_len,
-            end: (self.offset + self.bin_count) * self.grid_spec.bin_t_len,
-        }*/
         let beg = self.bin_len.times(self.bin_off).as_u64();
         let end = self.bin_len.times(self.bin_off + self.bin_cnt).as_u64();
         panic!("TODO make generic for pulse");
         NanoRange { beg, end }
     }

@@ -2608,7 +2677,10 @@ impl BinnedRangeEnum {
     /// Cover at least the given range while selecting the bin width which best fits the requested bin width.
     pub fn covering_range_time(range: SeriesRange, bin_len_req: DtMs) -> Result<Self, NetpodError> {
         match range {
-            SeriesRange::TimeRange(k) => Ok(Self::Time(BinnedRange::covering_range_time(k, bin_len_req)?)),
+            SeriesRange::TimeRange(k) => Ok(Self::Time(BinnedRange::covering_range_time(
+                k,
+                bin_len_req,
+            )?)),
             SeriesRange::PulseRange(_) => Err(NetpodError::TimelikeBinWidthImpossibleForPulseRange),
         }
     }
@@ -2616,8 +2688,12 @@ impl BinnedRangeEnum {

     /// Cover at least the given range with at least as many as the requested number of bins.
     pub fn covering_range(range: SeriesRange, min_bin_count: u32) -> Result<Self, NetpodError> {
         match range {
-            SeriesRange::TimeRange(k) => Self::covering_range_ty(TsNano(k.beg), TsNano(k.end), min_bin_count),
-            SeriesRange::PulseRange(k) => Self::covering_range_ty(PulseId(k.beg), PulseId(k.end), min_bin_count),
+            SeriesRange::TimeRange(k) => {
+                Self::covering_range_ty(TsNano(k.beg), TsNano(k.end), min_bin_count)
+            }
+            SeriesRange::PulseRange(k) => {
+                Self::covering_range_ty(PulseId(k.beg), PulseId(k.end), min_bin_count)
+            }
         }
     }

@@ -3276,9 +3352,16 @@ impl ChannelSearchQuery {
         let ret = Self {
             backend: pairs.get("backend").map(Into::into),
             name_regex: pairs.get("nameRegex").map_or(String::new(), |k| k.clone()),
-            source_regex: pairs.get("sourceRegex").map_or(String::new(), |k| k.clone()),
-            description_regex: pairs.get("descriptionRegex").map_or(String::new(), |k| k.clone()),
-            icase: pairs.get("icase").map_or(None, |x| x.parse().ok()).unwrap_or(false),
+            source_regex: pairs
+                .get("sourceRegex")
+                .map_or(String::new(), |k| k.clone()),
+            description_regex: pairs
+                .get("descriptionRegex")
+                .map_or(String::new(), |k| k.clone()),
+            icase: pairs
+                .get("icase")
+                .map_or(None, |x| x.parse().ok())
+                .unwrap_or(false),
             kind: SeriesKind::from_pairs(&pairs)?,
         };
         Ok(ret)
@@ -3418,7 +3501,10 @@ impl HasTimeout for MapQuery {
 }

 pub fn get_url_query_pairs(url: &Url) -> BTreeMap<String, String> {
-    BTreeMap::from_iter(url.query_pairs().map(|(j, k)| (j.to_string(), k.to_string())))
+    BTreeMap::from_iter(
+        url.query_pairs()
+            .map(|(j, k)| (j.to_string(), k.to_string())),
+    )
 }

 // Request type of the channel/config api.
@@ -3737,7 +3823,9 @@ impl ChannelTypeConfigGen {
         if let ChannelTypeConfigGen::Scylla(k) = self {
             Ok(k.clone())
         } else {
-            Err(Error::with_msg_no_trace("this ChannelTypeConfigGen is not for scylla"))
+            Err(Error::with_msg_no_trace(
+                "this ChannelTypeConfigGen is not for scylla",
+            ))
         }
     }

@@ -3745,7 +3833,9 @@ impl ChannelTypeConfigGen {
         if let ChannelTypeConfigGen::SfDatabuffer(k) = self {
             Ok(k.clone())
         } else {
-            Err(Error::with_msg_no_trace("this ChannelTypeConfigGen is not for scylla"))
+            Err(Error::with_msg_no_trace(
+                "this ChannelTypeConfigGen is not for scylla",
+            ))
         }
     }

@@ -3779,8 +3869,8 @@ impl ChannelTypeConfigGen {

     pub fn series(&self) -> Option<u64> {
         match self {
-            ChannelTypeConfigGen::Scylla(ch_conf) => Some(ch_conf.series()),
-            ChannelTypeConfigGen::SfDatabuffer(sf_ch_fetch_info) => None,
+            ChannelTypeConfigGen::Scylla(x) => Some(x.series()),
+            ChannelTypeConfigGen::SfDatabuffer(_) => None,
         }
     }
 }
@@ -3931,7 +4021,9 @@ pub fn archapp_test_cluster() -> Cluster {

 pub fn test_data_base_path_databuffer() -> PathBuf {
     let homedir = std::env::var("HOME").unwrap();
-    let data_base_path = PathBuf::from(homedir).join("daqbuffer-testdata").join("databuffer");
+    let data_base_path = PathBuf::from(homedir)
+        .join("daqbuffer-testdata")
+        .join("databuffer");
     data_base_path
 }

@@ -4125,9 +4217,12 @@ mod instant_serde {
     pub fn ser<S: Serializer>(x: &SystemTime, ser: S) -> Result<S::Ok, S::Error> {
         use chrono::LocalResult;
         let dur = x.duration_since(std::time::UNIX_EPOCH).unwrap();
-        let res = chrono::TimeZone::timestamp_opt(&chrono::Utc, dur.as_secs() as i64, dur.subsec_nanos());
+        let res =
+            chrono::TimeZone::timestamp_opt(&chrono::Utc, dur.as_secs() as i64, dur.subsec_nanos());
         match res {
-            LocalResult::None => Err(serde::ser::Error::custom(format!("Bad local instant conversion"))),
+            LocalResult::None => Err(serde::ser::Error::custom(format!(
+                "Bad local instant conversion"
+            ))),
             LocalResult::Single(dt) => {
                 let s = dt.format(DATETIME_FMT_3MS).to_string();
                 ser.serialize_str(&s)
@@ -4228,7 +4323,10 @@ impl StatusBoard {
         let tss = tss;
         let tsm = tss[tss.len() / 3];
         let a = std::mem::replace(&mut self.entries, BTreeMap::new());
-        self.entries = a.into_iter().filter(|(_k, v)| v.ts_updated >= tsm).collect();
+        self.entries = a
+            .into_iter()
+            .filter(|(_k, v)| v.ts_updated >= tsm)
+            .collect();
     }
 }

@@ -4326,6 +4424,8 @@ pub fn req_uri_to_url(uri: &Uri) -> Result<Url, UriError> {
             .parse()
             .map_err(|_| UriError::ParseError(uri.clone()))
     } else {
-        uri.to_string().parse().map_err(|_| UriError::ParseError(uri.clone()))
+        uri.to_string()
+            .parse()
+            .map_err(|_| UriError::ParseError(uri.clone()))
     }
 }

src/query.rs
@@ -77,7 +77,10 @@ impl CacheUsage {
         } else if s == "v0nocache" {
             CacheUsage::V0NoCache
         } else {
-            return Err(Error::with_msg(format!("can not interpret cache usage string: {}", s)));
+            return Err(Error::with_msg(format!(
+                "can not interpret cache usage string: {}",
+                s
+            )));
         };
         Ok(ret)
     }
@@ -117,7 +120,8 @@ fn parse_time(v: &str) -> Result<DateTime<Utc>, NetpodError> {
         Ok(x)
     } else {
         if v.ends_with("ago") {
-            let d = humantime::parse_duration(&v[..v.len() - 3]).map_err(|_| NetpodError::BadTimerange)?;
+            let d = humantime::parse_duration(&v[..v.len() - 3])
+                .map_err(|_| NetpodError::BadTimerange)?;
             Ok(Utc::now() - d)
         } else {
             Err(NetpodError::BadTimerange)
@@ -162,11 +166,15 @@ impl AppendToUrl for TimeRangeQuery {
         let mut g = url.query_pairs_mut();
         g.append_pair(
             "begDate",
-            &Utc.timestamp_nanos(self.range.beg as i64).format(date_fmt).to_string(),
+            &Utc.timestamp_nanos(self.range.beg as i64)
+                .format(date_fmt)
+                .to_string(),
         );
         g.append_pair(
             "endDate",
-            &Utc.timestamp_nanos(self.range.end as i64).format(date_fmt).to_string(),
+            &Utc.timestamp_nanos(self.range.end as i64)
+                .format(date_fmt)
+                .to_string(),
         );
     }
 }
@@ -183,7 +191,10 @@ impl From<TimeRangeQuery> for NanoRange {
 impl From<&NanoRange> for TimeRangeQuery {
     fn from(k: &NanoRange) -> Self {
         Self {
-            range: NanoRange { beg: k.beg, end: k.end },
+            range: NanoRange {
+                beg: k.beg,
+                end: k.end,
+            },
         }
     }
 }
@@ -191,7 +202,10 @@ impl From<&NanoRange> for TimeRangeQuery {
 impl From<&PulseRange> for PulseRangeQuery {
     fn from(k: &PulseRange) -> Self {
         Self {
-            range: PulseRange { beg: k.beg, end: k.end },
+            range: PulseRange {
+                beg: k.beg,
+                end: k.end,
+            },
         }
     }
 }
@@ -267,7 +281,9 @@ pub fn binning_scheme_append_to_url(agg_kind: &AggKind, url: &mut Url) {
 }

 // Absent AggKind is not considered an error.
-pub fn agg_kind_from_binning_scheme(pairs: &BTreeMap<String, String>) -> Result<Option<AggKind>, NetpodError> {
+pub fn agg_kind_from_binning_scheme(
+    pairs: &BTreeMap<String, String>,
+) -> Result<Option<AggKind>, NetpodError> {
     let key = "binningScheme";
     if let Some(s) = pairs.get(key) {
         let ret = if s == "eventBlobs" {
@@ -341,8 +357,12 @@ impl FromUrl for ChannelStateEventsQuery {
     }

     fn from_pairs(pairs: &BTreeMap<String, String>) -> Result<Self, Self::Error> {
-        let beg_date = pairs.get("begDate").ok_or_else(|| NetpodError::MissingTimerange)?;
-        let end_date = pairs.get("endDate").ok_or_else(|| NetpodError::MissingTimerange)?;
+        let beg_date = pairs
+            .get("begDate")
+            .ok_or_else(|| NetpodError::MissingTimerange)?;
+        let end_date = pairs
+            .get("endDate")
+            .ok_or_else(|| NetpodError::MissingTimerange)?;
         let ret = Self {
             channel: SfDbChannel::from_pairs(&pairs)?,
             range: NanoRange {
@@ -40,7 +40,8 @@ impl Api1Range {

 #[test]
 fn serde_de_range_zulu() {
-    let s = r#"{"startDate": "2022-11-22T10:15:12.412Z", "endDate": "2022-11-22T10:15:12.413556Z"}"#;
+    let s =
+        r#"{"startDate": "2022-11-22T10:15:12.412Z", "endDate": "2022-11-22T10:15:12.413556Z"}"#;
     let range: Api1Range = serde_json::from_str(s).unwrap();
     assert_eq!(range.beg().offset().local_minus_utc(), 0);
     assert_eq!(range.end().offset().local_minus_utc(), 0);
@@ -50,7 +51,8 @@ fn serde_de_range_zulu() {

 #[test]
 fn serde_de_range_offset() {
-    let s = r#"{"startDate": "2022-11-22T10:15:12.412Z", "endDate": "2022-11-22T10:15:12.413556Z"}"#;
+    let s =
+        r#"{"startDate": "2022-11-22T10:15:12.412Z", "endDate": "2022-11-22T10:15:12.413556Z"}"#;
     let range: Api1Range = serde_json::from_str(s).unwrap();
     assert_eq!(range.beg().offset().local_minus_utc(), 0);
     assert_eq!(range.end().offset().local_minus_utc(), 0);
@@ -126,7 +128,10 @@ impl ChannelTuple {
     }

     pub fn from_name(name: String) -> Self {
-        Self { backend: None, name }
+        Self {
+            backend: None,
+            name,
+        }
     }

     pub fn backend(&self) -> Option<&String> {
@@ -21,8 +21,8 @@ impl TryFrom<&str> for Datetime {
     type Error = Error;

     fn try_from(val: &str) -> Result<Self, Self::Error> {
-        let dt =
-            DateTime::<FixedOffset>::parse_from_rfc3339(val).map_err(|e| Error::with_msg_no_trace(format!("{e}")))?;
+        let dt = DateTime::<FixedOffset>::parse_from_rfc3339(val)
+            .map_err(|e| Error::with_msg_no_trace(format!("{e}")))?;
         Ok(Datetime(dt))
     }
 }
@@ -109,7 +109,9 @@ impl PreBinnedQuery {
     }

     pub fn cache_usage(&self) -> CacheUsage {
-        self.cache_usage.as_ref().map_or(CacheUsage::Use, |x| x.clone())
+        self.cache_usage
+            .as_ref()
+            .map_or(CacheUsage::Use, |x| x.clone())
     }

     pub fn buf_len_disk_io(&self) -> usize {
@@ -1,3 +1,4 @@
+#![allow(unused)]
 use super::evrange::NanoRange;
 use super::evrange::SeriesRange;
 use crate::timeunits::SEC;
@@ -10,8 +11,12 @@ use chrono::Utc;
 #[test]
 fn test_binned_range_covering_00() {
     let range = SeriesRange::TimeRange(NanoRange::from_date_time(
-        DateTime::parse_from_rfc3339("1970-01-01T10:10:00Z").unwrap().into(),
-        DateTime::parse_from_rfc3339("1970-01-01T10:20:00Z").unwrap().into(),
+        DateTime::parse_from_rfc3339("1970-01-01T10:10:00Z")
+            .unwrap()
+            .into(),
+        DateTime::parse_from_rfc3339("1970-01-01T10:20:00Z")
+            .unwrap()
+            .into(),
     ));
     let r = BinnedRangeEnum::covering_range(range, 9).unwrap();
     assert_eq!(r.bin_count(), 10);
@@ -36,8 +41,12 @@ fn test_binned_range_covering_00() {
 #[test]
 fn test_binned_range_covering_01() {
     let range = SeriesRange::TimeRange(NanoRange::from_date_time(
-        DateTime::parse_from_rfc3339("1970-01-01T00:20:04Z").unwrap().into(),
-        DateTime::parse_from_rfc3339("1970-01-01T00:21:10Z").unwrap().into(),
+        DateTime::parse_from_rfc3339("1970-01-01T00:20:04Z")
+            .unwrap()
+            .into(),
+        DateTime::parse_from_rfc3339("1970-01-01T00:21:10Z")
+            .unwrap()
+            .into(),
     ));
     let r = BinnedRangeEnum::covering_range(range, 9).unwrap();
     assert_eq!(r.bin_count(), 14);
@@ -62,8 +71,12 @@ fn test_binned_range_covering_01() {
 #[test]
 fn test_binned_range_covering_02() {
     let range = SeriesRange::TimeRange(NanoRange::from_date_time(
-        DateTime::parse_from_rfc3339("1970-01-01T00:20:04Z").unwrap().into(),
-        DateTime::parse_from_rfc3339("1970-01-01T00:22:10Z").unwrap().into(),
+        DateTime::parse_from_rfc3339("1970-01-01T00:20:04Z")
+            .unwrap()
+            .into(),
+        DateTime::parse_from_rfc3339("1970-01-01T00:22:10Z")
+            .unwrap()
+            .into(),
     ));
     let r = BinnedRangeEnum::covering_range(range, 25).unwrap();
     assert_eq!(r.bin_count(), 26);
@@ -9,8 +9,6 @@ use crate::TsNano;
 use chrono::DateTime;
 use chrono::TimeZone;
 use chrono::Utc;
-use daqbuf_err as err;
-use err::Error;
 use serde::Deserialize;
 use serde::Serialize;
 use std::collections::BTreeMap;
@@ -19,9 +17,18 @@ use url::Url;

 #[derive(Clone, Debug, Serialize, Deserialize)]
 pub enum TimeRange {
-    Time { beg: DateTime<Utc>, end: DateTime<Utc> },
-    Pulse { beg: u64, end: u64 },
-    Nano { beg: u64, end: u64 },
+    Time {
+        beg: DateTime<Utc>,
+        end: DateTime<Utc>,
+    },
+    Pulse {
+        beg: u64,
+        end: u64,
+    },
+    Nano {
+        beg: u64,
+        end: u64,
+    },
 }

 #[derive(Clone, Serialize, Deserialize, PartialEq)]
@@ -35,7 +42,12 @@ impl fmt::Debug for NanoRange {
         if true {
             let beg = TsNano(self.beg);
             let end = TsNano(self.end);
-            write!(fmt, "NanoRange {{ beg: {}, end: {} }}", beg.fmt(), end.fmt())
+            write!(
+                fmt,
+                "NanoRange {{ beg: {}, end: {} }}",
+                beg.fmt(),
+                end.fmt()
+            )
         } else if false {
             let beg = TsNano(self.beg);
             let end = TsNano(self.end);
@@ -51,7 +63,10 @@ impl fmt::Debug for NanoRange {
             .timestamp_opt((self.end / SEC) as i64, (self.end % SEC) as u32)
             .earliest();
         if let (Some(a), Some(b)) = (beg, end) {
-            fmt.debug_struct("NanoRange").field("beg", &a).field("end", &b).finish()
+            fmt.debug_struct("NanoRange")
+                .field("beg", &a)
+                .field("end", &b)
+                .finish()
         } else {
             fmt.debug_struct("NanoRange")
                 .field("beg", &beg)
@@ -38,7 +38,11 @@ impl RetentionTime {
         let margin_max = Duration::from_secs(day * 2);
         let ttl = self.ttl_ts_msp();
         let margin = ttl / 10;
-        let margin = if margin >= margin_max { margin_max } else { margin };
+        let margin = if margin >= margin_max {
+            margin_max
+        } else {
+            margin
+        };
         ttl + margin
     }
