commit a4719aeacce701535b5dd4cc21e86390585ff05d Author: Dominik Werder Date: Fri Nov 8 09:16:54 2024 +0100 Factored into separate crate diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..1b72444 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +/Cargo.lock +/target diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..069d7db --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "daqbuf-query" +version = "0.0.3" +authors = ["Dominik Werder "] +edition = "2021" + +[dependencies] +serde = { version = "1.0.214", features = ["derive"] } +serde_json = "1.0.132" +tracing = "0.1.40" +chrono = { version = "0.4.38", features = ["serde"] } +url = "2.5.3" +humantime = "2.1.0" +humantime-serde = "1.1.1" +thiserror = "=0.0.1" +netpod = { path = "../daqbuf-netpod", package = "daqbuf-netpod" } + +[patch.crates-io] +thiserror = { git = "https://github.com/dominikwerder/thiserror.git", branch = "cstm" } diff --git a/src/api4.rs b/src/api4.rs new file mode 100644 index 0000000..af0af7b --- /dev/null +++ b/src/api4.rs @@ -0,0 +1,176 @@ +pub mod binned; +pub mod events; + +use chrono::DateTime; +use chrono::TimeZone; +use chrono::Utc; +use netpod::get_url_query_pairs; +use netpod::range::evrange::SeriesRange; +use netpod::ttl::RetentionTime; +use netpod::AppendToUrl; +use netpod::FromUrl; +use netpod::HasBackend; +use netpod::HasTimeout; +use netpod::ToNanos; +use netpod::TsNano; +use netpod::DATETIME_FMT_6MS; +use serde::Deserialize; +use serde::Serialize; +use std::collections::BTreeMap; +use std::time::Duration; +use url::Url; + +#[derive(Debug, thiserror::Error)] +#[cstm(name = "Query")] +pub enum Error { + MissingTimerange, + ChronoParse(#[from] chrono::ParseError), + HumantimeDurationParse(#[from] humantime::DurationError), + MissingBackend, + MissingRetentionTime, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct AccountingIngestedBytesQuery { + backend: String, + range: SeriesRange, +} + +impl AccountingIngestedBytesQuery { + pub fn range(&self) -> &SeriesRange { + &self.range + } +} + +impl HasBackend for AccountingIngestedBytesQuery { + fn backend(&self) -> &str { + &self.backend + } +} + +impl HasTimeout for AccountingIngestedBytesQuery { + fn timeout(&self) -> Option { + None + } +} + +impl FromUrl for AccountingIngestedBytesQuery { + type Error = netpod::NetpodError; + + fn from_url(url: &Url) -> Result { + let pairs = get_url_query_pairs(url); + Self::from_pairs(&pairs) + } + + fn from_pairs(pairs: &BTreeMap) -> Result { + let ret = Self { + backend: pairs + .get("backend") + .ok_or_else(|| netpod::NetpodError::MissingBackend)? 
+ .to_string(), + range: SeriesRange::from_pairs(pairs)?, + }; + Ok(ret) + } +} + +impl AppendToUrl for AccountingIngestedBytesQuery { + fn append_to_url(&self, url: &mut Url) { + { + let mut g = url.query_pairs_mut(); + g.append_pair("backend", &self.backend); + } + self.range.append_to_url(url); + } +} + +// + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct AccountingToplistQuery { + rt: RetentionTime, + backend: String, + ts: TsNano, + limit: u32, + sort: Option, +} + +impl AccountingToplistQuery { + pub fn rt(&self) -> RetentionTime { + self.rt.clone() + } + + pub fn ts(&self) -> TsNano { + self.ts.clone() + } + + pub fn limit(&self) -> u32 { + self.limit + } + + pub fn sort(&self) -> Option<&str> { + self.sort.as_ref().map(|x| x.as_str()) + } +} + +impl HasBackend for AccountingToplistQuery { + fn backend(&self) -> &str { + &self.backend + } +} + +impl HasTimeout for AccountingToplistQuery { + fn timeout(&self) -> Option { + None + } +} + +impl FromUrl for AccountingToplistQuery { + type Error = Error; + + fn from_url(url: &Url) -> Result { + let pairs = get_url_query_pairs(url); + Self::from_pairs(&pairs) + } + + fn from_pairs(pairs: &BTreeMap) -> Result { + let fn1 = |pairs: &BTreeMap| { + let v = pairs.get("tsDate").ok_or(Self::Error::MissingTimerange)?; + let mut w = v.parse::>(); + if w.is_err() && v.ends_with("ago") { + let d = humantime::parse_duration(&v[..v.len() - 3])?; + w = Ok(Utc::now() - d); + } + let w = w?; + Ok::<_, Self::Error>(TsNano::from_ns(w.to_nanos())) + }; + let ret = Self { + rt: pairs + .get("retentionTime") + .ok_or_else(|| Self::Error::MissingRetentionTime) + .and_then(|x| x.parse().map_err(|_| Self::Error::MissingRetentionTime))?, + backend: pairs + .get("backend") + .ok_or_else(|| Self::Error::MissingBackend)? 
+ .to_string(), + ts: fn1(pairs)?, + limit: pairs.get("limit").map_or(None, |x| x.parse().ok()).unwrap_or(20), + sort: pairs.get("sort").map(ToString::to_string), + }; + Ok(ret) + } +} + +impl AppendToUrl for AccountingToplistQuery { + fn append_to_url(&self, url: &mut Url) { + let mut g = url.query_pairs_mut(); + g.append_pair("backend", &self.backend); + g.append_pair( + "ts", + &Utc.timestamp_nanos(self.ts.ns() as i64) + .format(DATETIME_FMT_6MS) + .to_string(), + ); + g.append_pair("limit", &self.limit.to_string()); + } +} diff --git a/src/api4/binned.rs b/src/api4/binned.rs new file mode 100644 index 0000000..f80f442 --- /dev/null +++ b/src/api4/binned.rs @@ -0,0 +1,384 @@ +use crate::transform::TransformQuery; +use netpod::get_url_query_pairs; +use netpod::log::*; +use netpod::query::CacheUsage; +use netpod::range::evrange::SeriesRange; +use netpod::ttl::RetentionTime; +use netpod::AppendToUrl; +use netpod::BinnedRange; +use netpod::BinnedRangeEnum; +use netpod::ByteSize; +use netpod::DtMs; +use netpod::FromUrl; +use netpod::HasBackend; +use netpod::HasTimeout; +use netpod::SfDbChannel; +use serde::Deserialize; +use serde::Serialize; +use std::collections::BTreeMap; +use std::time::Duration; +use url::Url; + +#[derive(Debug, thiserror::Error)] +#[cstm(name = "BinnedQuery")] +pub enum Error { + BadInt(#[from] std::num::ParseIntError), + MultipleBinCountBinWidth, + BadUseRt, + Netpod(#[from] netpod::NetpodError), + Transform(#[from] crate::transform::Error), +} + +mod serde_option_vec_duration { + use serde::Deserialize; + use serde::Deserializer; + use serde::Serialize; + use serde::Serializer; + use std::time::Duration; + + #[derive(Debug, Clone, Serialize, Deserialize)] + struct HumantimeDuration { + #[serde(with = "humantime_serde")] + inner: Duration, + } + + pub fn serialize(val: &Option>, ser: S) -> Result + where + S: Serializer, + { + match val { + Some(vec) => { + // humantime_serde::serialize(&t, ser) + let t: Vec<_> = vec.iter().map(|&x| HumantimeDuration { inner: x }).collect(); + serde::Serialize::serialize(&t, ser) + } + None => ser.serialize_none(), + } + } + + pub fn deserialize<'a, D>(de: D) -> Result>, D::Error> + where + D: Deserializer<'a>, + { + let t: Option> = serde::Deserialize::deserialize(de)?; + Ok(t.map(|v| v.iter().map(|x| x.inner).collect())) + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct BinnedQuery { + channel: SfDbChannel, + range: SeriesRange, + #[serde(default, skip_serializing_if = "Option::is_none")] + bin_count: Option, + #[serde(default, skip_serializing_if = "Option::is_none", with = "humantime_serde")] + bin_width: Option, + #[serde( + default = "TransformQuery::default_time_binned", + skip_serializing_if = "TransformQuery::is_default_time_binned" + )] + transform: TransformQuery, + #[serde(default, skip_serializing_if = "Option::is_none")] + cache_usage: Option, + #[serde(default, skip_serializing_if = "Option::is_none", with = "serde_option_vec_duration")] + subgrids: Option>, + #[serde( + default, + skip_serializing_if = "Option::is_none", + with = "humantime_serde", + rename = "contentTimeout" + )] + timeout_content: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + buf_len_disk_io: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + disk_stats_every: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub merger_out_len_max: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + scylla_read_queue_len: Option, + #[serde(default, 
skip_serializing_if = "Option::is_none")] + test_do_wasm: Option, + #[serde(default)] + log_level: String, + #[serde(default)] + use_rt: Option, +} + +impl BinnedQuery { + pub fn new(channel: SfDbChannel, range: SeriesRange, bin_count: u32) -> Self { + Self { + channel, + range, + bin_count: Some(bin_count), + bin_width: None, + transform: TransformQuery::default_time_binned(), + cache_usage: None, + subgrids: None, + buf_len_disk_io: None, + disk_stats_every: None, + timeout_content: None, + merger_out_len_max: None, + scylla_read_queue_len: None, + test_do_wasm: None, + log_level: String::new(), + use_rt: None, + } + } + + pub fn range(&self) -> &SeriesRange { + &self.range + } + + pub fn channel(&self) -> &SfDbChannel { + &self.channel + } + + pub fn bin_count(&self) -> Option { + self.bin_count + } + + pub fn bin_width(&self) -> Option { + self.bin_width + } + + pub fn transform(&self) -> &TransformQuery { + &self.transform + } + + pub fn cache_usage(&self) -> Option { + self.cache_usage.clone() + } + + pub fn disk_stats_every(&self) -> ByteSize { + match &self.disk_stats_every { + Some(x) => x.clone(), + None => ByteSize(1024 * 1024 * 4), + } + } + + pub fn buf_len_disk_io(&self) -> usize { + match self.buf_len_disk_io { + Some(x) => x, + None => 1024 * 16, + } + } + + pub fn timeout_content(&self) -> Option { + self.timeout_content + } + + pub fn subgrids(&self) -> Option<&[Duration]> { + self.subgrids.as_ref().map(|x| x.as_slice()) + } + + pub fn merger_out_len_max(&self) -> Option { + self.merger_out_len_max + } + + pub fn scylla_read_queue_len(&self) -> Option { + self.scylla_read_queue_len + } + + pub fn set_series_id(&mut self, series: u64) { + self.channel.set_series(series); + } + + pub fn channel_mut(&mut self) -> &mut SfDbChannel { + &mut self.channel + } + + pub fn set_cache_usage(&mut self, k: CacheUsage) { + self.cache_usage = Some(k); + } + + pub fn set_disk_stats_every(&mut self, k: ByteSize) { + self.disk_stats_every = Some(k); + } + + // Currently only for testing + pub fn set_timeout(&mut self, k: Duration) { + self.timeout_content = Some(k); + } + + pub fn set_buf_len_disk_io(&mut self, k: usize) { + self.buf_len_disk_io = Some(k); + } + + pub fn for_time_weighted_scalar(self) -> Self { + let mut v = self; + v.transform = TransformQuery::for_time_weighted_scalar(); + v + } + + pub fn test_do_wasm(&self) -> Option<&str> { + match &self.test_do_wasm { + Some(x) => Some(&x), + None => None, + } + } + + pub fn log_level(&self) -> &str { + &self.log_level + } + + pub fn use_rt(&self) -> Option { + self.use_rt.clone() + } + + pub fn covering_range(&self) -> Result { + match &self.range { + SeriesRange::TimeRange(range) => match self.bin_width { + Some(dt) => { + if self.bin_count.is_some() { + Err(Error::MultipleBinCountBinWidth) + } else { + let ret = BinnedRangeEnum::Time(BinnedRange::covering_range_time( + range.clone(), + DtMs::from_ms_u64(dt.as_millis() as u64), + )?); + Ok(ret) + } + } + None => { + let bc = self.bin_count.unwrap_or(20); + let ret = BinnedRangeEnum::covering_range(self.range.clone(), bc)?; + Ok(ret) + } + }, + SeriesRange::PulseRange(_) => todo!(), + } + } +} + +impl HasBackend for BinnedQuery { + fn backend(&self) -> &str { + self.channel.backend() + } +} + +impl HasTimeout for BinnedQuery { + fn timeout(&self) -> Option { + self.timeout_content + } +} + +impl FromUrl for BinnedQuery { + type Error = Error; + + fn from_url(url: &Url) -> Result { + let pairs = get_url_query_pairs(url); + Self::from_pairs(&pairs) + } + + fn from_pairs(pairs: 
&BTreeMap) -> Result { + let ret = Self { + channel: SfDbChannel::from_pairs(&pairs)?, + range: SeriesRange::from_pairs(pairs)?, + bin_count: pairs.get("binCount").and_then(|x| x.parse().ok()), + bin_width: pairs.get("binWidth").and_then(|x| humantime::parse_duration(x).ok()), + transform: TransformQuery::from_pairs(pairs)?, + cache_usage: CacheUsage::from_pairs(&pairs)?, + buf_len_disk_io: pairs + .get("bufLenDiskIo") + .map_or(Ok(None), |k| k.parse().map(|k| Some(k)))?, + disk_stats_every: pairs + .get("diskStatsEveryKb") + .map(|k| k.parse().ok()) + .unwrap_or(None) + .map(ByteSize::from_kb), + /*report_error: pairs + .get("reportError") + .map_or("false", |k| k) + .parse() + .map_err(|e| Error::with_msg(format!("can not parse reportError {:?}", e)))?,*/ + timeout_content: pairs + .get("contentTimeout") + .and_then(|x| humantime::parse_duration(x).ok()), + subgrids: pairs + .get("subgrids") + .map(|x| x.split(",").filter_map(|x| humantime::parse_duration(x).ok()).collect()), + merger_out_len_max: pairs + .get("mergerOutLenMax") + .map_or(Ok(None), |k| k.parse().map(|k| Some(k)))?, + scylla_read_queue_len: pairs + .get("scyllaReadQueueLen") + .map_or(Ok(None), |k| k.parse().map(|k| Some(k)))?, + test_do_wasm: pairs.get("testDoWasm").map(|x| String::from(x)), + log_level: pairs.get("log_level").map_or(String::new(), String::from), + use_rt: pairs + .get("useRt") + .map_or(Ok(None), |k| k.parse().map(Some).map_err(|_| Error::BadUseRt))?, + }; + debug!("BinnedQuery::from_url {:?}", ret); + Ok(ret) + } +} + +impl AppendToUrl for BinnedQuery { + fn append_to_url(&self, url: &mut Url) { + self.channel.append_to_url(url); + self.range.append_to_url(url); + { + let mut g = url.query_pairs_mut(); + if let Some(x) = self.bin_count { + g.append_pair("binCount", &format!("{}", x)); + } + if let Some(x) = self.bin_width { + if x < Duration::from_secs(1) { + g.append_pair("binWidth", &format!("{:.0}ms", x.subsec_millis())); + } else if x < Duration::from_secs(60) { + g.append_pair("binWidth", &format!("{:.0}s", x.as_secs_f64())); + } else if x < Duration::from_secs(60 * 60) { + g.append_pair("binWidth", &format!("{:.0}m", x.as_secs() / 60)); + } else if x < Duration::from_secs(60 * 60 * 24) { + g.append_pair("binWidth", &format!("{:.0}h", x.as_secs() / 60 / 60)); + } else { + g.append_pair("binWidth", &format!("{:.0}d", x.as_secs() / 60 / 60 / 24)); + } + } + } + self.transform.append_to_url(url); + let mut g = url.query_pairs_mut(); + if let Some(x) = &self.cache_usage { + g.append_pair("cacheUsage", &x.query_param_value()); + } + if let Some(x) = &self.timeout_content { + g.append_pair("contentTimeout", &format!("{:.0}ms", 1e3 * x.as_secs_f64())); + } + if let Some(x) = &self.subgrids { + let s: String = + x.iter() + .map(|&x| humantime::format_duration(x).to_string()) + .fold(String::new(), |mut a, x| { + if a.len() != 0 { + a.push_str(","); + } + a.push_str(&x); + a + }); + g.append_pair("subgrids", &s); + } + if let Some(x) = self.buf_len_disk_io { + g.append_pair("bufLenDiskIo", &format!("{}", x)); + } + if let Some(x) = &self.disk_stats_every { + g.append_pair("diskStatsEveryKb", &format!("{}", x.bytes() / 1024)); + } + if let Some(x) = self.merger_out_len_max.as_ref() { + g.append_pair("mergerOutLenMax", &format!("{}", x)); + } + if let Some(x) = self.scylla_read_queue_len.as_ref() { + g.append_pair("scyllaReadQueueLen", &x.to_string()); + } + if let Some(x) = &self.test_do_wasm { + g.append_pair("testDoWasm", &x); + } + if self.log_level.len() != 0 { + g.append_pair("log_level", 
&self.log_level); + } + if let Some(x) = self.use_rt.as_ref() { + g.append_pair("useRt", &x.to_string()); + } + } +} diff --git a/src/api4/events.rs b/src/api4/events.rs new file mode 100644 index 0000000..294f816 --- /dev/null +++ b/src/api4/events.rs @@ -0,0 +1,670 @@ +use super::binned::BinnedQuery; +use crate::transform::TransformQuery; +use netpod::get_url_query_pairs; +use netpod::is_false; +use netpod::query::api1::Api1Query; +use netpod::query::PulseRangeQuery; +use netpod::query::TimeRangeQuery; +use netpod::range::evrange::SeriesRange; +use netpod::ttl::RetentionTime; +use netpod::AppendToUrl; +use netpod::ByteSize; +use netpod::ChannelTypeConfigGen; +use netpod::DiskIoTune; +use netpod::FromUrl; +use netpod::HasBackend; +use netpod::HasTimeout; +use netpod::SfDbChannel; +use serde::Deserialize; +use serde::Serialize; +use std::collections::BTreeMap; +use std::time::Duration; +use url::Url; + +#[derive(Debug, thiserror::Error)] +#[cstm(name = "EventsQuery")] +pub enum Error { + BadInt(#[from] std::num::ParseIntError), + MissingTimerange, + BadQuery, + Transform(#[from] crate::transform::Error), + Netpod(#[from] netpod::NetpodError), +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct PlainEventsQuery { + channel: SfDbChannel, + range: SeriesRange, + #[serde(default, skip_serializing_if = "is_false", rename = "oneBeforeRange")] + one_before_range: bool, + #[serde(default, skip_serializing_if = "is_false", rename = "begExcl")] + beg_excl: bool, + #[serde(default = "TransformQuery::default_events")] + #[serde(skip_serializing_if = "TransformQuery::is_default_events")] + transform: TransformQuery, + #[serde( + default, + skip_serializing_if = "Option::is_none", + with = "humantime_serde", + rename = "contentTimeout" + )] + timeout_content: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + events_max: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + bytes_max: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + allow_large_result: Option, + #[serde(default, skip_serializing_if = "Option::is_none", with = "humantime_serde")] + event_delay: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + stream_batch_len: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + buf_len_disk_io: Option, + #[serde(default, skip_serializing_if = "is_false")] + do_test_main_error: bool, + #[serde(default, skip_serializing_if = "is_false")] + do_test_stream_error: bool, + #[serde(default, skip_serializing_if = "Option::is_none")] + test_do_wasm: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + merger_out_len_max: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + scylla_read_queue_len: Option, + #[serde(default, skip_serializing_if = "Vec::is_empty")] + create_errors: Vec, + #[serde(default)] + log_level: String, + #[serde(default)] + use_rt: Option, + querymarker: String, +} + +impl PlainEventsQuery { + pub fn new(channel: SfDbChannel, range: R) -> Self + where + R: Into, + { + Self { + channel, + range: range.into(), + beg_excl: false, + one_before_range: false, + transform: TransformQuery::default_events(), + timeout_content: None, + events_max: None, + bytes_max: None, + allow_large_result: None, + event_delay: None, + stream_batch_len: None, + buf_len_disk_io: None, + do_test_main_error: false, + do_test_stream_error: false, + test_do_wasm: None, + merger_out_len_max: None, + scylla_read_queue_len: None, + create_errors: Vec::new(), + log_level: 
String::new(), + use_rt: None, + querymarker: String::new(), + } + } + + pub fn channel(&self) -> &SfDbChannel { + &self.channel + } + + pub fn range(&self) -> &SeriesRange { + &self.range + } + + pub fn one_before_range(&self) -> bool { + self.one_before_range || self.transform.need_one_before_range() + } + + pub fn transform(&self) -> &TransformQuery { + &self.transform + } + + pub fn buf_len_disk_io(&self) -> usize { + self.buf_len_disk_io.unwrap_or(1024 * 8) + } + + pub fn timeout_content(&self) -> Option { + self.timeout_content + } + + pub fn events_max(&self) -> u64 { + self.events_max.map_or_else( + || { + if self.allow_large_result_or_def() { + 1000 * 500 + } else { + 1000 * 80 + } + }, + |x| x.min(1000 * 1000 * 4), + ) + } + + pub fn bytes_max(&self) -> u64 { + self.bytes_max.map_or_else( + || { + if self.allow_large_result_or_def() { + 1024 * 1024 * 80 + } else { + 1024 * 1024 * 8 + } + }, + |x| x.min(1024 * 1024 * 400), + ) + } + + pub fn allow_large_result_or_def(&self) -> bool { + self.allow_large_result.unwrap_or(false) + } + + pub fn event_delay(&self) -> &Option { + &self.event_delay + } + + pub fn merger_out_len_max(&self) -> Option { + self.merger_out_len_max + } + + pub fn scylla_read_queue_len(&self) -> Option { + self.scylla_read_queue_len + } + + pub fn do_test_main_error(&self) -> bool { + self.do_test_main_error + } + + pub fn do_test_stream_error(&self) -> bool { + self.do_test_stream_error + } + + pub fn test_do_wasm(&self) -> Option<&str> { + match &self.test_do_wasm { + Some(x) => Some(&x), + None => None, + } + } + + pub fn set_series_id(&mut self, series: u64) { + self.channel.set_series(series); + } + + pub fn set_do_test_main_error(&mut self, k: bool) { + self.do_test_main_error = k; + } + + pub fn set_do_test_stream_error(&mut self, k: bool) { + self.do_test_stream_error = k; + } + + pub fn for_time_weighted_scalar(mut self) -> Self { + self.transform = TransformQuery::for_time_weighted_scalar(); + self + } + + pub fn for_pulse_id_diff(mut self) -> Self { + self.transform = TransformQuery::for_pulse_id_diff(); + self + } + + pub fn is_event_blobs(&self) -> bool { + self.transform.is_event_blobs() + } + + pub fn need_value_data(&self) -> bool { + self.transform.need_value_data() + } + + pub fn create_errors_contains(&self, x: &str) -> bool { + self.create_errors.contains(&String::from(x)) + } + + pub fn summary_short(&self) -> String { + format!( + "PlainEventsQuery {{ chn: {}, range: {:?} }}", + self.channel().name(), + self.range() + ) + } + + pub fn log_level(&self) -> &str { + &self.log_level + } + + pub fn use_rt(&self) -> Option { + self.use_rt.clone() + } +} + +impl HasBackend for PlainEventsQuery { + fn backend(&self) -> &str { + self.channel.backend() + } +} + +impl HasTimeout for PlainEventsQuery { + fn timeout(&self) -> Option { + PlainEventsQuery::timeout_content(self) + } +} + +impl FromUrl for PlainEventsQuery { + type Error = Error; + + fn from_url(url: &Url) -> Result { + let pairs = get_url_query_pairs(url); + Self::from_pairs(&pairs) + } + + fn from_pairs(pairs: &BTreeMap) -> Result { + let range = if let Ok(x) = TimeRangeQuery::from_pairs(pairs) { + SeriesRange::TimeRange(x.into()) + } else if let Ok(x) = PulseRangeQuery::from_pairs(pairs) { + SeriesRange::PulseRange(x.into()) + } else { + return Err(Error::MissingTimerange); + }; + let ret = Self { + channel: SfDbChannel::from_pairs(pairs)?, + range, + one_before_range: pairs.get("oneBeforeRange").map_or(false, |x| x == "true"), + beg_excl: pairs.get("begExcl").map_or(false, |x| x == 
"true"), + transform: TransformQuery::from_pairs(pairs)?, + timeout_content: pairs + .get("contentTimeout") + .and_then(|x| humantime::parse_duration(x).ok()), + events_max: pairs.get("eventsMax").map_or(None, |k| k.parse().ok()), + bytes_max: pairs.get("bytesMax").map_or(None, |k| k.parse().ok()), + allow_large_result: pairs.get("allowLargeResult").map_or(None, |x| x.parse().ok()), + event_delay: pairs.get("eventDelay").map_or(Ok(None), |k| { + k.parse::().map(|x| Duration::from_millis(x)).map(|k| Some(k)) + })?, + stream_batch_len: pairs + .get("streamBatchLen") + .map_or(Ok(None), |k| k.parse().map(|k| Some(k)))?, + buf_len_disk_io: pairs + .get("bufLenDiskIo") + .map_or(Ok(None), |k| k.parse().map(|k| Some(k)))?, + do_test_main_error: pairs + .get("doTestMainError") + .map_or("false", |k| k) + .parse() + .map_err(|_| Error::BadQuery)?, + do_test_stream_error: pairs + .get("doTestStreamError") + .map_or("false", |k| k) + .parse() + .map_err(|_| Error::BadQuery)?, + // test_do_wasm: pairs + // .get("testDoWasm") + // .map(|x| x.parse::().ok()) + // .unwrap_or(None) + // .unwrap_or(false), + test_do_wasm: pairs.get("testDoWasm").map(|x| String::from(x)), + merger_out_len_max: pairs + .get("mergerOutLenMax") + .map_or(Ok(None), |k| k.parse().map(|k| Some(k)))?, + scylla_read_queue_len: pairs + .get("scyllaReadQueueLen") + .map_or(Ok(None), |k| k.parse().map(|k| Some(k)))?, + create_errors: pairs + .get("create_errors") + .map(|x| x.split(",").map(|x| x.to_string()).collect()) + .unwrap_or(Vec::new()), + log_level: pairs.get("log_level").map_or(String::new(), String::from), + use_rt: pairs + .get("useRt") + .map_or(Ok(None), |k| k.parse().map(Some).map_err(|_| Error::BadQuery))?, + querymarker: pairs.get("querymarker").map_or(String::new(), |x| x.to_string()), + }; + Ok(ret) + } +} + +impl AppendToUrl for PlainEventsQuery { + fn append_to_url(&self, url: &mut Url) { + match &self.range { + SeriesRange::TimeRange(k) => TimeRangeQuery::from(k).append_to_url(url), + SeriesRange::PulseRange(_) => todo!(), + } + self.channel.append_to_url(url); + let mut g = url.query_pairs_mut(); + g.append_pair("oneBeforeRange", &self.one_before_range().to_string()); + if self.beg_excl { + g.append_pair("begExcl", "true"); + } + g.append_pair("querymarker", &self.querymarker); + drop(g); + self.transform.append_to_url(url); + let mut g = url.query_pairs_mut(); + if let Some(x) = &self.timeout_content { + g.append_pair("contentTimeout", &format!("{:.0}ms", 1e3 * x.as_secs_f64())); + } + if let Some(x) = self.events_max.as_ref() { + g.append_pair("eventsMax", &x.to_string()); + } + if let Some(x) = self.bytes_max.as_ref() { + g.append_pair("bytesMax", &x.to_string()); + } + if let Some(x) = self.allow_large_result { + g.append_pair("allowLargeResult", &x.to_string()); + } + if let Some(x) = self.event_delay.as_ref() { + g.append_pair("eventDelay", &format!("{:.0}", x.as_secs_f64() * 1e3)); + } + if let Some(x) = self.stream_batch_len.as_ref() { + g.append_pair("streamBatchLen", &x.to_string()); + } + if let Some(x) = self.buf_len_disk_io.as_ref() { + g.append_pair("bufLenDiskIo", &x.to_string()); + } + if self.do_test_main_error { + g.append_pair("doTestMainError", "true"); + } + if self.do_test_stream_error { + g.append_pair("doTestStreamError", "true"); + } + if let Some(x) = &self.test_do_wasm { + g.append_pair("testDoWasm", &x); + } + if let Some(x) = self.merger_out_len_max.as_ref() { + g.append_pair("mergerOutLenMax", &x.to_string()); + } + if let Some(x) = self.scylla_read_queue_len.as_ref() { + 
g.append_pair("scyllaReadQueueLen", &x.to_string()); + } + if self.create_errors.len() != 0 { + g.append_pair("create_errors", &self.create_errors.join(",")); + } + if self.log_level.len() != 0 { + g.append_pair("log_level", &self.log_level); + } + if let Some(x) = self.use_rt.as_ref() { + g.append_pair("useRt", &x.to_string()); + } + } +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct EventsSubQuerySelect { + ch_conf: ChannelTypeConfigGen, + range: SeriesRange, + one_before_range: bool, + transform: TransformQuery, + wasm1: Option, +} + +impl EventsSubQuerySelect { + pub fn new( + ch_info: ChannelTypeConfigGen, + range: SeriesRange, + one_before_range: bool, + transform: TransformQuery, + ) -> Self { + Self { + ch_conf: ch_info, + range, + one_before_range, + transform, + wasm1: None, + } + } + + pub fn wasm1(&self) -> Option<&str> { + match &self.wasm1 { + Some(x) => Some(&x), + None => None, + } + } + + pub fn set_wasm1(&mut self, x: String) { + self.wasm1 = Some(x); + } +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct EventsSubQuerySettings { + timeout: Option, + events_max: Option, + bytes_max: Option, + event_delay: Option, + stream_batch_len: Option, + buf_len_disk_io: Option, + queue_len_disk_io: Option, + create_errors: Vec, + use_rt: Option, + merger_out_len_max: Option, + scylla_read_queue_len: Option, +} + +impl EventsSubQuerySettings { + pub fn merger_out_len_max(&self) -> Option { + self.merger_out_len_max + } + + pub fn scylla_read_queue_len(&self) -> Option { + self.scylla_read_queue_len + } +} + +impl Default for EventsSubQuerySettings { + fn default() -> Self { + Self { + timeout: None, + events_max: None, + bytes_max: None, + event_delay: None, + stream_batch_len: None, + buf_len_disk_io: None, + queue_len_disk_io: None, + create_errors: Vec::new(), + use_rt: None, + merger_out_len_max: None, + scylla_read_queue_len: None, + } + } +} + +impl From<&PlainEventsQuery> for EventsSubQuerySettings { + fn from(value: &PlainEventsQuery) -> Self { + Self { + timeout: value.timeout_content(), + events_max: value.events_max, + bytes_max: value.bytes_max, + event_delay: value.event_delay, + stream_batch_len: value.stream_batch_len, + buf_len_disk_io: value.buf_len_disk_io, + // TODO add to query + queue_len_disk_io: None, + create_errors: value.create_errors.clone(), + use_rt: value.use_rt(), + merger_out_len_max: value.merger_out_len_max(), + scylla_read_queue_len: value.scylla_read_queue_len(), + } + } +} + +impl From<&BinnedQuery> for EventsSubQuerySettings { + fn from(value: &BinnedQuery) -> Self { + Self { + timeout: value.timeout(), + // TODO ? + events_max: None, + bytes_max: None, + event_delay: None, + stream_batch_len: None, + buf_len_disk_io: None, + // TODO add to query + queue_len_disk_io: None, + create_errors: Vec::new(), + use_rt: value.use_rt(), + merger_out_len_max: value.merger_out_len_max(), + scylla_read_queue_len: value.scylla_read_queue_len(), + } + } +} + +impl From<&Api1Query> for EventsSubQuerySettings { + fn from(value: &Api1Query) -> Self { + let disk_io_tune = value.disk_io_tune(); + Self { + timeout: value.timeout(), + // TODO ? 
+ events_max: None, + bytes_max: None, + event_delay: None, + stream_batch_len: None, + buf_len_disk_io: Some(disk_io_tune.read_buffer_len), + queue_len_disk_io: Some(disk_io_tune.read_queue_len), + create_errors: Vec::new(), + use_rt: None, + merger_out_len_max: None, + scylla_read_queue_len: None, + } + } +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct EventsSubQuery { + select: EventsSubQuerySelect, + settings: EventsSubQuerySettings, + ty: String, + reqid: String, + log_level: String, +} + +impl EventsSubQuery { + pub fn from_parts( + select: EventsSubQuerySelect, + settings: EventsSubQuerySettings, + reqid: String, + log_level: String, + ) -> Self { + Self { + select, + settings, + ty: "EventsSubQuery".into(), + reqid, + log_level, + } + } + + pub fn backend(&self) -> &str { + &self.select.ch_conf.backend() + } + + pub fn name(&self) -> &str { + &self.select.ch_conf.name() + } + + pub fn range(&self) -> &SeriesRange { + &self.select.range + } + + pub fn need_one_before_range(&self) -> bool { + self.select.one_before_range | self.transform().need_one_before_range() + } + + pub fn transform(&self) -> &TransformQuery { + &self.select.transform + } + + pub fn ch_conf(&self) -> &ChannelTypeConfigGen { + &self.select.ch_conf + } + + pub fn timeout(&self) -> Duration { + self.settings.timeout.unwrap_or(Duration::from_millis(10000)) + } + + pub fn events_max(&self) -> u64 { + self.settings.events_max.unwrap_or(1024 * 128) + } + + pub fn event_delay(&self) -> &Option { + &self.settings.event_delay + } + + pub fn disk_io_tune(&self) -> DiskIoTune { + let mut tune = DiskIoTune::default(); + if let Some(x) = self.settings.buf_len_disk_io { + tune.read_buffer_len = x; + } + tune + } + + pub fn inmem_bufcap(&self) -> ByteSize { + // TODO should depend on the type of backend: only imagebuffer needs large size. + ByteSize::from_kb(1024 * 30) + } + + // A rough indication on how many bytes this request is allowed to return. Otherwise, the result should + // be a partial result. 
+ pub fn bytes_max(&self) -> u64 { + self.settings.events_max.unwrap_or(1024 * 512) + } + + pub fn is_event_blobs(&self) -> bool { + self.select.transform.is_event_blobs() + } + + pub fn need_value_data(&self) -> bool { + self.select.transform.need_value_data() + } + + pub fn create_errors_contains(&self, x: &str) -> bool { + self.settings.create_errors.contains(&String::from(x)) + } + + pub fn reqid(&self) -> &str { + &self.reqid + } + + pub fn wasm1(&self) -> Option<&str> { + self.select.wasm1() + } + + pub fn log_level(&self) -> &str { + &self.log_level + } + + pub fn use_rt(&self) -> Option { + self.settings.use_rt.clone() + } + + pub fn merger_out_len_max(&self) -> Option { + self.settings.merger_out_len_max() + } + + pub fn settings(&self) -> &EventsSubQuerySettings { + &self.settings + } +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct Frame1Parts { + query: EventsSubQuery, +} + +impl Frame1Parts { + pub fn new(query: EventsSubQuery) -> Self { + Self { query } + } + + pub fn parts(self) -> (EventsSubQuery,) { + (self.query,) + } +} + +#[test] +fn parse_frame1() { + let inp = r##"{"query":{"select":{"ch_conf":{"Scylla":{"backend":"swissfel-daqbuf-ca","series":2367705320261409690,"scalar_type":"ChannelStatus","shape":[],"name":"SLGRE-LI2C03_CH6:TEMP"}},"range":{"TimeRange":{"beg":1695736001000000000,"end":1695736301000000000}},"transform":{"event":"ValueFull","time_binning":"None"},"wasm1":null},"settings":{"timeout":null,"events_max":200000,"event_delay":null,"stream_batch_len":null,"buf_len_disk_io":null,"queue_len_disk_io":null,"create_errors":[]},"ty":"EventsSubQuery","reqid":"3ea23209"}}"##; + // TODO assert + let _v: Frame1Parts = serde_json::from_str(inp).unwrap(); +} diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..598bce0 --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,2 @@ +pub mod api4; +pub mod transform; diff --git a/src/transform.rs b/src/transform.rs new file mode 100644 index 0000000..1f4d59a --- /dev/null +++ b/src/transform.rs @@ -0,0 +1,287 @@ +use netpod::get_url_query_pairs; +use netpod::log::*; +use netpod::AppendToUrl; +use netpod::FromUrl; +use serde::Deserialize; +use serde::Serialize; +use std::collections::BTreeMap; +use thiserror; +use url::Url; + +#[derive(Debug, thiserror::Error)] +#[cstm(name = "Query")] +pub enum Error { + ParseInt(#[from] std::num::ParseIntError), + BadEnumAsString, + BadBinningScheme, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub enum EventTransformQuery { + EventBlobsVerbatim, + EventBlobsUncompressed, + ValueFull, + ArrayPick(usize), + // TODO should rename to scalar? dim0 will only stay a scalar. 
+ MinMaxAvgDev, + PulseIdDiff, +} + +impl EventTransformQuery { + pub fn need_value_data(&self) -> bool { + match self { + EventTransformQuery::EventBlobsVerbatim => true, + EventTransformQuery::EventBlobsUncompressed => true, + EventTransformQuery::ValueFull => true, + EventTransformQuery::ArrayPick(_) => true, + EventTransformQuery::MinMaxAvgDev => true, + EventTransformQuery::PulseIdDiff => false, + } + } +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub enum TimeBinningTransformQuery { + None, + TimeWeighted, + Unweighted, +} + +impl TimeBinningTransformQuery { + pub fn need_one_before_range(&self) -> bool { + match self { + TimeBinningTransformQuery::None => false, + TimeBinningTransformQuery::TimeWeighted => true, + TimeBinningTransformQuery::Unweighted => false, + } + } +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct TransformQuery { + event: EventTransformQuery, + time_binning: TimeBinningTransformQuery, + // #[serde(default, skip_serializing_if = "Option::is_none")] + enum_as_string: Option, +} + +impl TransformQuery { + fn url_prefix() -> &'static str { + "transform" + } + + pub fn default_events() -> Self { + Self { + event: EventTransformQuery::ValueFull, + time_binning: TimeBinningTransformQuery::None, + enum_as_string: None, + } + } + + pub fn default_time_binned() -> Self { + Self { + event: EventTransformQuery::MinMaxAvgDev, + time_binning: TimeBinningTransformQuery::TimeWeighted, + enum_as_string: None, + } + } + + pub fn is_default_events(&self) -> bool { + self == &Self::default_events() + } + + pub fn is_default_time_binned(&self) -> bool { + self == &Self::default_time_binned() + } + + pub fn for_event_blobs() -> Self { + Self { + event: EventTransformQuery::EventBlobsVerbatim, + time_binning: TimeBinningTransformQuery::None, + enum_as_string: None, + } + } + + pub fn for_time_weighted_scalar() -> Self { + Self { + event: EventTransformQuery::MinMaxAvgDev, + time_binning: TimeBinningTransformQuery::TimeWeighted, + enum_as_string: None, + } + } + + pub fn for_pulse_id_diff() -> Self { + Self { + event: EventTransformQuery::PulseIdDiff, + // TODO probably we want unweighted binning here. + time_binning: TimeBinningTransformQuery::TimeWeighted, + enum_as_string: None, + } + } + + pub fn is_event_blobs(&self) -> bool { + match &self.event { + EventTransformQuery::EventBlobsVerbatim => true, + EventTransformQuery::EventBlobsUncompressed => { + error!("TODO decide on uncompressed event blobs"); + panic!() + } + _ => false, + } + } + + pub fn need_value_data(&self) -> bool { + self.event.need_value_data() + } + + pub fn need_one_before_range(&self) -> bool { + self.time_binning.need_one_before_range() + } + + pub fn is_pulse_id_diff(&self) -> bool { + match &self.event { + EventTransformQuery::PulseIdDiff => true, + _ => false, + } + } + + pub fn get_tr_event(&self) -> &EventTransformQuery { + &self.event + } + + pub fn get_tr_time_binning(&self) -> &TimeBinningTransformQuery { + &self.time_binning + } + + pub fn enum_as_string(&self) -> Option { + self.enum_as_string.clone() + } + + pub fn do_wasm(&self) -> Option<&str> { + None + } +} + +impl FromUrl for TransformQuery { + type Error = Error; + + fn from_url(url: &Url) -> Result { + let pairs = get_url_query_pairs(url); + Self::from_pairs(&pairs) + } + + fn from_pairs(pairs: &BTreeMap) -> Result { + let enum_as_string = if let Some(k) = pairs.get("enumAsString") { + Some(k.parse().map_err(|_| Error::BadEnumAsString)?) 
+ } else { + None + }; + // enum_string: Ok(pairs.get("enumString")).and_then(|x| { + // x.map_or(Ok(None), |k| { + // k.parse() + // .map_err(|_| Error::with_public_msg_no_trace(format!("can not parse enumString: {}", k)))? + // }) + // })?, + let upre = Self::url_prefix(); + let key = "binningScheme"; + if let Some(s) = pairs.get(key) { + let ret = if s == "eventBlobs" { + TransformQuery { + event: EventTransformQuery::EventBlobsVerbatim, + time_binning: TimeBinningTransformQuery::None, + enum_as_string, + } + } else if s == "fullValue" { + TransformQuery { + event: EventTransformQuery::ValueFull, + time_binning: TimeBinningTransformQuery::None, + enum_as_string, + } + } else if s == "timeWeightedScalar" { + TransformQuery { + event: EventTransformQuery::MinMaxAvgDev, + time_binning: TimeBinningTransformQuery::TimeWeighted, + enum_as_string, + } + } else if s == "unweightedScalar" { + TransformQuery { + event: EventTransformQuery::ValueFull, + time_binning: TimeBinningTransformQuery::None, + enum_as_string, + } + } else if s == "binnedX" { + let _u: usize = pairs.get("binnedXcount").map_or("1", |k| k).parse()?; + warn!("TODO binnedXcount"); + TransformQuery { + event: EventTransformQuery::MinMaxAvgDev, + time_binning: TimeBinningTransformQuery::None, + enum_as_string, + } + } else if s == "pulseIdDiff" { + TransformQuery { + event: EventTransformQuery::PulseIdDiff, + time_binning: TimeBinningTransformQuery::None, + enum_as_string, + } + } else { + return Err(Error::BadBinningScheme); + }; + Ok(ret) + } else { + // TODO add option to pick from array. + let _pick = pairs + .get(&format!("{}ArrayPick", upre)) + .map(|x| match x.parse::() { + Ok(n) => Some(n), + Err(_) => None, + }) + .unwrap_or(None); + let ret = TransformQuery { + event: EventTransformQuery::ValueFull, + time_binning: TimeBinningTransformQuery::None, + enum_as_string, + }; + Ok(ret) + } + } +} + +impl AppendToUrl for TransformQuery { + fn append_to_url(&self, url: &mut Url) { + let mut g = url.query_pairs_mut(); + if false { + let upre = Self::url_prefix(); + if let Some(x) = &Some(123) { + g.append_pair(&format!("{}ArrayPick", upre), &format!("{}", x)); + } + } + let key = "binningScheme"; + match &self.event { + EventTransformQuery::EventBlobsVerbatim => { + g.append_pair(key, &format!("{}", "eventBlobs")); + } + EventTransformQuery::EventBlobsUncompressed => { + // TODO + g.append_pair(key, &format!("{}", "eventBlobs")); + } + EventTransformQuery::ValueFull => { + g.append_pair(key, &format!("{}", "fullValue")); + } + EventTransformQuery::ArrayPick(_) => { + // TODO + g.append_pair(key, &format!("{}", "fullValue")); + } + EventTransformQuery::MinMaxAvgDev => { + g.append_pair(key, &format!("{}", "timeWeightedScalar")); + } + EventTransformQuery::PulseIdDiff => { + g.append_pair(key, &format!("{}", "pulseIdDiff")); + } + } + if let Some(x) = self.enum_as_string { + if x { + g.append_pair("enumAsString", "true"); + } + } + } +}
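
Usage note (not part of the commit): a minimal sketch of the URL round-trip that the FromUrl/AppendToUrl impls introduced above provide, shown for TransformQuery because its query key ("binningScheme") is defined entirely within this diff. The consumer-side dependency name `daqbuf-query` and the placeholder endpoint are assumptions; everything else uses only items visible in the diff.

    // Assumed consumer Cargo.toml entry: daqbuf-query = { path = "../daqbuf-query" }
    use daqbuf_query::transform::TransformQuery;
    use netpod::AppendToUrl;
    use netpod::FromUrl;
    use url::Url;

    fn main() {
        // Placeholder endpoint; only the query string matters for this sketch.
        let mut url = Url::parse("http://localhost:8080/api/4/binned").unwrap();
        // Encode the default time-binned transform into the query string.
        TransformQuery::default_time_binned().append_to_url(&mut url);
        assert_eq!(url.query(), Some("binningScheme=timeWeightedScalar"));
        // Parse it back: "timeWeightedScalar" maps to MinMaxAvgDev + TimeWeighted,
        // which is exactly the default time-binned transform.
        let parsed = TransformQuery::from_url(&url).unwrap();
        assert!(parsed.is_default_time_binned());
    }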