Limit response size in a somewhat improved way

Dominik Werder
2024-02-22 17:12:50 +01:00
parent 6171f901ad
commit 6495c5a773
16 changed files with 642 additions and 405 deletions

File diff suppressed because it is too large

View File

@@ -9,3 +9,11 @@ curl "https://data-api.psi.ch/api/4/events?backend=sf-databuffer&channelName=S10
Note: if the channel changes data type within the requested date range, then the
server will return values for the data type that best covers the requested
date range.
Parameters:
- `backend`: the backend that the channel exists in, e.g. `sf-databuffer`.
- `channelName`: the name of the channel.
- `begDate`: start of the time range, inclusive. In ISO format e.g. `2024-02-15T12:41:00Z`.
- `endDate`: end of the time range, exclusive.
- `allowLargeResult=true`: indicates that the client is prepared to accept larger responses than
  would be suitable for a typical browser.
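
For example, a request that opts in to a larger response could look like this (channel name and dates are hypothetical):

curl "https://data-api.psi.ch/api/4/events?backend=sf-databuffer&channelName=MY-CHANNEL&begDate=2024-02-15T12:41:00Z&endDate=2024-02-15T12:46:00Z&allowLargeResult=true"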

View File

@@ -1,6 +1,6 @@
[package]
name = "daqbuffer"
version = "0.5.0-alpha.1"
version = "0.5.0"
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
edition = "2021"

View File

@@ -1,3 +1,4 @@
use crate::container::ByteEstimate;
use crate::timebin::TimeBinned;
use crate::AsAnyMut;
use crate::AsAnyRef;
@@ -68,7 +69,7 @@ impl WithLen for Box<dyn Collected> {
impl Collected for Box<dyn Collected> {}
// TODO rename to `Typed`
pub trait CollectorType: fmt::Debug + Send + Unpin + WithLen {
pub trait CollectorType: fmt::Debug + Send + Unpin + WithLen + ByteEstimate {
type Input: Collectable;
type Output: Collected + ToJsonResult + Serialize;
@@ -81,7 +82,7 @@ pub trait CollectorType: fmt::Debug + Send + Unpin + WithLen {
fn result(&mut self, range: Option<SeriesRange>, binrange: Option<BinnedRangeEnum>) -> Result<Self::Output, Error>;
}
pub trait Collector: fmt::Debug + Send + Unpin + WithLen {
pub trait Collector: fmt::Debug + Send + Unpin + WithLen + ByteEstimate {
fn ingest(&mut self, src: &mut dyn Collectable);
fn set_range_complete(&mut self);
fn set_timed_out(&mut self);

View File

@@ -1,3 +1,4 @@
use crate::container::ByteEstimate;
use crate::subfr::SubFrId;
use serde::Serialize;
use std::fmt;
@@ -61,7 +62,7 @@ impl AsPrimF32 for String {
}
pub trait ScalarOps:
fmt::Debug + Clone + PartialOrd + PartialEq + SubFrId + AsPrimF32 + Serialize + Unpin + Send + 'static
fmt::Debug + Clone + PartialOrd + PartialEq + SubFrId + AsPrimF32 + ByteEstimate + Serialize + Unpin + Send + 'static
{
fn scalar_type_name() -> &'static str;
fn zero_b() -> Self;
@@ -74,7 +75,13 @@ pub trait ScalarOps:
}
macro_rules! impl_scalar_ops {
($ty:ident, $zero:expr, $equal_slack:ident, $mac_add:ident, $mac_div:ident, $sty_name:expr) => {
($ty:ident, $zero:expr, $equal_slack:ident, $mac_add:ident, $mac_div:ident, $sty_name:expr, $byte_estimate:expr) => {
impl ByteEstimate for $ty {
fn byte_estimate(&self) -> u64 {
$byte_estimate
}
}
impl ScalarOps for $ty {
fn scalar_type_name() -> &'static str {
$sty_name
@@ -205,15 +212,23 @@ macro_rules! div_string {
};
}
impl_scalar_ops!(u8, 0, equal_int, add_int, div_int, "u8");
impl_scalar_ops!(u16, 0, equal_int, add_int, div_int, "u16");
impl_scalar_ops!(u32, 0, equal_int, add_int, div_int, "u32");
impl_scalar_ops!(u64, 0, equal_int, add_int, div_int, "u64");
impl_scalar_ops!(i8, 0, equal_int, add_int, div_int, "i8");
impl_scalar_ops!(i16, 0, equal_int, add_int, div_int, "i16");
impl_scalar_ops!(i32, 0, equal_int, add_int, div_int, "i32");
impl_scalar_ops!(i64, 0, equal_int, add_int, div_int, "i64");
impl_scalar_ops!(f32, 0., equal_f32, add_int, div_int, "f32");
impl_scalar_ops!(f64, 0., equal_f64, add_int, div_int, "f64");
impl_scalar_ops!(bool, false, equal_bool, add_bool, div_bool, "bool");
impl_scalar_ops!(String, String::new(), equal_string, add_string, div_string, "string");
impl_scalar_ops!(u8, 0, equal_int, add_int, div_int, "u8", 1);
impl_scalar_ops!(u16, 0, equal_int, add_int, div_int, "u16", 2);
impl_scalar_ops!(u32, 0, equal_int, add_int, div_int, "u32", 4);
impl_scalar_ops!(u64, 0, equal_int, add_int, div_int, "u64", 8);
impl_scalar_ops!(i8, 0, equal_int, add_int, div_int, "i8", 1);
impl_scalar_ops!(i16, 0, equal_int, add_int, div_int, "i16", 2);
impl_scalar_ops!(i32, 0, equal_int, add_int, div_int, "i32", 4);
impl_scalar_ops!(i64, 0, equal_int, add_int, div_int, "i64", 8);
impl_scalar_ops!(f32, 0., equal_f32, add_int, div_int, "f32", 4);
impl_scalar_ops!(f64, 0., equal_f64, add_int, div_int, "f64", 8);
impl_scalar_ops!(bool, false, equal_bool, add_bool, div_bool, "bool", 1);
impl_scalar_ops!(
String,
String::new(),
equal_string,
add_string,
div_string,
"string",
16
);
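
For reference, the trait behind the new macro argument, with the signature used by the impls in this diff (a minimal sketch of crate::container::ByteEstimate):

pub trait ByteEstimate {
    // Rough number of bytes this value contributes to a serialized response.
    fn byte_estimate(&self) -> u64;
}

The widths are the in-memory sizes of the integer and float types, 1 for bool, and a flat 16-byte guess for String, whose heap payload varies. So impl_scalar_ops!(f64, ..., 8) expands, among the other generated impls, to:

impl ByteEstimate for f64 {
    fn byte_estimate(&self) -> u64 {
        8
    }
}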

View File

@@ -14,6 +14,7 @@ use items_0::collect_s::CollectableType;
use items_0::collect_s::Collected;
use items_0::collect_s::CollectorType;
use items_0::collect_s::ToJsonResult;
use items_0::container::ByteEstimate;
use items_0::overlap::HasTimestampDeque;
use items_0::scalar_ops::AsPrimF32;
use items_0::scalar_ops::ScalarOps;
@@ -187,6 +188,24 @@ impl<STY> WithLen for BinsDim0<STY> {
}
}
impl<STY: ScalarOps> ByteEstimate for BinsDim0<STY> {
fn byte_estimate(&self) -> u64 {
// TODO
// Should use a better estimate for waveform and string types,
// or keep some aggregated byte count on push.
let n = self.len();
if n == 0 {
0
} else {
// TODO use the actual size of one/some of the elements.
let i = n * 2 / 3;
let w1 = self.mins[i].byte_estimate();
let w2 = self.maxs[i].byte_estimate();
(n as u64 * (8 + 8 + 8 + 4 + w1 + w2)) as u64
}
}
}
impl<STY> Resettable for BinsDim0<STY> {
fn reset(&mut self) {
self.ts1s.clear();
@@ -414,7 +433,13 @@ impl<NTY> BinsDim0Collector<NTY> {
impl<NTY> WithLen for BinsDim0Collector<NTY> {
fn len(&self) -> usize {
self.vals.as_ref().map_or(0, |x| x.ts1s.len())
self.vals.as_ref().map_or(0, WithLen::len)
}
}
impl<STY: ScalarOps> ByteEstimate for BinsDim0Collector<STY> {
fn byte_estimate(&self) -> u64 {
self.vals.as_ref().map_or(0, ByteEstimate::byte_estimate)
}
}
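
As a rough calibration of this estimate (our arithmetic, not from the commit): for f64 bins, each entry counts 8 + 8 + 8 + 4 bytes of fixed per-bin fields plus 8 + 8 for min and max, i.e. 44 bytes per bin, so 10,000 bins estimate to about 440 kB, well below the default 8 MiB bytes_max.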

View File

@@ -11,6 +11,7 @@ use items_0::collect_s::CollectableType;
use items_0::collect_s::Collected;
use items_0::collect_s::CollectorType;
use items_0::collect_s::ToJsonResult;
use items_0::container::ByteEstimate;
use items_0::scalar_ops::AsPrimF32;
use items_0::scalar_ops::ScalarOps;
use items_0::timebin::TimeBinnable;
@@ -205,6 +206,24 @@ impl<STY> WithLen for BinsXbinDim0<STY> {
}
}
impl<STY: ScalarOps> ByteEstimate for BinsXbinDim0<STY> {
fn byte_estimate(&self) -> u64 {
// TODO
// Should use a better estimate for waveform and string types,
// or keep some aggregated byte count on push.
let n = self.len();
if n == 0 {
0
} else {
// TODO use the actual size of one/some of the elements.
let i = n * 2 / 3;
let w1 = self.mins[i].byte_estimate();
let w2 = self.maxs[i].byte_estimate();
(n as u64 * (8 + 8 + 8 + 4 + w1 + w2)) as u64
}
}
}
impl<STY> Resettable for BinsXbinDim0<STY> {
fn reset(&mut self) {
self.ts1s.clear();
@@ -401,7 +420,13 @@ impl<NTY> BinsXbinDim0Collector<NTY> {
impl<NTY> WithLen for BinsXbinDim0Collector<NTY> {
fn len(&self) -> usize {
self.vals.ts1s.len()
self.vals.len()
}
}
impl<STY: ScalarOps> ByteEstimate for BinsXbinDim0Collector<STY> {
fn byte_estimate(&self) -> u64 {
self.vals.byte_estimate()
}
}

View File

@@ -1159,6 +1159,12 @@ impl WithLen for ChannelEventsCollector {
}
}
impl ByteEstimate for ChannelEventsCollector {
fn byte_estimate(&self) -> u64 {
self.coll.as_ref().map_or(0, |x| x.byte_estimate())
}
}
impl Collector for ChannelEventsCollector {
fn ingest(&mut self, item: &mut dyn Collectable) {
if let Some(item) = item.as_any_mut().downcast_mut::<ChannelEvents>() {

View File

@@ -164,10 +164,20 @@ impl<STY> WithLen for EventsDim0<STY> {
}
}
impl<STY> ByteEstimate for EventsDim0<STY> {
impl<STY: ScalarOps> ByteEstimate for EventsDim0<STY> {
fn byte_estimate(&self) -> u64 {
let stylen = mem::size_of::<STY>();
(self.len() * (8 + 8 + stylen)) as u64
// TODO
// Should use a better estimate for waveform and string types,
// or keep some aggregated byte count on push.
let n = self.len();
if n == 0 {
0
} else {
// TODO use the actual size of one/some of the elements.
let i = n * 2 / 3;
let sty_bytes = self.values[i].byte_estimate();
(n as u64 * (8 + 8 + sty_bytes)) as u64
}
}
}
@@ -262,7 +272,13 @@ impl<STY> EventsDim0Collector<STY> {
impl<STY> WithLen for EventsDim0Collector<STY> {
fn len(&self) -> usize {
self.vals.tss.len()
WithLen::len(&self.vals)
}
}
impl<STY: ScalarOps> ByteEstimate for EventsDim0Collector<STY> {
fn byte_estimate(&self) -> u64 {
ByteEstimate::byte_estimate(&self.vals)
}
}
@@ -426,7 +442,7 @@ impl<STY: ScalarOps> CollectorType for EventsDim0Collector<STY> {
if let Some(range) = &range {
match range {
SeriesRange::TimeRange(x) => Some(IsoDateTime::from_u64(x.beg + SEC)),
SeriesRange::PulseRange(x) => {
SeriesRange::PulseRange(_) => {
error!("TODO emit create continueAt for pulse range");
Some(IsoDateTime::from_u64(0))
}

View File

@@ -220,7 +220,13 @@ impl<STY> EventsDim1Collector<STY> {
impl<STY> WithLen for EventsDim1Collector<STY> {
fn len(&self) -> usize {
self.vals.tss.len()
WithLen::len(&self.vals)
}
}
impl<STY> ByteEstimate for EventsDim1Collector<STY> {
fn byte_estimate(&self) -> u64 {
ByteEstimate::byte_estimate(&self.vals)
}
}

View File

@@ -962,7 +962,13 @@ impl<NTY> EventsXbinDim0Collector<NTY> {
impl<NTY> WithLen for EventsXbinDim0Collector<NTY> {
fn len(&self) -> usize {
self.vals.tss.len()
WithLen::len(&self.vals)
}
}
impl<STY> ByteEstimate for EventsXbinDim0Collector<STY> {
fn byte_estimate(&self) -> u64 {
ByteEstimate::byte_estimate(&self.vals)
}
}

View File

@@ -27,15 +27,17 @@ pub struct PlainEventsQuery {
range: SeriesRange,
#[serde(default, skip_serializing_if = "is_false", rename = "oneBeforeRange")]
one_before_range: bool,
#[serde(
default = "TransformQuery::default_events",
skip_serializing_if = "TransformQuery::is_default_events"
)]
#[serde(default = "TransformQuery::default_events")]
#[serde(skip_serializing_if = "TransformQuery::is_default_events")]
transform: TransformQuery,
#[serde(default, skip_serializing_if = "Option::is_none", with = "humantime_serde")]
timeout: Option<Duration>,
#[serde(default, skip_serializing_if = "Option::is_none")]
events_max: Option<u64>,
#[serde(default, skip_serializing_if = "Option::is_none")]
bytes_max: Option<u64>,
#[serde(default, skip_serializing_if = "Option::is_none")]
allow_large_result: Option<bool>,
#[serde(default, skip_serializing_if = "Option::is_none", with = "humantime_serde")]
event_delay: Option<Duration>,
#[serde(default, skip_serializing_if = "Option::is_none")]
@@ -65,7 +67,9 @@ impl PlainEventsQuery {
one_before_range: false,
transform: TransformQuery::default_events(),
timeout: Some(Duration::from_millis(4000)),
events_max: Some(10000),
events_max: None,
bytes_max: None,
allow_large_result: None,
event_delay: None,
stream_batch_len: None,
buf_len_disk_io: None,
@@ -102,13 +106,33 @@ impl PlainEventsQuery {
}
pub fn events_max(&self) -> u64 {
self.events_max.unwrap_or(1024 * 128)
self.events_max.map_or_else(
|| {
if self.allow_large_result_or_def() {
1000 * 500
} else {
1000 * 80
}
},
|x| x.min(1000 * 1000 * 4),
)
}
// A rough indication of how many bytes this request is allowed to return. Beyond that, the result
// should be a partial result.
pub fn bytes_max(&self) -> u64 {
self.events_max.unwrap_or(1024 * 512)
self.bytes_max.map_or_else(
|| {
if self.allow_large_result_or_def() {
1024 * 1024 * 80
} else {
1024 * 1024 * 8
}
},
|x| x.min(1024 * 1024 * 400),
)
}
pub fn allow_large_result_or_def(&self) -> bool {
self.allow_large_result.unwrap_or(false)
}
pub fn event_delay(&self) -> &Option<Duration> {
@@ -209,9 +233,9 @@ impl FromUrl for PlainEventsQuery {
.get("timeout")
.map(|x| x.parse::<u64>().map(Duration::from_millis).ok())
.unwrap_or(None),
events_max: pairs
.get("eventsMax")
.map_or(Ok(None), |k| k.parse().map(|k| Some(k)))?,
events_max: pairs.get("eventsMax").map_or(None, |k| k.parse().ok()),
bytes_max: pairs.get("bytesMax").map_or(None, |k| k.parse().ok()),
allow_large_result: pairs.get("allowLargeResult").map_or(None, |x| x.parse().ok()),
event_delay: pairs.get("eventDelay").map_or(Ok(None), |k| {
k.parse::<u64>().map(|x| Duration::from_millis(x)).map(|k| Some(k))
})?,
@@ -265,19 +289,25 @@ impl AppendToUrl for PlainEventsQuery {
self.transform.append_to_url(url);
let mut g = url.query_pairs_mut();
if let Some(x) = &self.timeout {
g.append_pair("timeout", &format!("{}", x.as_millis()));
g.append_pair("timeout", &format!("{:.0}", x.as_secs_f64() * 1e3));
}
if let Some(x) = self.events_max.as_ref() {
g.append_pair("eventsMax", &format!("{}", x));
g.append_pair("eventsMax", &x.to_string());
}
if let Some(x) = self.bytes_max.as_ref() {
g.append_pair("bytesMax", &x.to_string());
}
if let Some(x) = self.allow_large_result {
g.append_pair("allowLargeResult", &x.to_string());
}
if let Some(x) = self.event_delay.as_ref() {
g.append_pair("eventDelay", &format!("{:.0}", x.as_secs_f64() * 1e3));
}
if let Some(x) = self.stream_batch_len.as_ref() {
g.append_pair("streamBatchLen", &format!("{}", x));
g.append_pair("streamBatchLen", &x.to_string());
}
if let Some(x) = self.buf_len_disk_io.as_ref() {
g.append_pair("bufLenDiskIo", &format!("{}", x));
g.append_pair("bufLenDiskIo", &x.to_string());
}
if self.do_test_main_error {
g.append_pair("doTestMainError", "true");
@@ -289,7 +319,7 @@ impl AppendToUrl for PlainEventsQuery {
g.append_pair("testDoWasm", &x);
}
if let Some(x) = self.merger_out_len_max.as_ref() {
g.append_pair("mergerOutLenMax", &format!("{}", x));
g.append_pair("mergerOutLenMax", &x.to_string());
}
if self.create_errors.len() != 0 {
g.append_pair("create_errors", &self.create_errors.join(","));
@@ -331,6 +361,7 @@ impl EventsSubQuerySelect {
pub struct EventsSubQuerySettings {
timeout: Option<Duration>,
events_max: Option<u64>,
bytes_max: Option<u64>,
event_delay: Option<Duration>,
stream_batch_len: Option<usize>,
buf_len_disk_io: Option<usize>,
@@ -343,6 +374,7 @@ impl Default for EventsSubQuerySettings {
Self {
timeout: None,
events_max: None,
bytes_max: None,
event_delay: None,
stream_batch_len: None,
buf_len_disk_io: None,
@@ -357,6 +389,7 @@ impl From<&PlainEventsQuery> for EventsSubQuerySettings {
Self {
timeout: value.timeout,
events_max: value.events_max,
bytes_max: value.bytes_max,
event_delay: value.event_delay,
stream_batch_len: value.stream_batch_len,
buf_len_disk_io: value.buf_len_disk_io,
@@ -373,6 +406,7 @@ impl From<&BinnedQuery> for EventsSubQuerySettings {
timeout: value.timeout(),
// TODO ?
events_max: None,
bytes_max: None,
event_delay: None,
stream_batch_len: None,
buf_len_disk_io: None,
@@ -390,6 +424,7 @@ impl From<&Api1Query> for EventsSubQuerySettings {
timeout: value.timeout(),
// TODO ?
events_max: None,
bytes_max: None,
event_delay: None,
stream_batch_len: None,
buf_len_disk_io: Some(disk_io_tune.read_buffer_len),

View File

@@ -44,6 +44,7 @@ macro_rules! trace4 {
pub struct Collect {
inp: Pin<Box<dyn Stream<Item = Sitemty<Box<dyn Collectable>>> + Send>>,
events_max: u64,
bytes_max: u64,
range: Option<SeriesRange>,
binrange: Option<BinnedRangeEnum>,
collector: Option<Box<dyn Collector>>,
@@ -58,6 +59,7 @@ impl Collect {
inp: Pin<Box<dyn Stream<Item = Sitemty<Box<dyn Collectable>>> + Send>>,
deadline: Instant,
events_max: u64,
bytes_max: u64,
range: Option<SeriesRange>,
binrange: Option<BinnedRangeEnum>,
) -> Self {
@@ -65,6 +67,7 @@ impl Collect {
Self {
inp,
events_max,
bytes_max,
range,
binrange,
collector: None,
@@ -93,7 +96,12 @@ impl Collect {
let coll = self.collector.get_or_insert_with(|| item.new_collector());
coll.ingest(&mut item);
if coll.len() as u64 >= self.events_max {
info!("reached events_max {}", self.events_max);
info!("reached events_max {} / {}", coll.len(), self.events_max);
coll.set_continue_at_here();
self.done_input = true;
}
if coll.byte_estimate() >= self.bytes_max {
info!("reached bytes_max {} / {}", coll.byte_estimate(), self.events_max);
coll.set_continue_at_here();
self.done_input = true;
}
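
The two limit checks above share one truncation pattern: whichever limit trips first, the collector records a continue-at marker and the input stream is stopped, so the client still receives a well-formed partial result. A condensed, self-contained restatement (SketchCollector stands in for the diff's Collector with its WithLen and ByteEstimate supertraits):

trait SketchCollector {
    fn len(&self) -> usize;             // from WithLen
    fn byte_estimate(&self) -> u64;     // from ByteEstimate
    fn set_continue_at_here(&mut self); // from Collector
}

// Returns true when ingestion should stop; the collector has then
// recorded where a follow-up request can continue.
fn limits_reached<C: SketchCollector>(coll: &mut C, events_max: u64, bytes_max: u64) -> bool {
    if coll.len() as u64 >= events_max || coll.byte_estimate() >= bytes_max {
        coll.set_continue_at_here();
        true
    } else {
        false
    }
}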

View File

@@ -37,7 +37,15 @@ pub async fn plain_events_json(
//let stream = TimeBinnableToCollectable::new(stream);
let stream = Box::pin(stream);
info!("plain_events_json boxed stream created");
let collected = Collect::new(stream, deadline, evq.events_max(), Some(evq.range().clone()), None).await?;
let collected = Collect::new(
stream,
deadline,
evq.events_max(),
evq.bytes_max(),
Some(evq.range().clone()),
None,
)
.await?;
info!("plain_events_json collected");
let jsval = serde_json::to_value(&collected)?;
info!("plain_events_json json serialized");

View File

@@ -63,11 +63,12 @@ fn collect_channel_events_01() -> Result<(), Error> {
// TODO build like in request code
let deadline = Instant::now() + Duration::from_millis(4000);
let events_max = 10000;
let bytes_max = 80 * 10000;
let stream = PlainEventStream::new(stream);
let stream = EventsToTimeBinnable::new(stream);
let stream = TimeBinnableToCollectable::new(stream);
let stream = Box::pin(stream);
let res = Collect::new(stream, deadline, events_max, None, None).await?;
let res = Collect::new(stream, deadline, events_max, bytes_max, None, None).await?;
if let Some(res) = res.as_any_ref().downcast_ref::<EventsDim0CollectorOutput<f32>>() {
eprintln!("Great, a match");
eprintln!("{res:?}");
@@ -103,11 +104,12 @@ fn collect_channel_events_pulse_id_diff() -> Result<(), Error> {
let stream = EventsToTimeBinnable::new(stream);
let deadline = Instant::now() + Duration::from_millis(4000);
let events_max = 10000;
let bytes_max = 80 * 10000;
let stream = Box::pin(stream);
let stream = build_time_binning_transform(&trqu, stream)?;
let stream = TimeBinnableToCollectable::new(stream);
let stream = Box::pin(stream);
let res = Collect::new(stream, deadline, events_max, None, None).await?;
let res = Collect::new(stream, deadline, events_max, bytes_max, None, None).await?;
if let Some(res) = res.as_any_ref().downcast_ref::<EventsDim0CollectorOutput<i64>>() {
eprintln!("Great, a match");
eprintln!("{res:?}");

View File

@@ -256,10 +256,12 @@ pub async fn timebinned_json(
) -> Result<JsonValue, Error> {
let deadline = Instant::now().checked_add(query.timeout_value()).unwrap();
let binned_range = BinnedRangeEnum::covering_range(query.range().clone(), query.bin_count())?;
// TODO derive better values from the query
let collect_max = 10000;
let bytes_max = 100 * collect_max;
let stream = timebinned_stream(query.clone(), binned_range.clone(), ch_conf, ctx, open_bytes).await?;
let stream = timebinned_to_collectable(stream);
let collected = Collect::new(stream, deadline, collect_max, None, Some(binned_range));
let collected = Collect::new(stream, deadline, collect_max, bytes_max, None, Some(binned_range));
let collected: BoxFuture<_> = Box::pin(collected);
let collected = collected.await?;
let jsval = serde_json::to_value(&collected)?;
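
The fixed values here are a stopgap: 10,000 bins at an assumed ~100 bytes each bound binned JSON responses at roughly 1 MB until, per the TODO, the limits are derived from the query.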