Deliver enum data in a better formatted way

This commit is contained in:
Dominik Werder
2024-09-04 16:32:30 +02:00
parent 55b3bf4acd
commit de4569d686
19 changed files with 674 additions and 386 deletions
+21 -22
View File
@@ -506,7 +506,7 @@ where
QT: fmt::Debug + FromUrl + AppendToUrl + HasBackend + HasTimeout, QT: fmt::Debug + FromUrl + AppendToUrl + HasBackend + HasTimeout,
{ {
let url = req_uri_to_url(req.uri())?; let url = req_uri_to_url(req.uri())?;
let mut query = match QT::from_url(&url) { let query = match QT::from_url(&url) {
Ok(k) => k, Ok(k) => k,
Err(_) => { Err(_) => {
let msg = format!("malformed request or missing parameters {}", req.uri()); let msg = format!("malformed request or missing parameters {}", req.uri());
@@ -515,10 +515,7 @@ where
} }
}; };
trace!("proxy_backend_query {:?} {:?}", query, req.uri()); trace!("proxy_backend_query {:?} {:?}", query, req.uri());
let timeout = query.timeout(); let timeout = Duration::from_millis(1000 * 30);
let timeout_next = timeout.saturating_sub(Duration::from_millis(1000));
trace!("timeout {timeout:?} timeout_next {timeout_next:?}");
query.set_timeout(timeout_next);
let query = query; let query = query;
let backend = query.backend(); let backend = query.backend();
let uri_path = proxy_backend_query_helper_uri_path(req.uri().path(), &url); let uri_path = proxy_backend_query_helper_uri_path(req.uri().path(), &url);
@@ -598,23 +595,25 @@ pub async fn proxy_backend_query_inner(
Ok::<_, Error>(res) Ok::<_, Error>(res)
}; };
let res = tokio::time::timeout(timeout, fut).await.map_err(|_| { match tokio::time::timeout(timeout, fut).await {
let e = Error::with_msg_no_trace(format!("timeout trying to make sub request")); Ok(res) => {
warn!("{e}"); let res = res?;
e use bytes::Bytes;
})??; use httpclient::http_body::Frame;
use httpclient::BodyError;
{ let (head, body) = res.into_parts();
use bytes::Bytes; let body = StreamIncoming::new(body);
use httpclient::http_body::Frame; let body = body.map(|x| x.map(Frame::data));
use httpclient::BodyError; let body: Pin<Box<dyn Stream<Item = Result<Frame<Bytes>, BodyError>> + Send>> = Box::pin(body);
let (head, body) = res.into_parts(); let body = http_body_util::StreamBody::new(body);
let body = StreamIncoming::new(body); let ret = Response::from_parts(head, body);
let body = body.map(|x| x.map(Frame::data)); Ok(ret)
let body: Pin<Box<dyn Stream<Item = Result<Frame<Bytes>, BodyError>> + Send>> = Box::pin(body); }
let body = http_body_util::StreamBody::new(body); Err(_) => Ok(httpclient::error_status_response(
let ret = Response::from_parts(head, body); StatusCode::REQUEST_TIMEOUT,
Ok(ret) format!("request timed out at proxy, limit {} ms", timeout.as_millis() as u64),
ctx.reqid(),
)),
} }
} }
+2 -7
View File
@@ -843,13 +843,8 @@ impl HasBackend for MapPulseQuery {
} }
impl HasTimeout for MapPulseQuery { impl HasTimeout for MapPulseQuery {
fn timeout(&self) -> Duration { fn timeout(&self) -> Option<Duration> {
MAP_PULSE_QUERY_TIMEOUT Some(MAP_PULSE_QUERY_TIMEOUT)
}
fn set_timeout(&mut self, timeout: Duration) {
// TODO
// self.timeout = Some(timeout);
} }
} }
+11 -2
View File
@@ -11,8 +11,17 @@ pub struct IsoDateTime(DateTime<Utc>);
impl IsoDateTime { impl IsoDateTime {
pub fn from_unix_millis(ms: u64) -> Self { pub fn from_unix_millis(ms: u64) -> Self {
let datetime = chrono::DateTime::from_timestamp_millis(ms as i64).unwrap(); // let datetime = chrono::DateTime::from_timestamp_millis(ms as i64).unwrap();
Self(datetime) // Self(datetime)
IsoDateTime(
Utc.timestamp_millis_opt(ms as i64)
.earliest()
.unwrap_or(Utc.timestamp_nanos(0)),
)
}
pub fn from_ns_u64(ts: u64) -> Self {
IsoDateTime(Utc.timestamp_nanos(ts as i64))
} }
} }
+2 -2
View File
@@ -499,9 +499,9 @@ impl<NTY: ScalarOps> CollectorType for BinsDim0Collector<NTY> {
match vals.ts2s.back() { match vals.ts2s.back() {
Some(&k) => { Some(&k) => {
let missing_bins = bin_count_exp - bin_count; let missing_bins = bin_count_exp - bin_count;
let continue_at = IsoDateTime(Utc.timestamp_nanos(k as i64)); let continue_at = IsoDateTime::from_ns_u64(k);
let u = k + (k - vals.ts1s.back().unwrap()) * missing_bins as u64; let u = k + (k - vals.ts1s.back().unwrap()) * missing_bins as u64;
let finished_at = IsoDateTime(Utc.timestamp_nanos(u as i64)); let finished_at = IsoDateTime::from_ns_u64(u);
(missing_bins, Some(continue_at), Some(finished_at)) (missing_bins, Some(continue_at), Some(finished_at))
} }
None => { None => {
+2 -2
View File
@@ -473,9 +473,9 @@ impl<NTY: ScalarOps> CollectorType for BinsXbinDim0Collector<NTY> {
match self.vals.ts2s.back() { match self.vals.ts2s.back() {
Some(&k) => { Some(&k) => {
let missing_bins = bin_count_exp - bin_count; let missing_bins = bin_count_exp - bin_count;
let continue_at = IsoDateTime(Utc.timestamp_nanos(k as i64)); let continue_at = IsoDateTime::from_ns_u64(k);
let u = k + (k - self.vals.ts1s.back().unwrap()) * missing_bins as u64; let u = k + (k - self.vals.ts1s.back().unwrap()) * missing_bins as u64;
let finished_at = IsoDateTime(Utc.timestamp_nanos(u as i64)); let finished_at = IsoDateTime::from_ns_u64(u);
(missing_bins, Some(continue_at), Some(finished_at)) (missing_bins, Some(continue_at), Some(finished_at))
} }
None => { None => {
+4 -229
View File
@@ -83,231 +83,6 @@ macro_rules! trace_binning {
}; };
} }
#[derive(Debug)]
pub struct EventsDim0EnumCollector {
vals: EventsDim0Enum,
range_final: bool,
timed_out: bool,
needs_continue_at: bool,
}
impl EventsDim0EnumCollector {
pub fn new() -> Self {
Self {
vals: EventsDim0Enum::new(),
range_final: false,
timed_out: false,
needs_continue_at: false,
}
}
}
impl TypeName for EventsDim0EnumCollector {
fn type_name(&self) -> String {
"EventsDim0EnumCollector".into()
}
}
impl WithLen for EventsDim0EnumCollector {
fn len(&self) -> usize {
self.vals.tss.len()
}
}
impl ByteEstimate for EventsDim0EnumCollector {
fn byte_estimate(&self) -> u64 {
// TODO does it need to be more accurate?
30 * self.len() as u64
}
}
#[derive(Debug, Serialize, Deserialize)]
pub struct EventsDim0EnumCollectorOutput {
#[serde(rename = "tsAnchor")]
ts_anchor_sec: u64,
#[serde(rename = "tsMs")]
ts_off_ms: VecDeque<u64>,
#[serde(rename = "tsNs")]
ts_off_ns: VecDeque<u64>,
#[serde(rename = "values")]
vals: VecDeque<u16>,
#[serde(rename = "valuestrings")]
valstrs: VecDeque<String>,
#[serde(rename = "rangeFinal", default, skip_serializing_if = "is_false")]
range_final: bool,
#[serde(rename = "timedOut", default, skip_serializing_if = "is_false")]
timed_out: bool,
#[serde(rename = "continueAt", default, skip_serializing_if = "Option::is_none")]
continue_at: Option<IsoDateTime>,
}
impl WithLen for EventsDim0EnumCollectorOutput {
fn len(&self) -> usize {
todo!()
}
}
impl AsAnyRef for EventsDim0EnumCollectorOutput {
fn as_any_ref(&self) -> &dyn Any {
todo!()
}
}
impl AsAnyMut for EventsDim0EnumCollectorOutput {
fn as_any_mut(&mut self) -> &mut dyn Any {
todo!()
}
}
impl ToJsonResult for EventsDim0EnumCollectorOutput {
fn to_json_result(&self) -> Result<Box<dyn ToJsonBytes>, Error> {
todo!()
}
}
impl Collected for EventsDim0EnumCollectorOutput {}
impl CollectorType for EventsDim0EnumCollector {
type Input = EventsDim0Enum;
type Output = EventsDim0EnumCollectorOutput;
fn ingest(&mut self, src: &mut EventsDim0Enum) {
self.vals.tss.append(&mut src.tss);
self.vals.values.append(&mut src.values);
self.vals.valuestrs.append(&mut src.valuestrs);
}
fn set_range_complete(&mut self) {
self.range_final = true;
}
fn set_timed_out(&mut self) {
self.timed_out = true;
self.needs_continue_at = true;
}
fn set_continue_at_here(&mut self) {
self.needs_continue_at = true;
}
fn result(
&mut self,
range: Option<SeriesRange>,
binrange: Option<BinnedRangeEnum>,
) -> Result<EventsDim0EnumCollectorOutput, Error> {
debug!(
"{} result() needs_continue_at {}",
self.type_name(),
self.needs_continue_at
);
// If we timed out, we want to hint the client from where to continue.
// This is tricky: currently, client can not request a left-exclusive range.
// We currently give the timestamp of the last event plus a small delta.
// The amount of the delta must take into account what kind of timestamp precision the client
// can parse and handle.
let vals = &mut self.vals;
let continue_at = if self.needs_continue_at {
if let Some(ts) = vals.tss.back() {
let x = Some(IsoDateTime::from_u64(*ts / MS * MS + MS));
x
} else {
if let Some(range) = &range {
match range {
SeriesRange::TimeRange(x) => Some(IsoDateTime::from_u64(x.beg + SEC)),
SeriesRange::PulseRange(_) => {
error!("TODO emit create continueAt for pulse range");
Some(IsoDateTime::from_u64(0))
}
}
} else {
Some(IsoDateTime::from_u64(0))
}
}
} else {
None
};
let tss_sl = vals.tss.make_contiguous();
let (ts_anchor_sec, ts_off_ms, ts_off_ns) = crate::ts_offs_from_abs(tss_sl);
let valixs = mem::replace(&mut vals.values, VecDeque::new());
let valstrs = mem::replace(&mut vals.valuestrs, VecDeque::new());
let vals = valixs;
if ts_off_ms.len() != ts_off_ns.len() {
return Err(Error::with_msg_no_trace("collected len mismatch"));
}
if ts_off_ms.len() != vals.len() {
return Err(Error::with_msg_no_trace("collected len mismatch"));
}
if ts_off_ms.len() != valstrs.len() {
return Err(Error::with_msg_no_trace("collected len mismatch"));
}
let ret = Self::Output {
ts_anchor_sec,
ts_off_ms,
ts_off_ns,
vals,
valstrs,
range_final: self.range_final,
timed_out: self.timed_out,
continue_at,
};
Ok(ret)
}
}
// Experiment with having this special case for enums
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct EventsDim0Enum {
pub tss: VecDeque<u64>,
pub values: VecDeque<u16>,
pub valuestrs: VecDeque<String>,
}
impl EventsDim0Enum {
pub fn new() -> Self {
Self {
tss: VecDeque::new(),
values: VecDeque::new(),
valuestrs: VecDeque::new(),
}
}
pub fn push_back(&mut self, ts: u64, value: u16, valuestr: String) {
self.tss.push_back(ts);
self.values.push_back(value);
self.valuestrs.push_back(valuestr);
}
}
impl TypeName for EventsDim0Enum {
fn type_name(&self) -> String {
"EventsDim0Enum".into()
}
}
impl AsAnyRef for EventsDim0Enum {
fn as_any_ref(&self) -> &dyn Any {
self
}
}
impl AsAnyMut for EventsDim0Enum {
fn as_any_mut(&mut self) -> &mut dyn Any {
self
}
}
impl WithLen for EventsDim0Enum {
fn len(&self) -> usize {
self.tss.len()
}
}
impl Collectable for EventsDim0Enum {
fn new_collector(&self) -> Box<dyn Collector> {
Box::new(EventsDim0EnumCollector::new())
}
}
#[derive(Clone, PartialEq, Serialize, Deserialize)] #[derive(Clone, PartialEq, Serialize, Deserialize)]
pub struct EventsDim0NoPulse<STY> { pub struct EventsDim0NoPulse<STY> {
pub tss: VecDeque<u64>, pub tss: VecDeque<u64>,
@@ -701,19 +476,19 @@ impl<STY: ScalarOps> CollectorType for EventsDim0Collector<STY> {
let vals = &mut self.vals; let vals = &mut self.vals;
let continue_at = if self.needs_continue_at { let continue_at = if self.needs_continue_at {
if let Some(ts) = vals.tss.back() { if let Some(ts) = vals.tss.back() {
let x = Some(IsoDateTime::from_u64(*ts / MS * MS + MS)); let x = Some(IsoDateTime::from_ns_u64(*ts / MS * MS + MS));
x x
} else { } else {
if let Some(range) = &range { if let Some(range) = &range {
match range { match range {
SeriesRange::TimeRange(x) => Some(IsoDateTime::from_u64(x.beg + SEC)), SeriesRange::TimeRange(x) => Some(IsoDateTime::from_ns_u64(x.beg + SEC)),
SeriesRange::PulseRange(_) => { SeriesRange::PulseRange(_) => {
error!("TODO emit create continueAt for pulse range"); error!("TODO emit create continueAt for pulse range");
Some(IsoDateTime::from_u64(0)) Some(IsoDateTime::from_ns_u64(0))
} }
} }
} else { } else {
Some(IsoDateTime::from_u64(0)) Some(IsoDateTime::from_ns_u64(0))
} }
} }
} else { } else {
+491
View File
@@ -0,0 +1,491 @@
use err::Error;
use items_0::collect_s::Collectable;
use items_0::collect_s::Collected;
use items_0::collect_s::Collector;
use items_0::collect_s::CollectorType;
use items_0::collect_s::ToJsonBytes;
use items_0::collect_s::ToJsonResult;
use items_0::container::ByteEstimate;
use items_0::isodate::IsoDateTime;
use items_0::overlap::RangeOverlapInfo;
use items_0::scalar_ops::ScalarOps;
use items_0::timebin::TimeBinnable;
use items_0::timebin::TimeBinnableTy;
use items_0::timebin::TimeBinnerTy;
use items_0::AsAnyMut;
use items_0::AsAnyRef;
use items_0::Events;
use items_0::EventsNonObj;
use items_0::TypeName;
use items_0::WithLen;
use netpod::log::*;
use netpod::range::evrange::SeriesRange;
use netpod::timeunits::MS;
use netpod::timeunits::SEC;
use netpod::BinnedRangeEnum;
use serde::Deserialize;
use serde::Serialize;
use std::any::Any;
use std::collections::VecDeque;
use std::mem;
#[allow(unused)]
// Compile-time-disabled trace logging for collector results.
// Flip the `false` to `true` to enable the trace output while debugging;
// when disabled the arguments are type-checked but never evaluated at runtime.
macro_rules! trace_collect_result {
    ($($arg:tt)*) => {
        if false {
            trace!($($arg)*);
        }
    };
}
/// Accumulates `EventsDim0Enum` batches and tracks range/timeout status
/// until `result()` is called to produce the client-facing output.
#[derive(Debug)]
pub struct EventsDim0EnumCollector {
    // Events gathered from all ingested batches so far.
    vals: EventsDim0Enum,
    // True once the full requested range has been covered.
    range_final: bool,
    // True if collection stopped because of a timeout.
    timed_out: bool,
    // When true, the output carries a `continueAt` hint for the client.
    needs_continue_at: bool,
}
impl EventsDim0EnumCollector {
    /// Create a collector with an empty event buffer and all status flags cleared.
    pub fn new() -> Self {
        let vals = EventsDim0Enum::new();
        Self {
            vals,
            range_final: false,
            timed_out: false,
            needs_continue_at: false,
        }
    }
}
impl TypeName for EventsDim0EnumCollector {
    /// Static type name used for logging and diagnostics.
    fn type_name(&self) -> String {
        String::from("EventsDim0EnumCollector")
    }
}
impl WithLen for EventsDim0EnumCollector {
    /// Number of buffered events; one timestamp per event.
    fn len(&self) -> usize {
        let Self { vals, .. } = self;
        vals.tss.len()
    }
}
impl ByteEstimate for EventsDim0EnumCollector {
    /// Coarse estimate of the buffered payload size: a flat 30 bytes per event.
    // TODO does it need to be more accurate?
    fn byte_estimate(&self) -> u64 {
        let per_event: u64 = 30;
        per_event * self.len() as u64
    }
}
/// Client-facing result of `EventsDim0EnumCollector`: timestamps split into an
/// anchor second plus per-event offset columns (via `crate::ts_offs_from_abs`),
/// alongside enum index and enum label columns.
#[derive(Debug, Serialize, Deserialize)]
pub struct EventsDim0EnumCollectorOutput {
    // Anchor timestamp in unix seconds; offset columns below are relative to it.
    #[serde(rename = "tsAnchor")]
    ts_anchor_sec: u64,
    // Per-event millisecond offsets.
    #[serde(rename = "tsMs")]
    ts_off_ms: VecDeque<u64>,
    // Per-event nanosecond offsets.
    #[serde(rename = "tsNs")]
    ts_off_ns: VecDeque<u64>,
    // Enum variant index per event.
    #[serde(rename = "values")]
    vals: VecDeque<u16>,
    // Enum variant name per event.
    #[serde(rename = "valuestrings")]
    valstrs: VecDeque<String>,
    // Set when the full requested range was covered.
    #[serde(rename = "rangeFinal", default, skip_serializing_if = "netpod::is_false")]
    range_final: bool,
    // Set when collection aborted due to a timeout.
    #[serde(rename = "timedOut", default, skip_serializing_if = "netpod::is_false")]
    timed_out: bool,
    // Hint for the client from where to continue after a partial response.
    #[serde(rename = "continueAt", default, skip_serializing_if = "Option::is_none")]
    continue_at: Option<IsoDateTime>,
}
impl WithLen for EventsDim0EnumCollectorOutput {
    /// Number of events in the output.
    ///
    /// Was an unimplemented `todo!()` that panicked when called. All per-event
    /// columns are validated to have equal lengths when this output is built
    /// (see the length checks in `CollectorType::result`), so the millisecond
    /// offset column is authoritative.
    fn len(&self) -> usize {
        self.ts_off_ms.len()
    }
}
impl AsAnyRef for EventsDim0EnumCollectorOutput {
    /// Upcast to `&dyn Any` so consumers can downcast to the concrete type.
    ///
    /// Was an unimplemented `todo!()` that panicked; every other `AsAnyRef`
    /// impl in this file simply returns `self`, so this now does the same.
    fn as_any_ref(&self) -> &dyn Any {
        self
    }
}
impl AsAnyMut for EventsDim0EnumCollectorOutput {
    /// Upcast to `&mut dyn Any` so consumers can downcast to the concrete type.
    ///
    /// Was an unimplemented `todo!()` that panicked; every other `AsAnyMut`
    /// impl in this file simply returns `self`, so this now does the same.
    fn as_any_mut(&mut self) -> &mut dyn Any {
        self
    }
}
impl ToJsonResult for EventsDim0EnumCollectorOutput {
    // NOTE(review): unimplemented placeholder; panics if called. The output is
    // serde-serializable, so a real impl likely wraps `self` — confirm against
    // how other collector outputs implement `ToJsonResult`.
    fn to_json_result(&self) -> Result<Box<dyn ToJsonBytes>, Error> {
        todo!()
    }
}
// Marker trait: this type is a final collected result.
impl Collected for EventsDim0EnumCollectorOutput {}
impl CollectorType for EventsDim0EnumCollector {
    type Input = EventsDim0Enum;
    type Output = EventsDim0EnumCollectorOutput;
    // Drain all columns of `src` into the internal buffer, keeping them in lockstep.
    fn ingest(&mut self, src: &mut EventsDim0Enum) {
        self.vals.tss.append(&mut src.tss);
        self.vals.values.append(&mut src.values);
        self.vals.valuestrs.append(&mut src.valuestrs);
    }
    fn set_range_complete(&mut self) {
        self.range_final = true;
    }
    // A timeout also implies the client needs a continue-at hint.
    fn set_timed_out(&mut self) {
        self.timed_out = true;
        self.needs_continue_at = true;
    }
    fn set_continue_at_here(&mut self) {
        self.needs_continue_at = true;
    }
    // Consume the buffered events and build the client-facing output.
    // Returns an error if the per-event columns have diverged in length.
    fn result(
        &mut self,
        range: Option<SeriesRange>,
        binrange: Option<BinnedRangeEnum>,
    ) -> Result<EventsDim0EnumCollectorOutput, Error> {
        trace_collect_result!(
            "{} result() needs_continue_at {}",
            self.type_name(),
            self.needs_continue_at
        );
        // If we timed out, we want to hint the client from where to continue.
        // This is tricky: currently, client can not request a left-exclusive range.
        // We currently give the timestamp of the last event plus a small delta.
        // The amount of the delta must take into account what kind of timestamp precision the client
        // can parse and handle.
        let vals = &mut self.vals;
        let continue_at = if self.needs_continue_at {
            if let Some(ts) = vals.tss.back() {
                // Round the last event down to whole milliseconds, then add one
                // millisecond so the client restarts strictly after it.
                let x = Some(IsoDateTime::from_ns_u64(*ts / MS * MS + MS));
                x
            } else {
                // No events collected: derive the hint from the requested range.
                if let Some(range) = &range {
                    match range {
                        SeriesRange::TimeRange(x) => Some(IsoDateTime::from_ns_u64(x.beg + SEC)),
                        SeriesRange::PulseRange(_) => {
                            error!("TODO emit create continueAt for pulse range");
                            Some(IsoDateTime::from_ns_u64(0))
                        }
                    }
                } else {
                    Some(IsoDateTime::from_ns_u64(0))
                }
            }
        } else {
            None
        };
        // Split absolute nanosecond timestamps into anchor-second + ms/ns offsets.
        let tss_sl = vals.tss.make_contiguous();
        let (ts_anchor_sec, ts_off_ms, ts_off_ns) = crate::ts_offs_from_abs(tss_sl);
        // Move the value columns out of the buffer without cloning.
        let valixs = mem::replace(&mut vals.values, VecDeque::new());
        let valstrs = mem::replace(&mut vals.valuestrs, VecDeque::new());
        let vals = valixs;
        // Sanity checks: all per-event columns must be equally long.
        if ts_off_ms.len() != ts_off_ns.len() {
            return Err(Error::with_msg_no_trace("collected len mismatch"));
        }
        if ts_off_ms.len() != vals.len() {
            return Err(Error::with_msg_no_trace("collected len mismatch"));
        }
        if ts_off_ms.len() != valstrs.len() {
            return Err(Error::with_msg_no_trace("collected len mismatch"));
        }
        let ret = Self::Output {
            ts_anchor_sec,
            ts_off_ms,
            ts_off_ns,
            vals,
            valstrs,
            range_final: self.range_final,
            timed_out: self.timed_out,
            continue_at,
        };
        Ok(ret)
    }
}
// Experiment with having this special case for enums
/// Column-oriented container for scalar enum events: parallel deques of
/// timestamp, enum variant index, and enum variant label, one entry per event.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct EventsDim0Enum {
    // Event timestamps (nanoseconds, judging by usage in the collector — TODO confirm).
    pub tss: VecDeque<u64>,
    // Enum variant index per event.
    pub values: VecDeque<u16>,
    // Enum variant name per event.
    pub valuestrs: VecDeque<String>,
}
impl EventsDim0Enum {
    /// Create an empty container.
    pub fn new() -> Self {
        let tss = VecDeque::new();
        let values = VecDeque::new();
        let valuestrs = VecDeque::new();
        Self { tss, values, valuestrs }
    }
    /// Append one event: timestamp, enum variant index, and its string label.
    /// All three columns stay in lockstep.
    pub fn push_back(&mut self, ts: u64, value: u16, valuestr: String) {
        self.valuestrs.push_back(valuestr);
        self.values.push_back(value);
        self.tss.push_back(ts);
    }
}
impl TypeName for EventsDim0Enum {
    /// Static type name used for logging and diagnostics.
    fn type_name(&self) -> String {
        String::from("EventsDim0Enum")
    }
}
impl AsAnyRef for EventsDim0Enum {
    // Upcast to `&dyn Any` so consumers can downcast to the concrete type.
    fn as_any_ref(&self) -> &dyn Any {
        self
    }
}
impl AsAnyMut for EventsDim0Enum {
    // Upcast to `&mut dyn Any` so consumers can downcast to the concrete type.
    fn as_any_mut(&mut self) -> &mut dyn Any {
        self
    }
}
impl WithLen for EventsDim0Enum {
    /// Number of events; the timestamp column is authoritative
    /// (`push_back` keeps all columns in lockstep).
    fn len(&self) -> usize {
        let Self { tss, .. } = self;
        tss.len()
    }
}
impl Collectable for EventsDim0Enum {
    // Hand out the matching collector for this container type.
    fn new_collector(&self) -> Box<dyn Collector> {
        Box::new(EventsDim0EnumCollector::new())
    }
}
// impl Events
impl ByteEstimate for EventsDim0Enum {
    /// Rough in-memory payload size of the buffered events.
    ///
    /// Was an unimplemented `todo!()` that panicked when called. Estimates
    /// 8 bytes per `u64` timestamp, 2 bytes per `u16` enum index, plus the
    /// byte length of each variant-name string (allocation overhead ignored;
    /// this trait only needs a rough figure, cf. the flat 30-bytes-per-event
    /// estimate used by `EventsDim0EnumCollector`).
    fn byte_estimate(&self) -> u64 {
        let strbytes: usize = self.valuestrs.iter().map(|s| s.len()).sum();
        (self.tss.len() * 8 + self.values.len() * 2 + strbytes) as u64
    }
}
impl EventsNonObj for EventsDim0Enum {
    // NOTE(review): unimplemented; panics if called. This container has no
    // pulse column, so it is unclear what the second deque should carry.
    fn into_tss_pulses(self: Box<Self>) -> (VecDeque<u64>, VecDeque<u64>) {
        todo!()
    }
}
impl RangeOverlapInfo for EventsDim0Enum {
    // NOTE(review): all three methods are unimplemented placeholders and will
    // panic if this type is ever routed through range-overlap checks.
    fn ends_before(&self, range: &SeriesRange) -> bool {
        todo!()
    }
    fn ends_after(&self, range: &SeriesRange) -> bool {
        todo!()
    }
    fn starts_after(&self, range: &SeriesRange) -> bool {
        todo!()
    }
}
// NOTE just a dummy because currently we don't use this for time binning
/// Placeholder time binner for enum events; exists only to satisfy the
/// `TimeBinnableTy` associated type. Every method panics via `todo!()`.
#[derive(Debug)]
pub struct EventsDim0EnumTimeBinner;
impl TimeBinnerTy for EventsDim0EnumTimeBinner {
    type Input = EventsDim0Enum;
    type Output = ();
    // All methods below are unimplemented placeholders; calling any of them panics.
    fn ingest(&mut self, item: &mut Self::Input) {
        todo!()
    }
    fn set_range_complete(&mut self) {
        todo!()
    }
    fn bins_ready_count(&self) -> usize {
        todo!()
    }
    fn bins_ready(&mut self) -> Option<Self::Output> {
        todo!()
    }
    fn push_in_progress(&mut self, push_empty: bool) {
        todo!()
    }
    fn cycle(&mut self) {
        todo!()
    }
    fn empty(&self) -> Option<Self::Output> {
        todo!()
    }
    fn append_empty_until_end(&mut self) {
        todo!()
    }
}
// NOTE just a dummy because currently we don't use this for time binning
impl TimeBinnableTy for EventsDim0Enum {
    type TimeBinner = EventsDim0EnumTimeBinner;
    // Unimplemented placeholder; panics if enum data is ever routed into time binning.
    fn time_binner_new(
        &self,
        binrange: BinnedRangeEnum,
        do_time_weight: bool,
        emit_empty_bins: bool,
    ) -> Self::TimeBinner {
        todo!()
    }
}
// NOTE just a dummy because currently we don't use this for time binning
impl TimeBinnable for EventsDim0Enum {
    // Unimplemented placeholder; panics if enum data is ever routed into
    // the dynamic time-binning path.
    fn time_binner_new(
        &self,
        binrange: BinnedRangeEnum,
        do_time_weight: bool,
        emit_empty_bins: bool,
    ) -> Box<dyn items_0::timebin::TimeBinner> {
        todo!()
    }
    // Unimplemented placeholder; see `ToJsonResult` on the collector output.
    fn to_box_to_json_result(&self) -> Box<dyn ToJsonResult> {
        todo!()
    }
}
/// Serialization shape for a CBOR chunk of enum events, built by
/// `Events::to_cbor_vec_u8` for `EventsDim0Enum`.
#[derive(Debug, Serialize, Deserialize)]
pub struct EventsDim0EnumChunkOutput {
    // Event timestamps, copied from `EventsDim0Enum::tss`.
    tss: VecDeque<u64>,
    // Enum variant index per event.
    values: VecDeque<u16>,
    // Enum variant name per event (field named `valuestrings` on the wire).
    valuestrings: VecDeque<String>,
    // Scalar type tag, taken from `netpod::EnumVariant::scalar_type_name()`.
    scalar_type: String,
}
// Partial `Events` implementation: only the CBOR output path is live.
// Almost every method is a `todo!()` stub and will panic if this type is
// routed through merging, binning, or the generic JSON paths.
impl Events for EventsDim0Enum {
    fn as_time_binnable_ref(&self) -> &dyn items_0::timebin::TimeBinnable {
        todo!()
    }
    fn as_time_binnable_mut(&mut self) -> &mut dyn items_0::timebin::TimeBinnable {
        todo!()
    }
    fn verify(&self) -> bool {
        todo!()
    }
    fn output_info(&self) -> String {
        todo!()
    }
    fn as_collectable_mut(&mut self) -> &mut dyn Collectable {
        todo!()
    }
    fn as_collectable_with_default_ref(&self) -> &dyn Collectable {
        todo!()
    }
    fn as_collectable_with_default_mut(&mut self) -> &mut dyn Collectable {
        todo!()
    }
    fn ts_min(&self) -> Option<u64> {
        todo!()
    }
    fn ts_max(&self) -> Option<u64> {
        todo!()
    }
    fn take_new_events_until_ts(&mut self, ts_end: u64) -> Box<dyn Events> {
        todo!()
    }
    fn new_empty_evs(&self) -> Box<dyn Events> {
        todo!()
    }
    fn drain_into_evs(&mut self, dst: &mut dyn Events, range: (usize, usize)) -> Result<(), items_0::MergeError> {
        todo!()
    }
    fn find_lowest_index_gt_evs(&self, ts: u64) -> Option<usize> {
        todo!()
    }
    fn find_lowest_index_ge_evs(&self, ts: u64) -> Option<usize> {
        todo!()
    }
    fn find_highest_index_lt_evs(&self, ts: u64) -> Option<usize> {
        todo!()
    }
    fn clone_dyn(&self) -> Box<dyn Events> {
        todo!()
    }
    fn partial_eq_dyn(&self, other: &dyn Events) -> bool {
        todo!()
    }
    fn serde_id(&self) -> &'static str {
        todo!()
    }
    fn nty_id(&self) -> u32 {
        todo!()
    }
    fn tss(&self) -> &VecDeque<u64> {
        todo!()
    }
    fn pulses(&self) -> &VecDeque<u64> {
        todo!()
    }
    fn frame_type_id(&self) -> u32 {
        todo!()
    }
    fn to_min_max_avg(&mut self) -> Box<dyn Events> {
        todo!()
    }
    fn to_json_string(&self) -> String {
        todo!()
    }
    // NOTE(review): delegates to `to_json_string`, which is still `todo!()`,
    // so this currently panics as well.
    fn to_json_vec_u8(&self) -> Vec<u8> {
        self.to_json_string().into_bytes()
    }
    // Live path: serialize all columns plus the scalar-type tag to CBOR.
    // Clones every column; see the TODOs about moving the data out instead.
    fn to_cbor_vec_u8(&self) -> Vec<u8> {
        // TODO redesign with mut access, rename to `into_` and take the values out.
        let ret = EventsDim0EnumChunkOutput {
            // TODO use &mut to swap the content
            tss: self.tss.clone(),
            values: self.values.clone(),
            valuestrings: self.valuestrs.clone(),
            scalar_type: netpod::EnumVariant::scalar_type_name().into(),
        };
        let mut buf = Vec::new();
        ciborium::into_writer(&ret, &mut buf).unwrap();
        buf
    }
    fn clear(&mut self) {
        todo!()
    }
}
+4 -4
View File
@@ -400,19 +400,19 @@ impl<STY: ScalarOps> CollectorType for EventsDim1Collector<STY> {
let vals = &mut self.vals; let vals = &mut self.vals;
let continue_at = if self.timed_out { let continue_at = if self.timed_out {
if let Some(ts) = vals.tss.back() { if let Some(ts) = vals.tss.back() {
Some(IsoDateTime::from_u64(*ts + MS)) Some(IsoDateTime::from_ns_u64(*ts + MS))
} else { } else {
if let Some(range) = &range { if let Some(range) = &range {
match range { match range {
SeriesRange::TimeRange(x) => Some(IsoDateTime::from_u64(x.beg + SEC)), SeriesRange::TimeRange(x) => Some(IsoDateTime::from_ns_u64(x.beg + SEC)),
SeriesRange::PulseRange(x) => { SeriesRange::PulseRange(x) => {
error!("TODO emit create continueAt for pulse range"); error!("TODO emit create continueAt for pulse range");
Some(IsoDateTime::from_u64(0)) Some(IsoDateTime::from_ns_u64(0))
} }
} }
} else { } else {
warn!("can not determine continue-at parameters"); warn!("can not determine continue-at parameters");
Some(IsoDateTime::from_u64(0)) Some(IsoDateTime::from_ns_u64(0))
} }
} }
} else { } else {
+3 -28
View File
@@ -6,6 +6,7 @@ pub mod channelevents;
pub mod empty; pub mod empty;
pub mod eventfull; pub mod eventfull;
pub mod eventsdim0; pub mod eventsdim0;
pub mod eventsdim0enum;
pub mod eventsdim1; pub mod eventsdim1;
pub mod eventsxbindim0; pub mod eventsxbindim0;
pub mod framable; pub mod framable;
@@ -20,10 +21,8 @@ pub mod timebin;
pub mod transform; pub mod transform;
use channelevents::ChannelEvents; use channelevents::ChannelEvents;
use chrono::DateTime;
use chrono::TimeZone;
use chrono::Utc;
use futures_util::Stream; use futures_util::Stream;
use items_0::isodate::IsoDateTime;
use items_0::overlap::RangeOverlapInfo; use items_0::overlap::RangeOverlapInfo;
use items_0::streamitem::Sitemty; use items_0::streamitem::Sitemty;
use items_0::transform::EventTransform; use items_0::transform::EventTransform;
@@ -33,10 +32,6 @@ use items_0::MergeError;
use merger::Mergeable; use merger::Mergeable;
use netpod::range::evrange::SeriesRange; use netpod::range::evrange::SeriesRange;
use netpod::timeunits::*; use netpod::timeunits::*;
use netpod::DATETIME_FMT_3MS;
use serde::Deserialize;
use serde::Serialize;
use serde::Serializer;
use std::collections::VecDeque; use std::collections::VecDeque;
use std::fmt; use std::fmt;
@@ -133,28 +128,8 @@ impl serde::de::Error for Error {
} }
} }
#[derive(Clone, Debug, PartialEq, Deserialize)]
pub struct IsoDateTime(DateTime<Utc>);
impl IsoDateTime {
pub fn from_u64(ts: u64) -> Self {
IsoDateTime(Utc.timestamp_nanos(ts as i64))
}
}
impl Serialize for IsoDateTime {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
serializer.serialize_str(&self.0.format(DATETIME_FMT_3MS).to_string())
}
}
pub fn make_iso_ts(tss: &[u64]) -> Vec<IsoDateTime> { pub fn make_iso_ts(tss: &[u64]) -> Vec<IsoDateTime> {
tss.iter() tss.iter().map(|&k| IsoDateTime::from_ns_u64(k)).collect()
.map(|&k| IsoDateTime(Utc.timestamp_nanos(k as i64)))
.collect()
} }
impl Mergeable for Box<dyn Events> { impl Mergeable for Box<dyn Events> {
+1 -4
View File
@@ -503,10 +503,7 @@ fn binned_timeout_00() {
assert_eq!(r2.mins(), &[3.0, 2.0, 3.0]); assert_eq!(r2.mins(), &[3.0, 2.0, 3.0]);
assert_eq!(r2.maxs(), &[3.2, 2.2, 3.2]); assert_eq!(r2.maxs(), &[3.2, 2.2, 3.2]);
assert_eq!(r2.missing_bins(), 6); assert_eq!(r2.missing_bins(), 6);
assert_eq!( assert_eq!(r2.continue_at(), Some(IsoDateTime::from_ns_u64(TSBASE + SEC * 4)));
r2.continue_at(),
Some(IsoDateTime(Utc.timestamp_nanos((TSBASE + SEC * 4) as i64)))
);
Ok::<_, Error>(()) Ok::<_, Error>(())
}; };
runfut(fut).unwrap(); runfut(fut).unwrap();
+6 -16
View File
@@ -3213,9 +3213,9 @@ pub trait HasBackend {
fn backend(&self) -> &str; fn backend(&self) -> &str;
} }
// TODO change into Option, why do I need to set a timeout using this trait?
pub trait HasTimeout { pub trait HasTimeout {
fn timeout(&self) -> Duration; fn timeout(&self) -> Option<Duration>;
fn set_timeout(&mut self, timeout: Duration);
} }
pub trait FromUrl: Sized { pub trait FromUrl: Sized {
@@ -3257,18 +3257,13 @@ impl HasBackend for MapQuery {
} }
impl HasTimeout for MapQuery { impl HasTimeout for MapQuery {
fn timeout(&self) -> Duration { fn timeout(&self) -> Option<Duration> {
let x: Option<u32> = if let Some(v) = self.get("timeout") { let x: Option<u32> = if let Some(v) = self.get("timeout") {
v.parse::<u32>().ok() v.parse::<u32>().ok()
} else { } else {
None None
}; };
let x = x.unwrap_or(5000); x.map(|x| Duration::from_millis(x as _))
Duration::from_millis(x as _)
}
fn set_timeout(&mut self, timeout: Duration) {
self.insert("timeout".into(), format!("{:.0}", 1e3 * timeout.as_secs_f32()));
} }
} }
@@ -3294,13 +3289,8 @@ impl HasBackend for ChannelConfigQuery {
} }
impl HasTimeout for ChannelConfigQuery { impl HasTimeout for ChannelConfigQuery {
fn timeout(&self) -> Duration { fn timeout(&self) -> Option<Duration> {
Duration::from_millis(10000) None
}
fn set_timeout(&mut self, _timeout: Duration) {
// TODO
// self.timeout = Some(timeout);
} }
} }
+2 -7
View File
@@ -297,13 +297,8 @@ impl HasBackend for ChannelStateEventsQuery {
} }
impl HasTimeout for ChannelStateEventsQuery { impl HasTimeout for ChannelStateEventsQuery {
fn timeout(&self) -> Duration { fn timeout(&self) -> Option<Duration> {
Duration::from_millis(10000) None
}
fn set_timeout(&mut self, timeout: Duration) {
// TODO
// self.timeout = Some(timeout);
} }
} }
+4 -14
View File
@@ -41,13 +41,8 @@ impl HasBackend for AccountingIngestedBytesQuery {
} }
impl HasTimeout for AccountingIngestedBytesQuery { impl HasTimeout for AccountingIngestedBytesQuery {
fn timeout(&self) -> Duration { fn timeout(&self) -> Option<Duration> {
Duration::from_millis(10000) None
}
fn set_timeout(&mut self, timeout: Duration) {
// TODO
// self.timeout = Some(timeout);
} }
} }
@@ -115,13 +110,8 @@ impl HasBackend for AccountingToplistQuery {
} }
impl HasTimeout for AccountingToplistQuery { impl HasTimeout for AccountingToplistQuery {
fn timeout(&self) -> Duration { fn timeout(&self) -> Option<Duration> {
Duration::from_millis(10000) None
}
fn set_timeout(&mut self, timeout: Duration) {
// TODO
// self.timeout = Some(timeout);
} }
} }
+38 -27
View File
@@ -22,6 +22,8 @@ pub struct BinnedQuery {
channel: SfDbChannel, channel: SfDbChannel,
range: SeriesRange, range: SeriesRange,
bin_count: u32, bin_count: u32,
#[serde(default, skip_serializing_if = "Option::is_none", with = "humantime_serde")]
bin_width: Option<Duration>,
#[serde( #[serde(
default = "TransformQuery::default_time_binned", default = "TransformQuery::default_time_binned",
skip_serializing_if = "TransformQuery::is_default_time_binned" skip_serializing_if = "TransformQuery::is_default_time_binned"
@@ -31,8 +33,13 @@ pub struct BinnedQuery {
cache_usage: Option<CacheUsage>, cache_usage: Option<CacheUsage>,
#[serde(default, skip_serializing_if = "Option::is_none")] #[serde(default, skip_serializing_if = "Option::is_none")]
bins_max: Option<u32>, bins_max: Option<u32>,
#[serde(default, skip_serializing_if = "Option::is_none")] #[serde(
timeout: Option<Duration>, default,
skip_serializing_if = "Option::is_none",
with = "humantime_serde",
rename = "contentTimeout"
)]
timeout_content: Option<Duration>,
#[serde(default, skip_serializing_if = "Option::is_none")] #[serde(default, skip_serializing_if = "Option::is_none")]
buf_len_disk_io: Option<usize>, buf_len_disk_io: Option<usize>,
#[serde(default, skip_serializing_if = "Option::is_none")] #[serde(default, skip_serializing_if = "Option::is_none")]
@@ -53,12 +60,13 @@ impl BinnedQuery {
channel, channel,
range, range,
bin_count, bin_count,
bin_width: None,
transform: TransformQuery::default_time_binned(), transform: TransformQuery::default_time_binned(),
cache_usage: None, cache_usage: None,
bins_max: None, bins_max: None,
buf_len_disk_io: None, buf_len_disk_io: None,
disk_stats_every: None, disk_stats_every: None,
timeout: None, timeout_content: None,
merger_out_len_max: None, merger_out_len_max: None,
test_do_wasm: None, test_do_wasm: None,
log_level: String::new(), log_level: String::new(),
@@ -100,19 +108,12 @@ impl BinnedQuery {
} }
} }
pub fn timeout(&self) -> Option<Duration> { pub fn timeout_content(&self) -> Option<Duration> {
self.timeout.clone() self.timeout_content
}
pub fn timeout_value(&self) -> Duration {
match &self.timeout {
Some(x) => x.clone(),
None => Duration::from_millis(6000),
}
} }
pub fn bins_max(&self) -> u32 { pub fn bins_max(&self) -> u32 {
self.bins_max.unwrap_or(2000) self.bins_max.unwrap_or(200000)
} }
pub fn merger_out_len_max(&self) -> usize { pub fn merger_out_len_max(&self) -> usize {
@@ -135,8 +136,9 @@ impl BinnedQuery {
self.disk_stats_every = Some(k); self.disk_stats_every = Some(k);
} }
// Currently only for testing
pub fn set_timeout(&mut self, k: Duration) { pub fn set_timeout(&mut self, k: Duration) {
self.timeout = Some(k); self.timeout_content = Some(k);
} }
pub fn set_buf_len_disk_io(&mut self, k: usize) { pub fn set_buf_len_disk_io(&mut self, k: usize) {
@@ -172,12 +174,8 @@ impl HasBackend for BinnedQuery {
} }
impl HasTimeout for BinnedQuery { impl HasTimeout for BinnedQuery {
fn timeout(&self) -> Duration { fn timeout(&self) -> Option<Duration> {
self.timeout_value() self.timeout_content
}
fn set_timeout(&mut self, timeout: Duration) {
self.timeout = Some(timeout);
} }
} }
@@ -191,7 +189,8 @@ impl FromUrl for BinnedQuery {
let ret = Self { let ret = Self {
channel: SfDbChannel::from_pairs(&pairs)?, channel: SfDbChannel::from_pairs(&pairs)?,
range: SeriesRange::from_pairs(pairs)?, range: SeriesRange::from_pairs(pairs)?,
bin_count: pairs.get("binCount").map_or(None, |x| x.parse().ok()).unwrap_or(10), bin_count: pairs.get("binCount").and_then(|x| x.parse().ok()).unwrap_or(10),
bin_width: pairs.get("binWidth").and_then(|x| humantime::parse_duration(x).ok()),
transform: TransformQuery::from_pairs(pairs)?, transform: TransformQuery::from_pairs(pairs)?,
cache_usage: CacheUsage::from_pairs(&pairs)?, cache_usage: CacheUsage::from_pairs(&pairs)?,
buf_len_disk_io: pairs buf_len_disk_io: pairs
@@ -207,10 +206,9 @@ impl FromUrl for BinnedQuery {
.map_or("false", |k| k) .map_or("false", |k| k)
.parse() .parse()
.map_err(|e| Error::with_msg(format!("can not parse reportError {:?}", e)))?,*/ .map_err(|e| Error::with_msg(format!("can not parse reportError {:?}", e)))?,*/
timeout: pairs timeout_content: pairs
.get("timeout") .get("contentTimeout")
.map(|x| x.parse::<u64>().map(Duration::from_millis).ok()) .and_then(|x| humantime::parse_duration(x).ok()),
.unwrap_or(None),
bins_max: pairs.get("binsMax").map_or(Ok(None), |k| k.parse().map(|k| Some(k)))?, bins_max: pairs.get("binsMax").map_or(Ok(None), |k| k.parse().map(|k| Some(k)))?,
merger_out_len_max: pairs merger_out_len_max: pairs
.get("mergerOutLenMax") .get("mergerOutLenMax")
@@ -235,14 +233,27 @@ impl AppendToUrl for BinnedQuery {
{ {
let mut g = url.query_pairs_mut(); let mut g = url.query_pairs_mut();
g.append_pair("binCount", &format!("{}", self.bin_count)); g.append_pair("binCount", &format!("{}", self.bin_count));
if let Some(x) = self.bin_width {
if x < Duration::from_secs(1) {
g.append_pair("binWidth", &format!("{:.0}ms", x.subsec_millis()));
} else if x < Duration::from_secs(60) {
g.append_pair("binWidth", &format!("{:.0}s", x.as_secs_f64()));
} else if x < Duration::from_secs(60 * 60) {
g.append_pair("binWidth", &format!("{:.0}m", x.as_secs() / 60));
} else if x < Duration::from_secs(60 * 60 * 24) {
g.append_pair("binWidth", &format!("{:.0}h", x.as_secs() / 60 / 60));
} else {
g.append_pair("binWidth", &format!("{:.0}d", x.as_secs() / 60 / 60 / 24));
}
}
} }
self.transform.append_to_url(url); self.transform.append_to_url(url);
let mut g = url.query_pairs_mut(); let mut g = url.query_pairs_mut();
if let Some(x) = &self.cache_usage { if let Some(x) = &self.cache_usage {
g.append_pair("cacheUsage", &x.query_param_value()); g.append_pair("cacheUsage", &x.query_param_value());
} }
if let Some(x) = &self.timeout { if let Some(x) = &self.timeout_content {
g.append_pair("timeout", &format!("{}", x.as_millis())); g.append_pair("contentTimeout", &format!("{:.0}ms", 1e3 * x.as_secs_f64()));
} }
if let Some(x) = self.bins_max { if let Some(x) = self.bins_max {
g.append_pair("binsMax", &format!("{}", x)); g.append_pair("binsMax", &format!("{}", x));
+18 -18
View File
@@ -31,8 +31,13 @@ pub struct PlainEventsQuery {
#[serde(default = "TransformQuery::default_events")] #[serde(default = "TransformQuery::default_events")]
#[serde(skip_serializing_if = "TransformQuery::is_default_events")] #[serde(skip_serializing_if = "TransformQuery::is_default_events")]
transform: TransformQuery, transform: TransformQuery,
#[serde(default, skip_serializing_if = "Option::is_none", with = "humantime_serde")] #[serde(
timeout: Option<Duration>, default,
skip_serializing_if = "Option::is_none",
with = "humantime_serde",
rename = "contentTimeout"
)]
timeout_content: Option<Duration>,
#[serde(default, skip_serializing_if = "Option::is_none")] #[serde(default, skip_serializing_if = "Option::is_none")]
events_max: Option<u64>, events_max: Option<u64>,
#[serde(default, skip_serializing_if = "Option::is_none")] #[serde(default, skip_serializing_if = "Option::is_none")]
@@ -72,7 +77,7 @@ impl PlainEventsQuery {
range: range.into(), range: range.into(),
one_before_range: false, one_before_range: false,
transform: TransformQuery::default_events(), transform: TransformQuery::default_events(),
timeout: Some(Duration::from_millis(4000)), timeout_content: None,
events_max: None, events_max: None,
bytes_max: None, bytes_max: None,
allow_large_result: None, allow_large_result: None,
@@ -110,8 +115,8 @@ impl PlainEventsQuery {
self.buf_len_disk_io.unwrap_or(1024 * 8) self.buf_len_disk_io.unwrap_or(1024 * 8)
} }
pub fn timeout(&self) -> Duration { pub fn timeout_content(&self) -> Option<Duration> {
self.timeout.unwrap_or(Duration::from_millis(10000)) self.timeout_content
} }
pub fn events_max(&self) -> u64 { pub fn events_max(&self) -> u64 {
@@ -225,12 +230,8 @@ impl HasBackend for PlainEventsQuery {
} }
impl HasTimeout for PlainEventsQuery { impl HasTimeout for PlainEventsQuery {
fn timeout(&self) -> Duration { fn timeout(&self) -> Option<Duration> {
self.timeout() PlainEventsQuery::timeout_content(self)
}
fn set_timeout(&mut self, timeout: Duration) {
self.timeout = Some(timeout);
} }
} }
@@ -253,10 +254,9 @@ impl FromUrl for PlainEventsQuery {
range, range,
one_before_range: pairs.get("oneBeforeRange").map_or("false", |x| x.as_ref()) == "true", one_before_range: pairs.get("oneBeforeRange").map_or("false", |x| x.as_ref()) == "true",
transform: TransformQuery::from_pairs(pairs)?, transform: TransformQuery::from_pairs(pairs)?,
timeout: pairs timeout_content: pairs
.get("timeout") .get("contentTimeout")
.map(|x| x.parse::<u64>().map(Duration::from_millis).ok()) .and_then(|x| humantime::parse_duration(x).ok()),
.unwrap_or(None),
events_max: pairs.get("eventsMax").map_or(None, |k| k.parse().ok()), events_max: pairs.get("eventsMax").map_or(None, |k| k.parse().ok()),
bytes_max: pairs.get("bytesMax").map_or(None, |k| k.parse().ok()), bytes_max: pairs.get("bytesMax").map_or(None, |k| k.parse().ok()),
allow_large_result: pairs.get("allowLargeResult").map_or(None, |x| x.parse().ok()), allow_large_result: pairs.get("allowLargeResult").map_or(None, |x| x.parse().ok()),
@@ -317,8 +317,8 @@ impl AppendToUrl for PlainEventsQuery {
drop(g); drop(g);
self.transform.append_to_url(url); self.transform.append_to_url(url);
let mut g = url.query_pairs_mut(); let mut g = url.query_pairs_mut();
if let Some(x) = &self.timeout { if let Some(x) = &self.timeout_content {
g.append_pair("timeout", &format!("{:.0}", x.as_secs_f64() * 1e3)); g.append_pair("contentTimeout", &format!("{:.0}ms", 1e3 * x.as_secs_f64()));
} }
if let Some(x) = self.events_max.as_ref() { if let Some(x) = self.events_max.as_ref() {
g.append_pair("eventsMax", &x.to_string()); g.append_pair("eventsMax", &x.to_string());
@@ -431,7 +431,7 @@ impl Default for EventsSubQuerySettings {
impl From<&PlainEventsQuery> for EventsSubQuerySettings { impl From<&PlainEventsQuery> for EventsSubQuerySettings {
fn from(value: &PlainEventsQuery) -> Self { fn from(value: &PlainEventsQuery) -> Self {
Self { Self {
timeout: value.timeout, timeout: value.timeout_content(),
events_max: value.events_max, events_max: value.events_max,
bytes_max: value.bytes_max, bytes_max: value.bytes_max,
event_delay: value.event_delay, event_delay: value.event_delay,
+29
View File
@@ -103,6 +103,35 @@ fn map_events(x: Result<StreamItem<RangeCompletableItem<Box<dyn Events>>>, Error
// Ok(StreamItem::Log(item)) // Ok(StreamItem::Log(item))
}; };
} }
let mut k = evs;
let evs = if let Some(j) = k.as_any_mut().downcast_mut::<items_2::channelevents::ChannelEvents>() {
use items_0::AsAnyMut;
match j {
items_2::channelevents::ChannelEvents::Events(m) => {
if let Some(g) = m
.as_any_mut()
.downcast_mut::<items_2::eventsdim0::EventsDim0<netpod::EnumVariant>>()
{
trace!("consider container EnumVariant");
let mut out = items_2::eventsdim0enum::EventsDim0Enum::new();
for (&ts, val) in g.tss.iter().zip(g.values.iter()) {
out.push_back(ts, val.ix(), val.name_string());
}
Box::new(items_2::channelevents::ChannelEvents::Events(Box::new(out)))
} else {
trace!("consider container channel events other events {}", k.type_name());
k
}
}
items_2::channelevents::ChannelEvents::Status(_) => {
trace!("consider container channel events status {}", k.type_name());
k
}
}
} else {
trace!("consider container else {}", k.type_name());
k
};
let buf = evs.to_cbor_vec_u8(); let buf = evs.to_cbor_vec_u8();
let bytes = Bytes::from(buf); let bytes = Bytes::from(buf);
let item = CborBytes(bytes); let item = CborBytes(bytes);
+29
View File
@@ -58,6 +58,35 @@ fn map_events(x: Result<StreamItem<RangeCompletableItem<Box<dyn Events>>>, Error
Ok(x) => match x { Ok(x) => match x {
StreamItem::DataItem(x) => match x { StreamItem::DataItem(x) => match x {
RangeCompletableItem::Data(evs) => { RangeCompletableItem::Data(evs) => {
let mut k = evs;
let evs = if let Some(j) = k.as_any_mut().downcast_mut::<items_2::channelevents::ChannelEvents>() {
use items_0::AsAnyMut;
match j {
items_2::channelevents::ChannelEvents::Events(m) => {
if let Some(g) = m
.as_any_mut()
.downcast_mut::<items_2::eventsdim0::EventsDim0<netpod::EnumVariant>>()
{
trace!("consider container EnumVariant");
let mut out = items_2::eventsdim0enum::EventsDim0Enum::new();
for (&ts, val) in g.tss.iter().zip(g.values.iter()) {
out.push_back(ts, val.ix(), val.name_string());
}
Box::new(items_2::channelevents::ChannelEvents::Events(Box::new(out)))
} else {
trace!("consider container channel events other events {}", k.type_name());
k
}
}
items_2::channelevents::ChannelEvents::Status(_) => {
trace!("consider container channel events status {}", k.type_name());
k
}
}
} else {
trace!("consider container else {}", k.type_name());
k
};
let s = evs.to_json_string(); let s = evs.to_json_string();
let item = JsonBytes::new(s); let item = JsonBytes::new(s);
Ok(item) Ok(item)
+4 -2
View File
@@ -13,9 +13,11 @@ use items_0::on_sitemty_data;
use netpod::log::*; use netpod::log::*;
use netpod::ChannelTypeConfigGen; use netpod::ChannelTypeConfigGen;
use netpod::Cluster; use netpod::Cluster;
use netpod::HasTimeout;
use netpod::ReqCtx; use netpod::ReqCtx;
use query::api4::events::PlainEventsQuery; use query::api4::events::PlainEventsQuery;
use serde_json::Value as JsonValue; use serde_json::Value as JsonValue;
use std::time::Duration;
use std::time::Instant; use std::time::Instant;
#[derive(Debug, ThisError)] #[derive(Debug, ThisError)]
@@ -34,7 +36,7 @@ pub async fn plain_events_json(
open_bytes: OpenBoxedBytesStreamsBox, open_bytes: OpenBoxedBytesStreamsBox,
) -> Result<JsonValue, Error> { ) -> Result<JsonValue, Error> {
debug!("plain_events_json evquery {:?}", evq); debug!("plain_events_json evquery {:?}", evq);
let deadline = Instant::now() + evq.timeout(); let deadline = Instant::now() + evq.timeout().unwrap_or(Duration::from_millis(4000));
let stream = dyn_events_stream(evq, ch_conf, ctx, open_bytes).await?; let stream = dyn_events_stream(evq, ch_conf, ctx, open_bytes).await?;
@@ -49,7 +51,7 @@ pub async fn plain_events_json(
.downcast_mut::<items_2::eventsdim0::EventsDim0<netpod::EnumVariant>>() .downcast_mut::<items_2::eventsdim0::EventsDim0<netpod::EnumVariant>>()
{ {
trace!("consider container EnumVariant"); trace!("consider container EnumVariant");
let mut out = items_2::eventsdim0::EventsDim0Enum::new(); let mut out = items_2::eventsdim0enum::EventsDim0Enum::new();
for (&ts, val) in g.tss.iter().zip(g.values.iter()) { for (&ts, val) in g.tss.iter().zip(g.values.iter()) {
out.push_back(ts, val.ix(), val.name_string()); out.push_back(ts, val.ix(), val.name_string());
} }
+3 -2
View File
@@ -28,6 +28,7 @@ use netpod::ReqCtx;
use query::api4::binned::BinnedQuery; use query::api4::binned::BinnedQuery;
use serde_json::Value as JsonValue; use serde_json::Value as JsonValue;
use std::pin::Pin; use std::pin::Pin;
use std::time::Duration;
use std::time::Instant; use std::time::Instant;
#[allow(unused)] #[allow(unused)]
@@ -78,7 +79,7 @@ async fn timebinnable_stream(
}) })
}); });
#[cfg(DISABLED)] #[cfg(target_abi = "")]
#[cfg(wasm_transform)] #[cfg(wasm_transform)]
let stream = if let Some(wasmname) = wasm1 { let stream = if let Some(wasmname) = wasm1 {
debug!("make wasm transform"); debug!("make wasm transform");
@@ -257,7 +258,7 @@ pub async fn timebinned_json(
ctx: &ReqCtx, ctx: &ReqCtx,
open_bytes: OpenBoxedBytesStreamsBox, open_bytes: OpenBoxedBytesStreamsBox,
) -> Result<JsonValue, Error> { ) -> Result<JsonValue, Error> {
let deadline = Instant::now().checked_add(query.timeout_value()).unwrap(); let deadline = Instant::now() + query.timeout_content().unwrap_or(Duration::from_millis(5000));
let binned_range = BinnedRangeEnum::covering_range(query.range().clone(), query.bin_count())?; let binned_range = BinnedRangeEnum::covering_range(query.range().clone(), query.bin_count())?;
// TODO derive better values, from query // TODO derive better values, from query
let collect_max = 10000; let collect_max = 10000;