From 1632ad07dc042d8da2539535b928ef99f0538a7e Mon Sep 17 00:00:00 2001
From: Dominik Werder
Date: Tue, 18 Mar 2025 14:52:26 +0100
Subject: [PATCH] Query for bin write index

---
 Cargo.toml         |   1 +
 src/api4/binned.rs | 100 +++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 101 insertions(+)

diff --git a/Cargo.toml b/Cargo.toml
index 63d5e87..6b0d742 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -14,3 +14,4 @@ humantime = "2.1.0"
 humantime-serde = "1.1.1"
 autoerr = "0.0.3"
 netpod = { path = "../daqbuf-netpod", package = "daqbuf-netpod" }
+series = { path = "../daqbuf-series", package = "daqbuf-series" }
diff --git a/src/api4/binned.rs b/src/api4/binned.rs
index bf8bf98..e56936c 100644
--- a/src/api4/binned.rs
+++ b/src/api4/binned.rs
@@ -15,6 +15,7 @@ use netpod::HasTimeout;
 use netpod::SfDbChannel;
 use serde::Deserialize;
 use serde::Serialize;
+use series::msp::PrebinnedPartitioning;
 use std::collections::BTreeMap;
 use std::time::Duration;
 use url::Url;
@@ -450,3 +451,102 @@ impl AppendToUrl for BinnedQuery {
 }
 }
 }
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct BinWriteIndexQuery {
+    channel: SfDbChannel,
+    range: SeriesRange,
+    #[serde(default)]
+    log_level: String,
+    #[serde(default, skip_serializing_if = "Option::is_none")]
+    rt1: Option<u16>,
+    #[serde(default, skip_serializing_if = "Option::is_none")]
+    rt2: Option<u16>,
+    pbp: PrebinnedPartitioning,
+}
+
+impl BinWriteIndexQuery {
+    pub fn range(&self) -> &SeriesRange {
+        &self.range
+    }
+
+    pub fn channel(&self) -> &SfDbChannel {
+        &self.channel
+    }
+
+    pub fn retention_time_1(&self) -> RetentionTime {
+        match self.rt1 {
+            Some(x) => RetentionTime::from_index_db_u16(x).unwrap_or(RetentionTime::Short),
+            None => RetentionTime::Short,
+        }
+    }
+
+    pub fn retention_time_2(&self) -> RetentionTime {
+        match self.rt2 {
+            Some(x) => RetentionTime::from_index_db_u16(x).unwrap_or(RetentionTime::Short),
+            None => RetentionTime::Short,
+        }
+    }
+
+    pub fn prebinned_partitioning(&self) -> PrebinnedPartitioning {
+        self.pbp.clone()
+    }
+
+    pub fn log_level(&self) -> &str {
+        &self.log_level
+    }
+}
+
+impl HasBackend for BinWriteIndexQuery {
+    fn backend(&self) -> &str {
+        self.channel.backend()
+    }
+}
+
+impl FromUrl for BinWriteIndexQuery {
+    type Error = Error;
+
+    fn from_url(url: &Url) -> Result<Self, Self::Error> {
+        let pairs = get_url_query_pairs(url);
+        Self::from_pairs(&pairs)
+    }
+
+    fn from_pairs(pairs: &BTreeMap<String, String>) -> Result<Self, Self::Error> {
+        let ret = Self {
+            channel: SfDbChannel::from_pairs(&pairs)?,
+            range: SeriesRange::from_pairs(pairs)?,
+            log_level: pairs.get("log_level").map_or(String::new(), String::from),
+            rt1: pairs.get("rt1").map_or(Ok(None), |k| {
+                k.parse().map(Some).map_err(|_| Error::BadUseRt)
+            })?,
+            rt2: pairs.get("rt2").map_or(Ok(None), |k| {
+                k.parse().map(Some).map_err(|_| Error::BadUseRt)
+            })?,
+            pbp: pairs
+                .get("pbp")
+                .and_then(|x| x.parse().ok())
+                .and_then(|x| PrebinnedPartitioning::from_db_ix(x).ok())
+                .unwrap_or(PrebinnedPartitioning::Day1),
+        };
+        let selfname = std::any::type_name::<Self>();
+        debug!("{}::from_pairs {:?}", selfname, ret);
+        Ok(ret)
+    }
+}
+
+impl AppendToUrl for BinWriteIndexQuery {
+    fn append_to_url(&self, url: &mut Url) {
+        self.channel.append_to_url(url);
+        self.range.append_to_url(url);
+        let mut g = url.query_pairs_mut();
+        if self.log_level.len() != 0 {
+            g.append_pair("log_level", &self.log_level);
+        }
+        if let Some(x) = self.rt1.as_ref() {
+            g.append_pair("rt1", &x.to_string());
+        }
+        if let Some(x) = self.rt2.as_ref() {
+            g.append_pair("rt2", &x.to_string());
+        }
+    }
+}