From de4569d686bb7282d9732709267a17f8a2e1ba96 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Wed, 4 Sep 2024 16:32:30 +0200 Subject: [PATCH] Deliver enum data in a better formatted way --- crates/httpret/src/proxy.rs | 43 ++- crates/httpret/src/pulsemap.rs | 9 +- crates/items_0/src/isodate.rs | 13 +- crates/items_2/src/binsdim0.rs | 4 +- crates/items_2/src/binsxbindim0.rs | 4 +- crates/items_2/src/eventsdim0.rs | 233 +----------- crates/items_2/src/eventsdim0enum.rs | 491 ++++++++++++++++++++++++++ crates/items_2/src/eventsdim1.rs | 8 +- crates/items_2/src/items_2.rs | 31 +- crates/items_2/src/test.rs | 5 +- crates/netpod/src/netpod.rs | 22 +- crates/netpod/src/query.rs | 9 +- crates/query/src/api4.rs | 18 +- crates/query/src/api4/binned.rs | 65 ++-- crates/query/src/api4/events.rs | 36 +- crates/streams/src/cbor_stream.rs | 29 ++ crates/streams/src/json_stream.rs | 29 ++ crates/streams/src/plaineventsjson.rs | 6 +- crates/streams/src/timebinnedjson.rs | 5 +- 19 files changed, 674 insertions(+), 386 deletions(-) create mode 100644 crates/items_2/src/eventsdim0enum.rs diff --git a/crates/httpret/src/proxy.rs b/crates/httpret/src/proxy.rs index 00c93cd..bac6b72 100644 --- a/crates/httpret/src/proxy.rs +++ b/crates/httpret/src/proxy.rs @@ -506,7 +506,7 @@ where QT: fmt::Debug + FromUrl + AppendToUrl + HasBackend + HasTimeout, { let url = req_uri_to_url(req.uri())?; - let mut query = match QT::from_url(&url) { + let query = match QT::from_url(&url) { Ok(k) => k, Err(_) => { let msg = format!("malformed request or missing parameters {}", req.uri()); @@ -515,10 +515,7 @@ where } }; trace!("proxy_backend_query {:?} {:?}", query, req.uri()); - let timeout = query.timeout(); - let timeout_next = timeout.saturating_sub(Duration::from_millis(1000)); - trace!("timeout {timeout:?} timeout_next {timeout_next:?}"); - query.set_timeout(timeout_next); + let timeout = Duration::from_millis(1000 * 30); let query = query; let backend = query.backend(); let uri_path = proxy_backend_query_helper_uri_path(req.uri().path(), &url); @@ -598,23 +595,25 @@ pub async fn proxy_backend_query_inner( Ok::<_, Error>(res) }; - let res = tokio::time::timeout(timeout, fut).await.map_err(|_| { - let e = Error::with_msg_no_trace(format!("timeout trying to make sub request")); - warn!("{e}"); - e - })??; - - { - use bytes::Bytes; - use httpclient::http_body::Frame; - use httpclient::BodyError; - let (head, body) = res.into_parts(); - let body = StreamIncoming::new(body); - let body = body.map(|x| x.map(Frame::data)); - let body: Pin, BodyError>> + Send>> = Box::pin(body); - let body = http_body_util::StreamBody::new(body); - let ret = Response::from_parts(head, body); - Ok(ret) + match tokio::time::timeout(timeout, fut).await { + Ok(res) => { + let res = res?; + use bytes::Bytes; + use httpclient::http_body::Frame; + use httpclient::BodyError; + let (head, body) = res.into_parts(); + let body = StreamIncoming::new(body); + let body = body.map(|x| x.map(Frame::data)); + let body: Pin, BodyError>> + Send>> = Box::pin(body); + let body = http_body_util::StreamBody::new(body); + let ret = Response::from_parts(head, body); + Ok(ret) + } + Err(_) => Ok(httpclient::error_status_response( + StatusCode::REQUEST_TIMEOUT, + format!("request timed out at proxy, limit {} ms", timeout.as_millis() as u64), + ctx.reqid(), + )), } } diff --git a/crates/httpret/src/pulsemap.rs b/crates/httpret/src/pulsemap.rs index c3c1f6c..4b02de2 100644 --- a/crates/httpret/src/pulsemap.rs +++ b/crates/httpret/src/pulsemap.rs @@ -843,13 +843,8 @@ impl HasBackend for MapPulseQuery { } impl HasTimeout for MapPulseQuery { - fn timeout(&self) -> Duration { - MAP_PULSE_QUERY_TIMEOUT - } - - fn set_timeout(&mut self, timeout: Duration) { - // TODO - // self.timeout = Some(timeout); + fn timeout(&self) -> Option { + Some(MAP_PULSE_QUERY_TIMEOUT) } } diff --git a/crates/items_0/src/isodate.rs b/crates/items_0/src/isodate.rs index 262c7f3..18abefd 100644 --- a/crates/items_0/src/isodate.rs +++ b/crates/items_0/src/isodate.rs @@ -11,8 +11,17 @@ pub struct IsoDateTime(DateTime); impl IsoDateTime { pub fn from_unix_millis(ms: u64) -> Self { - let datetime = chrono::DateTime::from_timestamp_millis(ms as i64).unwrap(); - Self(datetime) + // let datetime = chrono::DateTime::from_timestamp_millis(ms as i64).unwrap(); + // Self(datetime) + IsoDateTime( + Utc.timestamp_millis_opt(ms as i64) + .earliest() + .unwrap_or(Utc.timestamp_nanos(0)), + ) + } + + pub fn from_ns_u64(ts: u64) -> Self { + IsoDateTime(Utc.timestamp_nanos(ts as i64)) } } diff --git a/crates/items_2/src/binsdim0.rs b/crates/items_2/src/binsdim0.rs index 73024a4..058cba2 100644 --- a/crates/items_2/src/binsdim0.rs +++ b/crates/items_2/src/binsdim0.rs @@ -499,9 +499,9 @@ impl CollectorType for BinsDim0Collector { match vals.ts2s.back() { Some(&k) => { let missing_bins = bin_count_exp - bin_count; - let continue_at = IsoDateTime(Utc.timestamp_nanos(k as i64)); + let continue_at = IsoDateTime::from_ns_u64(k); let u = k + (k - vals.ts1s.back().unwrap()) * missing_bins as u64; - let finished_at = IsoDateTime(Utc.timestamp_nanos(u as i64)); + let finished_at = IsoDateTime::from_ns_u64(u); (missing_bins, Some(continue_at), Some(finished_at)) } None => { diff --git a/crates/items_2/src/binsxbindim0.rs b/crates/items_2/src/binsxbindim0.rs index cefb254..970c910 100644 --- a/crates/items_2/src/binsxbindim0.rs +++ b/crates/items_2/src/binsxbindim0.rs @@ -473,9 +473,9 @@ impl CollectorType for BinsXbinDim0Collector { match self.vals.ts2s.back() { Some(&k) => { let missing_bins = bin_count_exp - bin_count; - let continue_at = IsoDateTime(Utc.timestamp_nanos(k as i64)); + let continue_at = IsoDateTime::from_ns_u64(k); let u = k + (k - self.vals.ts1s.back().unwrap()) * missing_bins as u64; - let finished_at = IsoDateTime(Utc.timestamp_nanos(u as i64)); + let finished_at = IsoDateTime::from_ns_u64(u); (missing_bins, Some(continue_at), Some(finished_at)) } None => { diff --git a/crates/items_2/src/eventsdim0.rs b/crates/items_2/src/eventsdim0.rs index df5115f..8611c0b 100644 --- a/crates/items_2/src/eventsdim0.rs +++ b/crates/items_2/src/eventsdim0.rs @@ -83,231 +83,6 @@ macro_rules! trace_binning { }; } -#[derive(Debug)] -pub struct EventsDim0EnumCollector { - vals: EventsDim0Enum, - range_final: bool, - timed_out: bool, - needs_continue_at: bool, -} - -impl EventsDim0EnumCollector { - pub fn new() -> Self { - Self { - vals: EventsDim0Enum::new(), - range_final: false, - timed_out: false, - needs_continue_at: false, - } - } -} - -impl TypeName for EventsDim0EnumCollector { - fn type_name(&self) -> String { - "EventsDim0EnumCollector".into() - } -} - -impl WithLen for EventsDim0EnumCollector { - fn len(&self) -> usize { - self.vals.tss.len() - } -} - -impl ByteEstimate for EventsDim0EnumCollector { - fn byte_estimate(&self) -> u64 { - // TODO does it need to be more accurate? - 30 * self.len() as u64 - } -} - -#[derive(Debug, Serialize, Deserialize)] -pub struct EventsDim0EnumCollectorOutput { - #[serde(rename = "tsAnchor")] - ts_anchor_sec: u64, - #[serde(rename = "tsMs")] - ts_off_ms: VecDeque, - #[serde(rename = "tsNs")] - ts_off_ns: VecDeque, - #[serde(rename = "values")] - vals: VecDeque, - #[serde(rename = "valuestrings")] - valstrs: VecDeque, - #[serde(rename = "rangeFinal", default, skip_serializing_if = "is_false")] - range_final: bool, - #[serde(rename = "timedOut", default, skip_serializing_if = "is_false")] - timed_out: bool, - #[serde(rename = "continueAt", default, skip_serializing_if = "Option::is_none")] - continue_at: Option, -} - -impl WithLen for EventsDim0EnumCollectorOutput { - fn len(&self) -> usize { - todo!() - } -} - -impl AsAnyRef for EventsDim0EnumCollectorOutput { - fn as_any_ref(&self) -> &dyn Any { - todo!() - } -} - -impl AsAnyMut for EventsDim0EnumCollectorOutput { - fn as_any_mut(&mut self) -> &mut dyn Any { - todo!() - } -} - -impl ToJsonResult for EventsDim0EnumCollectorOutput { - fn to_json_result(&self) -> Result, Error> { - todo!() - } -} - -impl Collected for EventsDim0EnumCollectorOutput {} - -impl CollectorType for EventsDim0EnumCollector { - type Input = EventsDim0Enum; - type Output = EventsDim0EnumCollectorOutput; - - fn ingest(&mut self, src: &mut EventsDim0Enum) { - self.vals.tss.append(&mut src.tss); - self.vals.values.append(&mut src.values); - self.vals.valuestrs.append(&mut src.valuestrs); - } - - fn set_range_complete(&mut self) { - self.range_final = true; - } - - fn set_timed_out(&mut self) { - self.timed_out = true; - self.needs_continue_at = true; - } - - fn set_continue_at_here(&mut self) { - self.needs_continue_at = true; - } - - fn result( - &mut self, - range: Option, - binrange: Option, - ) -> Result { - debug!( - "{} result() needs_continue_at {}", - self.type_name(), - self.needs_continue_at - ); - // If we timed out, we want to hint the client from where to continue. - // This is tricky: currently, client can not request a left-exclusive range. - // We currently give the timestamp of the last event plus a small delta. - // The amount of the delta must take into account what kind of timestamp precision the client - // can parse and handle. - let vals = &mut self.vals; - let continue_at = if self.needs_continue_at { - if let Some(ts) = vals.tss.back() { - let x = Some(IsoDateTime::from_u64(*ts / MS * MS + MS)); - x - } else { - if let Some(range) = &range { - match range { - SeriesRange::TimeRange(x) => Some(IsoDateTime::from_u64(x.beg + SEC)), - SeriesRange::PulseRange(_) => { - error!("TODO emit create continueAt for pulse range"); - Some(IsoDateTime::from_u64(0)) - } - } - } else { - Some(IsoDateTime::from_u64(0)) - } - } - } else { - None - }; - let tss_sl = vals.tss.make_contiguous(); - let (ts_anchor_sec, ts_off_ms, ts_off_ns) = crate::ts_offs_from_abs(tss_sl); - let valixs = mem::replace(&mut vals.values, VecDeque::new()); - let valstrs = mem::replace(&mut vals.valuestrs, VecDeque::new()); - let vals = valixs; - if ts_off_ms.len() != ts_off_ns.len() { - return Err(Error::with_msg_no_trace("collected len mismatch")); - } - if ts_off_ms.len() != vals.len() { - return Err(Error::with_msg_no_trace("collected len mismatch")); - } - if ts_off_ms.len() != valstrs.len() { - return Err(Error::with_msg_no_trace("collected len mismatch")); - } - let ret = Self::Output { - ts_anchor_sec, - ts_off_ms, - ts_off_ns, - vals, - valstrs, - range_final: self.range_final, - timed_out: self.timed_out, - continue_at, - }; - Ok(ret) - } -} - -// Experiment with having this special case for enums -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] -pub struct EventsDim0Enum { - pub tss: VecDeque, - pub values: VecDeque, - pub valuestrs: VecDeque, -} - -impl EventsDim0Enum { - pub fn new() -> Self { - Self { - tss: VecDeque::new(), - values: VecDeque::new(), - valuestrs: VecDeque::new(), - } - } - - pub fn push_back(&mut self, ts: u64, value: u16, valuestr: String) { - self.tss.push_back(ts); - self.values.push_back(value); - self.valuestrs.push_back(valuestr); - } -} - -impl TypeName for EventsDim0Enum { - fn type_name(&self) -> String { - "EventsDim0Enum".into() - } -} - -impl AsAnyRef for EventsDim0Enum { - fn as_any_ref(&self) -> &dyn Any { - self - } -} - -impl AsAnyMut for EventsDim0Enum { - fn as_any_mut(&mut self) -> &mut dyn Any { - self - } -} - -impl WithLen for EventsDim0Enum { - fn len(&self) -> usize { - self.tss.len() - } -} - -impl Collectable for EventsDim0Enum { - fn new_collector(&self) -> Box { - Box::new(EventsDim0EnumCollector::new()) - } -} - #[derive(Clone, PartialEq, Serialize, Deserialize)] pub struct EventsDim0NoPulse { pub tss: VecDeque, @@ -701,19 +476,19 @@ impl CollectorType for EventsDim0Collector { let vals = &mut self.vals; let continue_at = if self.needs_continue_at { if let Some(ts) = vals.tss.back() { - let x = Some(IsoDateTime::from_u64(*ts / MS * MS + MS)); + let x = Some(IsoDateTime::from_ns_u64(*ts / MS * MS + MS)); x } else { if let Some(range) = &range { match range { - SeriesRange::TimeRange(x) => Some(IsoDateTime::from_u64(x.beg + SEC)), + SeriesRange::TimeRange(x) => Some(IsoDateTime::from_ns_u64(x.beg + SEC)), SeriesRange::PulseRange(_) => { error!("TODO emit create continueAt for pulse range"); - Some(IsoDateTime::from_u64(0)) + Some(IsoDateTime::from_ns_u64(0)) } } } else { - Some(IsoDateTime::from_u64(0)) + Some(IsoDateTime::from_ns_u64(0)) } } } else { diff --git a/crates/items_2/src/eventsdim0enum.rs b/crates/items_2/src/eventsdim0enum.rs new file mode 100644 index 0000000..1442afa --- /dev/null +++ b/crates/items_2/src/eventsdim0enum.rs @@ -0,0 +1,491 @@ +use err::Error; +use items_0::collect_s::Collectable; +use items_0::collect_s::Collected; +use items_0::collect_s::Collector; +use items_0::collect_s::CollectorType; +use items_0::collect_s::ToJsonBytes; +use items_0::collect_s::ToJsonResult; +use items_0::container::ByteEstimate; +use items_0::isodate::IsoDateTime; +use items_0::overlap::RangeOverlapInfo; +use items_0::scalar_ops::ScalarOps; +use items_0::timebin::TimeBinnable; +use items_0::timebin::TimeBinnableTy; +use items_0::timebin::TimeBinnerTy; +use items_0::AsAnyMut; +use items_0::AsAnyRef; +use items_0::Events; +use items_0::EventsNonObj; +use items_0::TypeName; +use items_0::WithLen; +use netpod::log::*; +use netpod::range::evrange::SeriesRange; +use netpod::timeunits::MS; +use netpod::timeunits::SEC; +use netpod::BinnedRangeEnum; +use serde::Deserialize; +use serde::Serialize; +use std::any::Any; +use std::collections::VecDeque; +use std::mem; + +#[allow(unused)] +macro_rules! trace_collect_result { + ($($arg:tt)*) => { + if false { + trace!($($arg)*); + } + }; +} + +#[derive(Debug)] +pub struct EventsDim0EnumCollector { + vals: EventsDim0Enum, + range_final: bool, + timed_out: bool, + needs_continue_at: bool, +} + +impl EventsDim0EnumCollector { + pub fn new() -> Self { + Self { + vals: EventsDim0Enum::new(), + range_final: false, + timed_out: false, + needs_continue_at: false, + } + } +} + +impl TypeName for EventsDim0EnumCollector { + fn type_name(&self) -> String { + "EventsDim0EnumCollector".into() + } +} + +impl WithLen for EventsDim0EnumCollector { + fn len(&self) -> usize { + self.vals.tss.len() + } +} + +impl ByteEstimate for EventsDim0EnumCollector { + fn byte_estimate(&self) -> u64 { + // TODO does it need to be more accurate? + 30 * self.len() as u64 + } +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct EventsDim0EnumCollectorOutput { + #[serde(rename = "tsAnchor")] + ts_anchor_sec: u64, + #[serde(rename = "tsMs")] + ts_off_ms: VecDeque, + #[serde(rename = "tsNs")] + ts_off_ns: VecDeque, + #[serde(rename = "values")] + vals: VecDeque, + #[serde(rename = "valuestrings")] + valstrs: VecDeque, + #[serde(rename = "rangeFinal", default, skip_serializing_if = "netpod::is_false")] + range_final: bool, + #[serde(rename = "timedOut", default, skip_serializing_if = "netpod::is_false")] + timed_out: bool, + #[serde(rename = "continueAt", default, skip_serializing_if = "Option::is_none")] + continue_at: Option, +} + +impl WithLen for EventsDim0EnumCollectorOutput { + fn len(&self) -> usize { + todo!() + } +} + +impl AsAnyRef for EventsDim0EnumCollectorOutput { + fn as_any_ref(&self) -> &dyn Any { + todo!() + } +} + +impl AsAnyMut for EventsDim0EnumCollectorOutput { + fn as_any_mut(&mut self) -> &mut dyn Any { + todo!() + } +} + +impl ToJsonResult for EventsDim0EnumCollectorOutput { + fn to_json_result(&self) -> Result, Error> { + todo!() + } +} + +impl Collected for EventsDim0EnumCollectorOutput {} + +impl CollectorType for EventsDim0EnumCollector { + type Input = EventsDim0Enum; + type Output = EventsDim0EnumCollectorOutput; + + fn ingest(&mut self, src: &mut EventsDim0Enum) { + self.vals.tss.append(&mut src.tss); + self.vals.values.append(&mut src.values); + self.vals.valuestrs.append(&mut src.valuestrs); + } + + fn set_range_complete(&mut self) { + self.range_final = true; + } + + fn set_timed_out(&mut self) { + self.timed_out = true; + self.needs_continue_at = true; + } + + fn set_continue_at_here(&mut self) { + self.needs_continue_at = true; + } + + fn result( + &mut self, + range: Option, + binrange: Option, + ) -> Result { + trace_collect_result!( + "{} result() needs_continue_at {}", + self.type_name(), + self.needs_continue_at + ); + // If we timed out, we want to hint the client from where to continue. + // This is tricky: currently, client can not request a left-exclusive range. + // We currently give the timestamp of the last event plus a small delta. + // The amount of the delta must take into account what kind of timestamp precision the client + // can parse and handle. + let vals = &mut self.vals; + let continue_at = if self.needs_continue_at { + if let Some(ts) = vals.tss.back() { + let x = Some(IsoDateTime::from_ns_u64(*ts / MS * MS + MS)); + x + } else { + if let Some(range) = &range { + match range { + SeriesRange::TimeRange(x) => Some(IsoDateTime::from_ns_u64(x.beg + SEC)), + SeriesRange::PulseRange(_) => { + error!("TODO emit create continueAt for pulse range"); + Some(IsoDateTime::from_ns_u64(0)) + } + } + } else { + Some(IsoDateTime::from_ns_u64(0)) + } + } + } else { + None + }; + let tss_sl = vals.tss.make_contiguous(); + let (ts_anchor_sec, ts_off_ms, ts_off_ns) = crate::ts_offs_from_abs(tss_sl); + let valixs = mem::replace(&mut vals.values, VecDeque::new()); + let valstrs = mem::replace(&mut vals.valuestrs, VecDeque::new()); + let vals = valixs; + if ts_off_ms.len() != ts_off_ns.len() { + return Err(Error::with_msg_no_trace("collected len mismatch")); + } + if ts_off_ms.len() != vals.len() { + return Err(Error::with_msg_no_trace("collected len mismatch")); + } + if ts_off_ms.len() != valstrs.len() { + return Err(Error::with_msg_no_trace("collected len mismatch")); + } + let ret = Self::Output { + ts_anchor_sec, + ts_off_ms, + ts_off_ns, + vals, + valstrs, + range_final: self.range_final, + timed_out: self.timed_out, + continue_at, + }; + Ok(ret) + } +} + +// Experiment with having this special case for enums +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct EventsDim0Enum { + pub tss: VecDeque, + pub values: VecDeque, + pub valuestrs: VecDeque, +} + +impl EventsDim0Enum { + pub fn new() -> Self { + Self { + tss: VecDeque::new(), + values: VecDeque::new(), + valuestrs: VecDeque::new(), + } + } + + pub fn push_back(&mut self, ts: u64, value: u16, valuestr: String) { + self.tss.push_back(ts); + self.values.push_back(value); + self.valuestrs.push_back(valuestr); + } +} + +impl TypeName for EventsDim0Enum { + fn type_name(&self) -> String { + "EventsDim0Enum".into() + } +} + +impl AsAnyRef for EventsDim0Enum { + fn as_any_ref(&self) -> &dyn Any { + self + } +} + +impl AsAnyMut for EventsDim0Enum { + fn as_any_mut(&mut self) -> &mut dyn Any { + self + } +} + +impl WithLen for EventsDim0Enum { + fn len(&self) -> usize { + self.tss.len() + } +} + +impl Collectable for EventsDim0Enum { + fn new_collector(&self) -> Box { + Box::new(EventsDim0EnumCollector::new()) + } +} + +// impl Events + +impl ByteEstimate for EventsDim0Enum { + fn byte_estimate(&self) -> u64 { + todo!() + } +} + +impl EventsNonObj for EventsDim0Enum { + fn into_tss_pulses(self: Box) -> (VecDeque, VecDeque) { + todo!() + } +} + +impl RangeOverlapInfo for EventsDim0Enum { + fn ends_before(&self, range: &SeriesRange) -> bool { + todo!() + } + + fn ends_after(&self, range: &SeriesRange) -> bool { + todo!() + } + + fn starts_after(&self, range: &SeriesRange) -> bool { + todo!() + } +} + +// NOTE just a dummy because currently we don't use this for time binning +#[derive(Debug)] +pub struct EventsDim0EnumTimeBinner; + +impl TimeBinnerTy for EventsDim0EnumTimeBinner { + type Input = EventsDim0Enum; + type Output = (); + + fn ingest(&mut self, item: &mut Self::Input) { + todo!() + } + + fn set_range_complete(&mut self) { + todo!() + } + + fn bins_ready_count(&self) -> usize { + todo!() + } + + fn bins_ready(&mut self) -> Option { + todo!() + } + + fn push_in_progress(&mut self, push_empty: bool) { + todo!() + } + + fn cycle(&mut self) { + todo!() + } + + fn empty(&self) -> Option { + todo!() + } + + fn append_empty_until_end(&mut self) { + todo!() + } +} + +// NOTE just a dummy because currently we don't use this for time binning +impl TimeBinnableTy for EventsDim0Enum { + type TimeBinner = EventsDim0EnumTimeBinner; + + fn time_binner_new( + &self, + binrange: BinnedRangeEnum, + do_time_weight: bool, + emit_empty_bins: bool, + ) -> Self::TimeBinner { + todo!() + } +} + +// NOTE just a dummy because currently we don't use this for time binning +impl TimeBinnable for EventsDim0Enum { + fn time_binner_new( + &self, + binrange: BinnedRangeEnum, + do_time_weight: bool, + emit_empty_bins: bool, + ) -> Box { + todo!() + } + + fn to_box_to_json_result(&self) -> Box { + todo!() + } +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct EventsDim0EnumChunkOutput { + tss: VecDeque, + values: VecDeque, + valuestrings: VecDeque, + scalar_type: String, +} + +impl Events for EventsDim0Enum { + fn as_time_binnable_ref(&self) -> &dyn items_0::timebin::TimeBinnable { + todo!() + } + + fn as_time_binnable_mut(&mut self) -> &mut dyn items_0::timebin::TimeBinnable { + todo!() + } + + fn verify(&self) -> bool { + todo!() + } + + fn output_info(&self) -> String { + todo!() + } + + fn as_collectable_mut(&mut self) -> &mut dyn Collectable { + todo!() + } + + fn as_collectable_with_default_ref(&self) -> &dyn Collectable { + todo!() + } + + fn as_collectable_with_default_mut(&mut self) -> &mut dyn Collectable { + todo!() + } + + fn ts_min(&self) -> Option { + todo!() + } + + fn ts_max(&self) -> Option { + todo!() + } + + fn take_new_events_until_ts(&mut self, ts_end: u64) -> Box { + todo!() + } + + fn new_empty_evs(&self) -> Box { + todo!() + } + + fn drain_into_evs(&mut self, dst: &mut dyn Events, range: (usize, usize)) -> Result<(), items_0::MergeError> { + todo!() + } + + fn find_lowest_index_gt_evs(&self, ts: u64) -> Option { + todo!() + } + + fn find_lowest_index_ge_evs(&self, ts: u64) -> Option { + todo!() + } + + fn find_highest_index_lt_evs(&self, ts: u64) -> Option { + todo!() + } + + fn clone_dyn(&self) -> Box { + todo!() + } + + fn partial_eq_dyn(&self, other: &dyn Events) -> bool { + todo!() + } + + fn serde_id(&self) -> &'static str { + todo!() + } + + fn nty_id(&self) -> u32 { + todo!() + } + + fn tss(&self) -> &VecDeque { + todo!() + } + + fn pulses(&self) -> &VecDeque { + todo!() + } + + fn frame_type_id(&self) -> u32 { + todo!() + } + + fn to_min_max_avg(&mut self) -> Box { + todo!() + } + + fn to_json_string(&self) -> String { + todo!() + } + + fn to_json_vec_u8(&self) -> Vec { + self.to_json_string().into_bytes() + } + + fn to_cbor_vec_u8(&self) -> Vec { + // TODO redesign with mut access, rename to `into_` and take the values out. + let ret = EventsDim0EnumChunkOutput { + // TODO use &mut to swap the content + tss: self.tss.clone(), + values: self.values.clone(), + valuestrings: self.valuestrs.clone(), + scalar_type: netpod::EnumVariant::scalar_type_name().into(), + }; + let mut buf = Vec::new(); + ciborium::into_writer(&ret, &mut buf).unwrap(); + buf + } + + fn clear(&mut self) { + todo!() + } +} diff --git a/crates/items_2/src/eventsdim1.rs b/crates/items_2/src/eventsdim1.rs index cfabe8b..86cf0cc 100644 --- a/crates/items_2/src/eventsdim1.rs +++ b/crates/items_2/src/eventsdim1.rs @@ -400,19 +400,19 @@ impl CollectorType for EventsDim1Collector { let vals = &mut self.vals; let continue_at = if self.timed_out { if let Some(ts) = vals.tss.back() { - Some(IsoDateTime::from_u64(*ts + MS)) + Some(IsoDateTime::from_ns_u64(*ts + MS)) } else { if let Some(range) = &range { match range { - SeriesRange::TimeRange(x) => Some(IsoDateTime::from_u64(x.beg + SEC)), + SeriesRange::TimeRange(x) => Some(IsoDateTime::from_ns_u64(x.beg + SEC)), SeriesRange::PulseRange(x) => { error!("TODO emit create continueAt for pulse range"); - Some(IsoDateTime::from_u64(0)) + Some(IsoDateTime::from_ns_u64(0)) } } } else { warn!("can not determine continue-at parameters"); - Some(IsoDateTime::from_u64(0)) + Some(IsoDateTime::from_ns_u64(0)) } } } else { diff --git a/crates/items_2/src/items_2.rs b/crates/items_2/src/items_2.rs index 5157262..6fa1566 100644 --- a/crates/items_2/src/items_2.rs +++ b/crates/items_2/src/items_2.rs @@ -6,6 +6,7 @@ pub mod channelevents; pub mod empty; pub mod eventfull; pub mod eventsdim0; +pub mod eventsdim0enum; pub mod eventsdim1; pub mod eventsxbindim0; pub mod framable; @@ -20,10 +21,8 @@ pub mod timebin; pub mod transform; use channelevents::ChannelEvents; -use chrono::DateTime; -use chrono::TimeZone; -use chrono::Utc; use futures_util::Stream; +use items_0::isodate::IsoDateTime; use items_0::overlap::RangeOverlapInfo; use items_0::streamitem::Sitemty; use items_0::transform::EventTransform; @@ -33,10 +32,6 @@ use items_0::MergeError; use merger::Mergeable; use netpod::range::evrange::SeriesRange; use netpod::timeunits::*; -use netpod::DATETIME_FMT_3MS; -use serde::Deserialize; -use serde::Serialize; -use serde::Serializer; use std::collections::VecDeque; use std::fmt; @@ -133,28 +128,8 @@ impl serde::de::Error for Error { } } -#[derive(Clone, Debug, PartialEq, Deserialize)] -pub struct IsoDateTime(DateTime); - -impl IsoDateTime { - pub fn from_u64(ts: u64) -> Self { - IsoDateTime(Utc.timestamp_nanos(ts as i64)) - } -} - -impl Serialize for IsoDateTime { - fn serialize(&self, serializer: S) -> Result - where - S: Serializer, - { - serializer.serialize_str(&self.0.format(DATETIME_FMT_3MS).to_string()) - } -} - pub fn make_iso_ts(tss: &[u64]) -> Vec { - tss.iter() - .map(|&k| IsoDateTime(Utc.timestamp_nanos(k as i64))) - .collect() + tss.iter().map(|&k| IsoDateTime::from_ns_u64(k)).collect() } impl Mergeable for Box { diff --git a/crates/items_2/src/test.rs b/crates/items_2/src/test.rs index 0d032e6..fcfd846 100644 --- a/crates/items_2/src/test.rs +++ b/crates/items_2/src/test.rs @@ -503,10 +503,7 @@ fn binned_timeout_00() { assert_eq!(r2.mins(), &[3.0, 2.0, 3.0]); assert_eq!(r2.maxs(), &[3.2, 2.2, 3.2]); assert_eq!(r2.missing_bins(), 6); - assert_eq!( - r2.continue_at(), - Some(IsoDateTime(Utc.timestamp_nanos((TSBASE + SEC * 4) as i64))) - ); + assert_eq!(r2.continue_at(), Some(IsoDateTime::from_ns_u64(TSBASE + SEC * 4))); Ok::<_, Error>(()) }; runfut(fut).unwrap(); diff --git a/crates/netpod/src/netpod.rs b/crates/netpod/src/netpod.rs index 5574062..a09ab4c 100644 --- a/crates/netpod/src/netpod.rs +++ b/crates/netpod/src/netpod.rs @@ -3213,9 +3213,9 @@ pub trait HasBackend { fn backend(&self) -> &str; } +// TODO change into Option, why do I need to set a timeout using this trait? pub trait HasTimeout { - fn timeout(&self) -> Duration; - fn set_timeout(&mut self, timeout: Duration); + fn timeout(&self) -> Option; } pub trait FromUrl: Sized { @@ -3257,18 +3257,13 @@ impl HasBackend for MapQuery { } impl HasTimeout for MapQuery { - fn timeout(&self) -> Duration { + fn timeout(&self) -> Option { let x: Option = if let Some(v) = self.get("timeout") { v.parse::().ok() } else { None }; - let x = x.unwrap_or(5000); - Duration::from_millis(x as _) - } - - fn set_timeout(&mut self, timeout: Duration) { - self.insert("timeout".into(), format!("{:.0}", 1e3 * timeout.as_secs_f32())); + x.map(|x| Duration::from_millis(x as _)) } } @@ -3294,13 +3289,8 @@ impl HasBackend for ChannelConfigQuery { } impl HasTimeout for ChannelConfigQuery { - fn timeout(&self) -> Duration { - Duration::from_millis(10000) - } - - fn set_timeout(&mut self, _timeout: Duration) { - // TODO - // self.timeout = Some(timeout); + fn timeout(&self) -> Option { + None } } diff --git a/crates/netpod/src/query.rs b/crates/netpod/src/query.rs index 74695eb..d629ac5 100644 --- a/crates/netpod/src/query.rs +++ b/crates/netpod/src/query.rs @@ -297,13 +297,8 @@ impl HasBackend for ChannelStateEventsQuery { } impl HasTimeout for ChannelStateEventsQuery { - fn timeout(&self) -> Duration { - Duration::from_millis(10000) - } - - fn set_timeout(&mut self, timeout: Duration) { - // TODO - // self.timeout = Some(timeout); + fn timeout(&self) -> Option { + None } } diff --git a/crates/query/src/api4.rs b/crates/query/src/api4.rs index ed67ec1..91c34d7 100644 --- a/crates/query/src/api4.rs +++ b/crates/query/src/api4.rs @@ -41,13 +41,8 @@ impl HasBackend for AccountingIngestedBytesQuery { } impl HasTimeout for AccountingIngestedBytesQuery { - fn timeout(&self) -> Duration { - Duration::from_millis(10000) - } - - fn set_timeout(&mut self, timeout: Duration) { - // TODO - // self.timeout = Some(timeout); + fn timeout(&self) -> Option { + None } } @@ -115,13 +110,8 @@ impl HasBackend for AccountingToplistQuery { } impl HasTimeout for AccountingToplistQuery { - fn timeout(&self) -> Duration { - Duration::from_millis(10000) - } - - fn set_timeout(&mut self, timeout: Duration) { - // TODO - // self.timeout = Some(timeout); + fn timeout(&self) -> Option { + None } } diff --git a/crates/query/src/api4/binned.rs b/crates/query/src/api4/binned.rs index 66fc8e6..e5fce77 100644 --- a/crates/query/src/api4/binned.rs +++ b/crates/query/src/api4/binned.rs @@ -22,6 +22,8 @@ pub struct BinnedQuery { channel: SfDbChannel, range: SeriesRange, bin_count: u32, + #[serde(default, skip_serializing_if = "Option::is_none", with = "humantime_serde")] + bin_width: Option, #[serde( default = "TransformQuery::default_time_binned", skip_serializing_if = "TransformQuery::is_default_time_binned" @@ -31,8 +33,13 @@ pub struct BinnedQuery { cache_usage: Option, #[serde(default, skip_serializing_if = "Option::is_none")] bins_max: Option, - #[serde(default, skip_serializing_if = "Option::is_none")] - timeout: Option, + #[serde( + default, + skip_serializing_if = "Option::is_none", + with = "humantime_serde", + rename = "contentTimeout" + )] + timeout_content: Option, #[serde(default, skip_serializing_if = "Option::is_none")] buf_len_disk_io: Option, #[serde(default, skip_serializing_if = "Option::is_none")] @@ -53,12 +60,13 @@ impl BinnedQuery { channel, range, bin_count, + bin_width: None, transform: TransformQuery::default_time_binned(), cache_usage: None, bins_max: None, buf_len_disk_io: None, disk_stats_every: None, - timeout: None, + timeout_content: None, merger_out_len_max: None, test_do_wasm: None, log_level: String::new(), @@ -100,19 +108,12 @@ impl BinnedQuery { } } - pub fn timeout(&self) -> Option { - self.timeout.clone() - } - - pub fn timeout_value(&self) -> Duration { - match &self.timeout { - Some(x) => x.clone(), - None => Duration::from_millis(6000), - } + pub fn timeout_content(&self) -> Option { + self.timeout_content } pub fn bins_max(&self) -> u32 { - self.bins_max.unwrap_or(2000) + self.bins_max.unwrap_or(200000) } pub fn merger_out_len_max(&self) -> usize { @@ -135,8 +136,9 @@ impl BinnedQuery { self.disk_stats_every = Some(k); } + // Currently only for testing pub fn set_timeout(&mut self, k: Duration) { - self.timeout = Some(k); + self.timeout_content = Some(k); } pub fn set_buf_len_disk_io(&mut self, k: usize) { @@ -172,12 +174,8 @@ impl HasBackend for BinnedQuery { } impl HasTimeout for BinnedQuery { - fn timeout(&self) -> Duration { - self.timeout_value() - } - - fn set_timeout(&mut self, timeout: Duration) { - self.timeout = Some(timeout); + fn timeout(&self) -> Option { + self.timeout_content } } @@ -191,7 +189,8 @@ impl FromUrl for BinnedQuery { let ret = Self { channel: SfDbChannel::from_pairs(&pairs)?, range: SeriesRange::from_pairs(pairs)?, - bin_count: pairs.get("binCount").map_or(None, |x| x.parse().ok()).unwrap_or(10), + bin_count: pairs.get("binCount").and_then(|x| x.parse().ok()).unwrap_or(10), + bin_width: pairs.get("binWidth").and_then(|x| humantime::parse_duration(x).ok()), transform: TransformQuery::from_pairs(pairs)?, cache_usage: CacheUsage::from_pairs(&pairs)?, buf_len_disk_io: pairs @@ -207,10 +206,9 @@ impl FromUrl for BinnedQuery { .map_or("false", |k| k) .parse() .map_err(|e| Error::with_msg(format!("can not parse reportError {:?}", e)))?,*/ - timeout: pairs - .get("timeout") - .map(|x| x.parse::().map(Duration::from_millis).ok()) - .unwrap_or(None), + timeout_content: pairs + .get("contentTimeout") + .and_then(|x| humantime::parse_duration(x).ok()), bins_max: pairs.get("binsMax").map_or(Ok(None), |k| k.parse().map(|k| Some(k)))?, merger_out_len_max: pairs .get("mergerOutLenMax") @@ -235,14 +233,27 @@ impl AppendToUrl for BinnedQuery { { let mut g = url.query_pairs_mut(); g.append_pair("binCount", &format!("{}", self.bin_count)); + if let Some(x) = self.bin_width { + if x < Duration::from_secs(1) { + g.append_pair("binWidth", &format!("{:.0}ms", x.subsec_millis())); + } else if x < Duration::from_secs(60) { + g.append_pair("binWidth", &format!("{:.0}s", x.as_secs_f64())); + } else if x < Duration::from_secs(60 * 60) { + g.append_pair("binWidth", &format!("{:.0}m", x.as_secs() / 60)); + } else if x < Duration::from_secs(60 * 60 * 24) { + g.append_pair("binWidth", &format!("{:.0}h", x.as_secs() / 60 / 60)); + } else { + g.append_pair("binWidth", &format!("{:.0}d", x.as_secs() / 60 / 60 / 24)); + } + } } self.transform.append_to_url(url); let mut g = url.query_pairs_mut(); if let Some(x) = &self.cache_usage { g.append_pair("cacheUsage", &x.query_param_value()); } - if let Some(x) = &self.timeout { - g.append_pair("timeout", &format!("{}", x.as_millis())); + if let Some(x) = &self.timeout_content { + g.append_pair("contentTimeout", &format!("{:.0}ms", 1e3 * x.as_secs_f64())); } if let Some(x) = self.bins_max { g.append_pair("binsMax", &format!("{}", x)); diff --git a/crates/query/src/api4/events.rs b/crates/query/src/api4/events.rs index 8b78155..efeb5f5 100644 --- a/crates/query/src/api4/events.rs +++ b/crates/query/src/api4/events.rs @@ -31,8 +31,13 @@ pub struct PlainEventsQuery { #[serde(default = "TransformQuery::default_events")] #[serde(skip_serializing_if = "TransformQuery::is_default_events")] transform: TransformQuery, - #[serde(default, skip_serializing_if = "Option::is_none", with = "humantime_serde")] - timeout: Option, + #[serde( + default, + skip_serializing_if = "Option::is_none", + with = "humantime_serde", + rename = "contentTimeout" + )] + timeout_content: Option, #[serde(default, skip_serializing_if = "Option::is_none")] events_max: Option, #[serde(default, skip_serializing_if = "Option::is_none")] @@ -72,7 +77,7 @@ impl PlainEventsQuery { range: range.into(), one_before_range: false, transform: TransformQuery::default_events(), - timeout: Some(Duration::from_millis(4000)), + timeout_content: None, events_max: None, bytes_max: None, allow_large_result: None, @@ -110,8 +115,8 @@ impl PlainEventsQuery { self.buf_len_disk_io.unwrap_or(1024 * 8) } - pub fn timeout(&self) -> Duration { - self.timeout.unwrap_or(Duration::from_millis(10000)) + pub fn timeout_content(&self) -> Option { + self.timeout_content } pub fn events_max(&self) -> u64 { @@ -225,12 +230,8 @@ impl HasBackend for PlainEventsQuery { } impl HasTimeout for PlainEventsQuery { - fn timeout(&self) -> Duration { - self.timeout() - } - - fn set_timeout(&mut self, timeout: Duration) { - self.timeout = Some(timeout); + fn timeout(&self) -> Option { + PlainEventsQuery::timeout_content(self) } } @@ -253,10 +254,9 @@ impl FromUrl for PlainEventsQuery { range, one_before_range: pairs.get("oneBeforeRange").map_or("false", |x| x.as_ref()) == "true", transform: TransformQuery::from_pairs(pairs)?, - timeout: pairs - .get("timeout") - .map(|x| x.parse::().map(Duration::from_millis).ok()) - .unwrap_or(None), + timeout_content: pairs + .get("contentTimeout") + .and_then(|x| humantime::parse_duration(x).ok()), events_max: pairs.get("eventsMax").map_or(None, |k| k.parse().ok()), bytes_max: pairs.get("bytesMax").map_or(None, |k| k.parse().ok()), allow_large_result: pairs.get("allowLargeResult").map_or(None, |x| x.parse().ok()), @@ -317,8 +317,8 @@ impl AppendToUrl for PlainEventsQuery { drop(g); self.transform.append_to_url(url); let mut g = url.query_pairs_mut(); - if let Some(x) = &self.timeout { - g.append_pair("timeout", &format!("{:.0}", x.as_secs_f64() * 1e3)); + if let Some(x) = &self.timeout_content { + g.append_pair("contentTimeout", &format!("{:.0}ms", 1e3 * x.as_secs_f64())); } if let Some(x) = self.events_max.as_ref() { g.append_pair("eventsMax", &x.to_string()); @@ -431,7 +431,7 @@ impl Default for EventsSubQuerySettings { impl From<&PlainEventsQuery> for EventsSubQuerySettings { fn from(value: &PlainEventsQuery) -> Self { Self { - timeout: value.timeout, + timeout: value.timeout_content(), events_max: value.events_max, bytes_max: value.bytes_max, event_delay: value.event_delay, diff --git a/crates/streams/src/cbor_stream.rs b/crates/streams/src/cbor_stream.rs index 03b0aa2..19061b3 100644 --- a/crates/streams/src/cbor_stream.rs +++ b/crates/streams/src/cbor_stream.rs @@ -103,6 +103,35 @@ fn map_events(x: Result>>, Error // Ok(StreamItem::Log(item)) }; } + let mut k = evs; + let evs = if let Some(j) = k.as_any_mut().downcast_mut::() { + use items_0::AsAnyMut; + match j { + items_2::channelevents::ChannelEvents::Events(m) => { + if let Some(g) = m + .as_any_mut() + .downcast_mut::>() + { + trace!("consider container EnumVariant"); + let mut out = items_2::eventsdim0enum::EventsDim0Enum::new(); + for (&ts, val) in g.tss.iter().zip(g.values.iter()) { + out.push_back(ts, val.ix(), val.name_string()); + } + Box::new(items_2::channelevents::ChannelEvents::Events(Box::new(out))) + } else { + trace!("consider container channel events other events {}", k.type_name()); + k + } + } + items_2::channelevents::ChannelEvents::Status(_) => { + trace!("consider container channel events status {}", k.type_name()); + k + } + } + } else { + trace!("consider container else {}", k.type_name()); + k + }; let buf = evs.to_cbor_vec_u8(); let bytes = Bytes::from(buf); let item = CborBytes(bytes); diff --git a/crates/streams/src/json_stream.rs b/crates/streams/src/json_stream.rs index 72bcff3..3637e2c 100644 --- a/crates/streams/src/json_stream.rs +++ b/crates/streams/src/json_stream.rs @@ -58,6 +58,35 @@ fn map_events(x: Result>>, Error Ok(x) => match x { StreamItem::DataItem(x) => match x { RangeCompletableItem::Data(evs) => { + let mut k = evs; + let evs = if let Some(j) = k.as_any_mut().downcast_mut::() { + use items_0::AsAnyMut; + match j { + items_2::channelevents::ChannelEvents::Events(m) => { + if let Some(g) = m + .as_any_mut() + .downcast_mut::>() + { + trace!("consider container EnumVariant"); + let mut out = items_2::eventsdim0enum::EventsDim0Enum::new(); + for (&ts, val) in g.tss.iter().zip(g.values.iter()) { + out.push_back(ts, val.ix(), val.name_string()); + } + Box::new(items_2::channelevents::ChannelEvents::Events(Box::new(out))) + } else { + trace!("consider container channel events other events {}", k.type_name()); + k + } + } + items_2::channelevents::ChannelEvents::Status(_) => { + trace!("consider container channel events status {}", k.type_name()); + k + } + } + } else { + trace!("consider container else {}", k.type_name()); + k + }; let s = evs.to_json_string(); let item = JsonBytes::new(s); Ok(item) diff --git a/crates/streams/src/plaineventsjson.rs b/crates/streams/src/plaineventsjson.rs index ffd42ef..910033f 100644 --- a/crates/streams/src/plaineventsjson.rs +++ b/crates/streams/src/plaineventsjson.rs @@ -13,9 +13,11 @@ use items_0::on_sitemty_data; use netpod::log::*; use netpod::ChannelTypeConfigGen; use netpod::Cluster; +use netpod::HasTimeout; use netpod::ReqCtx; use query::api4::events::PlainEventsQuery; use serde_json::Value as JsonValue; +use std::time::Duration; use std::time::Instant; #[derive(Debug, ThisError)] @@ -34,7 +36,7 @@ pub async fn plain_events_json( open_bytes: OpenBoxedBytesStreamsBox, ) -> Result { debug!("plain_events_json evquery {:?}", evq); - let deadline = Instant::now() + evq.timeout(); + let deadline = Instant::now() + evq.timeout().unwrap_or(Duration::from_millis(4000)); let stream = dyn_events_stream(evq, ch_conf, ctx, open_bytes).await?; @@ -49,7 +51,7 @@ pub async fn plain_events_json( .downcast_mut::>() { trace!("consider container EnumVariant"); - let mut out = items_2::eventsdim0::EventsDim0Enum::new(); + let mut out = items_2::eventsdim0enum::EventsDim0Enum::new(); for (&ts, val) in g.tss.iter().zip(g.values.iter()) { out.push_back(ts, val.ix(), val.name_string()); } diff --git a/crates/streams/src/timebinnedjson.rs b/crates/streams/src/timebinnedjson.rs index 2829272..1ba9e20 100644 --- a/crates/streams/src/timebinnedjson.rs +++ b/crates/streams/src/timebinnedjson.rs @@ -28,6 +28,7 @@ use netpod::ReqCtx; use query::api4::binned::BinnedQuery; use serde_json::Value as JsonValue; use std::pin::Pin; +use std::time::Duration; use std::time::Instant; #[allow(unused)] @@ -78,7 +79,7 @@ async fn timebinnable_stream( }) }); - #[cfg(DISABLED)] + #[cfg(target_abi = "")] #[cfg(wasm_transform)] let stream = if let Some(wasmname) = wasm1 { debug!("make wasm transform"); @@ -257,7 +258,7 @@ pub async fn timebinned_json( ctx: &ReqCtx, open_bytes: OpenBoxedBytesStreamsBox, ) -> Result { - let deadline = Instant::now().checked_add(query.timeout_value()).unwrap(); + let deadline = Instant::now() + query.timeout_content().unwrap_or(Duration::from_millis(5000)); let binned_range = BinnedRangeEnum::covering_range(query.range().clone(), query.bin_count())?; // TODO derive better values, from query let collect_max = 10000;