From ab6b0322c98a9bf17ed28f825983cc9f23ad391e Mon Sep 17 00:00:00 2001
From: Dominik Werder
Date: Fri, 13 Sep 2024 19:21:27 +0200
Subject: [PATCH] WIP

---
 apidoc/src/bins.md                   | 52 ++++++++++++++++---
 apidoc/src/events.md                 |  3 +-
 crates/httpret/src/api4/events.rs    |  6 +--
 crates/items_2/src/binsdim0.rs       | 21 ++++++++
 crates/netpod/src/netpod.rs          | 75 ++++++++++++++++++++++------
 crates/query/src/api4/binned.rs      | 57 +++++++++++++++------
 crates/scyllaconn/src/bincache.rs    |  2 +-
 crates/streams/src/timebinnedjson.rs | 63 ++++++++++++++---------
 8 files changed, 214 insertions(+), 65 deletions(-)

diff --git a/apidoc/src/bins.md b/apidoc/src/bins.md
index 56bd57a..f7f893d 100644
--- a/apidoc/src/bins.md
+++ b/apidoc/src/bins.md
@@ -3,15 +3,53 @@
 Binned data can be fetched like this:
 
 ```bash
-curl "https://data-api.psi.ch/api/4/binned?backend=sf-databuffer&channelName=S10BC01-DBPM010:Q1&begDate=2024-02-15T00:00:00Z&endDate=2024-02-15T12:00:00Z&binCount=500"
+curl "https://data-api.psi.ch/api/4/binned?backend=sf-databuffer&channelName=S10BC01-DBPM010:Q1&begDate=2024-02-15T00:00:00Z&endDate=2024-02-15T12:00:00Z&binWidth=2m"
 ```
 
+Parameters:
+- `backend`: the backend in which the channel exists, e.g. `sf-databuffer`.
+- `channelName`: the name of the channel.
+- `begDate`: start of the time range, inclusive. In ISO format, e.g. `2024-02-15T12:41:00Z`.
+- `endDate`: end of the time range, exclusive.
+- `binWidth`: requested width of the bins, given with a unit suffix, e.g. `10s` for 10 seconds, `2m` for 2 minutes,
+  `1h` for 1 hour.
+- `binCount`: requested number of bins, cannot be combined with `binWidth`.
+- `contentTimeout`: return the results computed so far after the given timeout.
+  A streaming response (e.g. `json-framed`) will yield results at `contentTimeout` intervals.
+- `allowLargeResult=true` **DEPRECATED, will be rejected in the future**
+  indicates that the client is prepared to accept responses larger than
+  what might be suitable for a typical browser. Please download large result sets as
+  framed json or framed cbor streams, see below.
+
 This returns for each bin the average, minimum, maximum and count of events.
 
-Note: the server may return more than `binCount` bins.
-That is because most of the time, the requested combination of date range and bin count
-does not fit well on the common time grid, which is required for caching to work.
+Note: it is an error to specify both `binWidth` and `binCount`.
+The server may return more than `binCount` bins, and it will choose the `binWidth` from a set of
+supported widths which best matches the requested width.
 
-If absolutely required, we could re-crunch the numbers to calculate the exact
-requested specification of date range and bin count. Please get in touch
-if your use case demands this.
+
+## As framed JSON stream
+
+To download larger amounts of data as JSON, it is recommended to use the `json-framed` content encoding.
+Using this encoding, the server sends the response as a stream of JSON objects, where each
+JSON object contains a batch of results.
+This content encoding is triggered via the `Accept: application/json-framed` header in the request.
+
+The returned body looks like:
+```
+[JSON-frame]
+[JSON-frame]
+[JSON-frame]
+... etc
+```
+
+where each `[JSON-frame]` looks like:
+```
+[number of bytes N of the following json-encoded data, as an ASCII-encoded number]
+[newline]
+[JSON object: N bytes]
+[newline]
+```
+
+Note: "data" objects are currently identified by the presence of the `ts1s` key.
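For illustration, here is a minimal Rust sketch of reading such a stream one frame at a time. It is not part of this patch or of the server API; the helper name `read_frame` and the use of `std::io::BufRead` are assumptions made only for this example, and the frame layout is the one documented above:

```rust
use std::io::{self, BufRead, Read};

/// Read one frame: an ASCII length line, then N bytes of JSON, then a trailing newline.
/// Returns Ok(None) on a clean end of stream.
fn read_frame(reader: &mut impl BufRead) -> io::Result<Option<String>> {
    let mut len_line = String::new();
    if reader.read_line(&mut len_line)? == 0 {
        return Ok(None);
    }
    let n: usize = len_line
        .trim()
        .parse()
        .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
    let mut json = vec![0u8; n];
    reader.read_exact(&mut json)?;
    // Consume the newline that follows the JSON object.
    let mut newline = [0u8; 1];
    reader.read_exact(&mut newline)?;
    String::from_utf8(json)
        .map(Some)
        .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
}
```

Each returned string is one complete JSON object; a client should decode it and inspect its keys to decide how to handle it.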
+There can be other types of objects, like keepalive, log or statistics. diff --git a/apidoc/src/events.md b/apidoc/src/events.md index 3f34b75..0906520 100644 --- a/apidoc/src/events.md +++ b/apidoc/src/events.md @@ -12,7 +12,8 @@ Parameters: - `begDate`: start of the time range, inclusive. In ISO format e.g. `2024-02-15T12:41:00Z`. - `endDate`: end of the time range, exclusive. - `oneBeforeRange`: if set to `true` the reponse will in addition also contain the most recent event before the given range. -- `allowLargeResult=true` **DEPRECATED**. indicates that the client is prepared to accept also larger responses compared to +- `allowLargeResult=true` **DEPRECATED, will be rejected in the future** + indicates that the client is prepared to accept also larger responses compared to what might be suitable for a typical browser. Please download large result sets as framed json or framed cbor streams, see below. diff --git a/crates/httpret/src/api4/events.rs b/crates/httpret/src/api4/events.rs index 9a552ac..6c20a36 100644 --- a/crates/httpret/src/api4/events.rs +++ b/crates/httpret/src/api4/events.rs @@ -288,12 +288,12 @@ where use std::fmt::Write; let s = y.into(); let mut b2 = String::with_capacity(16); - write!(b2, "\n{}\n", s.len()).unwrap(); - stream::iter([Ok::<_, crate::err::Error>(b2), Ok(s)]) + write!(b2, "{:15}\n", s.len()).unwrap(); + stream::iter([Ok::<_, crate::err::Error>(b2), Ok(s), Ok(String::from("\n"))]) } Err(e) => { let e = crate::err::Error::with_msg_no_trace(e.to_string()); - stream::iter([Err(e), Ok(String::new())]) + stream::iter([Err(e), Ok(String::new()), Ok(String::new())]) } }) .filter(|x| if let Ok(x) = x { ready(x.len() > 0) } else { ready(true) }) diff --git a/crates/items_2/src/binsdim0.rs b/crates/items_2/src/binsdim0.rs index 38ec0cc..64e0dc1 100644 --- a/crates/items_2/src/binsdim0.rs +++ b/crates/items_2/src/binsdim0.rs @@ -361,6 +361,8 @@ where min: STY, max: STY, avg: f64, + filled_up_to: TsNano, + last_seen_avg: f32, } impl BinsDim0TimeBinnerTy @@ -385,6 +387,8 @@ where min: STY::zero_b(), max: STY::zero_b(), avg: 0., + filled_up_to: ts1now, + last_seen_avg: 0., } } @@ -467,6 +471,8 @@ where panic!("TODO non-time-weighted binning to be impl"); } } + self.filled_up_to = TsNano::from_ns(ts2); + self.last_seen_avg = avg; } } if count_before != 0 { @@ -496,6 +502,21 @@ where } fn push_in_progress(&mut self, push_empty: bool) { + if self.filled_up_to != self.ts2now { + if self.cnt != 0 { + info!("push_in_progress partially filled bin"); + if self.do_time_weight { + let f = (self.ts2now.ns() - self.filled_up_to.ns()) as f64 + / (self.ts2now.ns() - self.ts1now.ns()) as f64; + self.avg += self.last_seen_avg as f64 * f; + self.filled_up_to = self.ts2now; + } else { + panic!("TODO non-time-weighted binning to be impl"); + } + } else { + error!("partially filled bin with cnt 0"); + } + } if self.cnt == 0 && !push_empty { self.reset_agg(); } else { diff --git a/crates/netpod/src/netpod.rs b/crates/netpod/src/netpod.rs index 5944259..8fc76d3 100644 --- a/crates/netpod/src/netpod.rs +++ b/crates/netpod/src/netpod.rs @@ -1748,6 +1748,10 @@ mod dt_nano_serde { pub struct DtMs(u64); impl DtMs { + pub const fn from_nano_u64(x: u64) -> Self { + Self(x / 1000000) + } + pub const fn from_ms_u64(x: u64) -> Self { Self(x) } @@ -2108,20 +2112,7 @@ const PATCH_T_LEN_OPTIONS_WAVE: [u64; 3] = [ DAY * 32, ]; -const TIME_BIN_THRESHOLDS: [u64; 39] = [ - MU, - MU * 2, - MU * 5, - MU * 10, - MU * 20, - MU * 50, - MU * 100, - MU * 200, - MU * 500, - MS, - MS * 2, - MS * 5, - MS * 10, 
+const TIME_BIN_THRESHOLDS: [u64; 26] = [ MS * 20, MS * 50, MS * 100, @@ -2415,6 +2406,52 @@ impl BinnedRange { } } + pub fn covering_range_time(range: NanoRange, bin_len_req: DtMs) -> Result { + let opts = ::binned_bin_len_opts(); + let bin_len_req = if bin_len_req.ms() < opts[0].ms() { + DtMs::from_ms_u64(opts[0].ms()) + } else { + bin_len_req + }; + let bin_len_req = if bin_len_req.ms() > opts.last().unwrap().ms() { + DtMs::from_ms_u64(opts.last().unwrap().ms()) + } else { + bin_len_req + }; + let pv = TsNano::from_ns(bin_len_req.ns()); + let pi = opts.partition_point(|&x| x < pv); + let bin_len = if pi == 0 { + DtMs::from_ms_u64(opts[0].ms()) + } else { + let v1 = DtMs::from_ms_u64(opts[pi - 1].ms()); + if let Some(&v2) = opts.get(pi) { + let v2 = DtMs::from_ms_u64(v2.ms()); + if v1 >= bin_len_req || v2 < bin_len_req { + panic!("logic covering_range_time"); + } else { + let f1 = (bin_len_req.ms() - v1.ms()) / bin_len_req.ms(); + let f2 = (v2.ms() - bin_len_req.ms()) / bin_len_req.ms(); + if f1 < f2 { + v1 + } else { + v2 + } + } + } else { + DtMs::from_ms_u64(v1.ms()) + } + }; + let bin_off = range.beg() / bin_len.ns(); + let off2 = (range.end() + bin_len.ns() - 1) / bin_len.ns(); + let bin_cnt = off2 - bin_off; + let ret = Self { + bin_len: TsNano::from_ns(bin_len.ns()), + bin_off, + bin_cnt, + }; + Ok(ret) + } + pub fn nano_beg(&self) -> TsNano { self.bin_len.times(self.bin_off) } @@ -2512,6 +2549,16 @@ impl BinnedRangeEnum { Err(Error::with_msg_no_trace("can not find matching pre-binned grid")) } + /// Cover at least the given range while selecting the bin width which best fits the requested bin width. + pub fn covering_range_time(range: SeriesRange, bin_len_req: DtMs) -> Result { + match range { + SeriesRange::TimeRange(k) => Ok(Self::Time(BinnedRange::covering_range_time(k, bin_len_req)?)), + SeriesRange::PulseRange(_) => Err(Error::with_msg_no_trace(format!( + "timelike bin width not possible for a pulse range" + ))), + } + } + /// Cover at least the given range with at least as many as the requested number of bins. 
pub fn covering_range(range: SeriesRange, min_bin_count: u32) -> Result { match range { diff --git a/crates/query/src/api4/binned.rs b/crates/query/src/api4/binned.rs index 5d04955..a637852 100644 --- a/crates/query/src/api4/binned.rs +++ b/crates/query/src/api4/binned.rs @@ -6,7 +6,10 @@ use netpod::query::CacheUsage; use netpod::range::evrange::SeriesRange; use netpod::ttl::RetentionTime; use netpod::AppendToUrl; +use netpod::BinnedRange; +use netpod::BinnedRangeEnum; use netpod::ByteSize; +use netpod::DtMs; use netpod::FromUrl; use netpod::HasBackend; use netpod::HasTimeout; @@ -57,7 +60,8 @@ mod serde_option_vec_duration { pub struct BinnedQuery { channel: SfDbChannel, range: SeriesRange, - bin_count: u32, + #[serde(default, skip_serializing_if = "Option::is_none")] + bin_count: Option, #[serde(default, skip_serializing_if = "Option::is_none", with = "humantime_serde")] bin_width: Option, #[serde( @@ -67,8 +71,6 @@ pub struct BinnedQuery { transform: TransformQuery, #[serde(default, skip_serializing_if = "Option::is_none")] cache_usage: Option, - #[serde(default, skip_serializing_if = "Option::is_none")] - bins_max: Option, #[serde(default, skip_serializing_if = "Option::is_none", with = "serde_option_vec_duration")] subgrids: Option>, #[serde( @@ -97,11 +99,10 @@ impl BinnedQuery { Self { channel, range, - bin_count, + bin_count: Some(bin_count), bin_width: None, transform: TransformQuery::default_time_binned(), cache_usage: None, - bins_max: None, subgrids: None, buf_len_disk_io: None, disk_stats_every: None, @@ -121,10 +122,14 @@ impl BinnedQuery { &self.channel } - pub fn bin_count(&self) -> u32 { + pub fn bin_count(&self) -> Option { self.bin_count } + pub fn bin_width(&self) -> Option { + self.bin_width + } + pub fn transform(&self) -> &TransformQuery { &self.transform } @@ -151,10 +156,6 @@ impl BinnedQuery { self.timeout_content } - pub fn bins_max(&self) -> u32 { - self.bins_max.unwrap_or(200000) - } - pub fn subgrids(&self) -> Option<&[Duration]> { self.subgrids.as_ref().map(|x| x.as_slice()) } @@ -208,6 +209,32 @@ impl BinnedQuery { pub fn use_rt(&self) -> Option { self.use_rt.clone() } + + pub fn covering_range(&self) -> Result { + match &self.range { + SeriesRange::TimeRange(range) => match self.bin_width { + Some(dt) => { + if self.bin_count.is_some() { + Err(Error::with_public_msg_no_trace(format!( + "must not specify both binWidth and binCount" + ))) + } else { + let ret = BinnedRangeEnum::Time(BinnedRange::covering_range_time( + range.clone(), + DtMs::from_ms_u64(dt.as_millis() as u64), + )?); + Ok(ret) + } + } + None => { + let bc = self.bin_count.unwrap_or(20); + let ret = BinnedRangeEnum::covering_range(self.range.clone(), bc)?; + Ok(ret) + } + }, + SeriesRange::PulseRange(_) => todo!(), + } + } } impl HasBackend for BinnedQuery { @@ -232,7 +259,7 @@ impl FromUrl for BinnedQuery { let ret = Self { channel: SfDbChannel::from_pairs(&pairs)?, range: SeriesRange::from_pairs(pairs)?, - bin_count: pairs.get("binCount").and_then(|x| x.parse().ok()).unwrap_or(10), + bin_count: pairs.get("binCount").and_then(|x| x.parse().ok()), bin_width: pairs.get("binWidth").and_then(|x| humantime::parse_duration(x).ok()), transform: TransformQuery::from_pairs(pairs)?, cache_usage: CacheUsage::from_pairs(&pairs)?, @@ -252,7 +279,6 @@ impl FromUrl for BinnedQuery { timeout_content: pairs .get("contentTimeout") .and_then(|x| humantime::parse_duration(x).ok()), - bins_max: pairs.get("binsMax").map_or(Ok(None), |k| k.parse().map(|k| Some(k)))?, subgrids: pairs .get("subgrids") .map(|x| 
x.split(",").filter_map(|x| humantime::parse_duration(x).ok()).collect()), @@ -278,7 +304,9 @@ impl AppendToUrl for BinnedQuery { self.range.append_to_url(url); { let mut g = url.query_pairs_mut(); - g.append_pair("binCount", &format!("{}", self.bin_count)); + if let Some(x) = self.bin_count { + g.append_pair("binCount", &format!("{}", x)); + } if let Some(x) = self.bin_width { if x < Duration::from_secs(1) { g.append_pair("binWidth", &format!("{:.0}ms", x.subsec_millis())); @@ -301,9 +329,6 @@ impl AppendToUrl for BinnedQuery { if let Some(x) = &self.timeout_content { g.append_pair("contentTimeout", &format!("{:.0}ms", 1e3 * x.as_secs_f64())); } - if let Some(x) = self.bins_max { - g.append_pair("binsMax", &format!("{}", x)); - } if let Some(x) = &self.subgrids { let s: String = x.iter() diff --git a/crates/scyllaconn/src/bincache.rs b/crates/scyllaconn/src/bincache.rs index b68456f..38bbbd0 100644 --- a/crates/scyllaconn/src/bincache.rs +++ b/crates/scyllaconn/src/bincache.rs @@ -268,7 +268,7 @@ pub async fn worker_write( max, avg, ); - eprintln!("cache write {:?}", params); + // trace!("cache write {:?}", params); scy.execute(stmts_cache.st_write_f32(), params) .await .map_err(|e| streams::timebin::cached::reader::Error::Scylla(e.to_string()))?; diff --git a/crates/streams/src/timebinnedjson.rs b/crates/streams/src/timebinnedjson.rs index 265fbbf..ddd6756 100644 --- a/crates/streams/src/timebinnedjson.rs +++ b/crates/streams/src/timebinnedjson.rs @@ -416,8 +416,13 @@ pub async fn timebinned_json( cache_read_provider: Option>, events_read_provider: Option>, ) -> Result { - let deadline = Instant::now() + query.timeout_content().unwrap_or(Duration::from_millis(5000)); - let binned_range = BinnedRangeEnum::covering_range(query.range().clone(), query.bin_count())?; + let deadline = Instant::now() + + query + .timeout_content() + .unwrap_or(Duration::from_millis(5000)) + .min(Duration::from_millis(5000)) + .max(Duration::from_millis(200)); + let binned_range = query.covering_range()?; // TODO derive better values, from query let collect_max = 10000; let bytes_max = 100 * collect_max; @@ -479,7 +484,7 @@ pub async fn timebinned_json_framed( events_read_provider: Option>, ) -> Result { trace!("timebinned_json_framed"); - let binned_range = BinnedRangeEnum::covering_range(query.range().clone(), query.bin_count())?; + let binned_range = query.covering_range()?; // TODO derive better values, from query let stream = timebinned_stream( query.clone(), @@ -492,11 +497,18 @@ pub async fn timebinned_json_framed( ) .await?; let stream = timebinned_to_collectable(stream); - + // TODO create a custom Stream adapter. + // Want to timeout only on data items: the user wants to wait for bins only a maximum time. + // But also, I want to coalesce. 
+ let timeout_content_base = query + .timeout_content() + .unwrap_or(Duration::from_millis(1000)) + .min(Duration::from_millis(5000)) + .max(Duration::from_millis(100)); + let timeout_content_2 = timeout_content_base * 2 / 3; let mut coll = None; - let interval = tokio::time::interval(Duration::from( - query.timeout_content().unwrap_or(Duration::from_millis(1000)), - )); + let interval = tokio::time::interval(Duration::from(timeout_content_base)); + let mut last_emit = Instant::now(); let stream = stream.map(|x| Some(x)).chain(futures_util::stream::iter([None])); let stream = tokio_stream::StreamExt::timeout_repeating(stream, interval).map(move |x| match x { Ok(item) => match item { @@ -506,29 +518,37 @@ pub async fn timebinned_json_framed( RangeCompletableItem::Data(mut item) => { let coll = coll.get_or_insert_with(|| item.new_collector()); coll.ingest(&mut item); - if coll.len() >= 128 { - take_collector_result(coll) + if coll.len() >= 128 || last_emit.elapsed() >= timeout_content_2 { + last_emit = Instant::now(); + take_collector_result(coll).map(|x| Ok(x)) } else { + // Some(serde_json::Value::String(format!("coll len {}", coll.len()))) None } } RangeCompletableItem::RangeComplete => None, }, StreamItem::Log(x) => { - info!("{x:?}"); + debug!("{x:?}"); + // Some(serde_json::Value::String(format!("{x:?}"))) None } StreamItem::Stats(x) => { - info!("{x:?}"); + debug!("{x:?}"); + // Some(serde_json::Value::String(format!("{x:?}"))) None } }, - Err(e) => Some(serde_json::Value::String(format!("{e}"))), + Err(e) => Some(Err(e)), }, None => { if let Some(coll) = coll.as_mut() { - take_collector_result(coll) + last_emit = Instant::now(); + take_collector_result(coll).map(|x| Ok(x)) } else { + // Some(serde_json::Value::String(format!( + // "end of input but no collector to take something from" + // ))) None } } @@ -536,26 +556,23 @@ pub async fn timebinned_json_framed( Err(_) => { if let Some(coll) = coll.as_mut() { if coll.len() != 0 { - take_collector_result(coll) + last_emit = Instant::now(); + take_collector_result(coll).map(|x| Ok(x)) } else { + // Some(serde_json::Value::String(format!("timeout but nothing to do"))) None } } else { + // Some(serde_json::Value::String(format!("timeout but no collector"))) None } } }); + let stream = stream.filter_map(|x| futures_util::future::ready(x)); // TODO skip the intermediate conversion to js value, go directly to string data let stream = stream.map(|x| match x { - Some(x) => Some(JsonBytes::new(serde_json::to_string(&x).unwrap())), - None => None, + Ok(x) => Ok(JsonBytes::new(serde_json::to_string(&x).unwrap())), + Err(e) => Err(e), }); - let stream = stream.filter_map(|x| futures_util::future::ready(x)); - let stream = stream.map(|x| Ok(x)); - - // let stream = dyn_events_stream(evq, ch_conf, ctx, open_bytes).await?; - // let stream = events_stream_to_json_stream(stream); - // let stream = non_empty(stream); - // let stream = only_first_err(stream); Ok(Box::pin(stream)) }
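To make the bin-width selection introduced by `covering_range_time` easier to follow, here is a standalone, simplified sketch. It is not the crate's code: it uses plain `u64` milliseconds instead of `DtMs`/`TsNano`, a made-up list of supported widths, and it compares the relative distances in floating point, whereas the patch computes `f1` and `f2` with integer division.

```rust
/// Simplified sketch of the bin-width selection in `covering_range_time`:
/// snap a requested width to the nearest supported width, then compute the
/// covering bin grid. Widths and timestamps are plain u64 milliseconds here.
fn snap_bin_width(supported_ms: &[u64], requested_ms: u64) -> u64 {
    // Clamp the request into the supported range.
    let req = requested_ms.clamp(supported_ms[0], *supported_ms.last().unwrap());
    // Index of the first supported width that is >= the request.
    let pi = supported_ms.partition_point(|&w| w < req);
    if pi == 0 {
        return supported_ms[0];
    }
    let below = supported_ms[pi - 1];
    match supported_ms.get(pi) {
        None => below,
        Some(&above) => {
            // Pick the neighbor with the smaller relative distance to the request.
            let f1 = (req - below) as f64 / req as f64;
            let f2 = (above - req) as f64 / req as f64;
            if f1 < f2 {
                below
            } else {
                above
            }
        }
    }
}

/// Bin grid covering [beg_ms, end_ms): offset of the first bin and number of bins.
fn covering_grid(beg_ms: u64, end_ms: u64, bin_len_ms: u64) -> (u64, u64) {
    let bin_off = beg_ms / bin_len_ms;
    // First bin boundary at or after the end of the range.
    let off2 = (end_ms + bin_len_ms - 1) / bin_len_ms;
    (bin_off, off2 - bin_off)
}

fn main() {
    // Hypothetical supported widths: 1 s, 10 s, 1 min, 10 min, 1 h (in milliseconds).
    let supported: [u64; 5] = [1_000, 10_000, 60_000, 600_000, 3_600_000];
    // A request for 90 s snaps to 60 s, the relatively closer neighbor.
    let w = snap_bin_width(&supported, 90_000);
    // A 12 hour range on a 60 s grid gives 720 bins starting at offset 0.
    let (off, cnt) = covering_grid(0, 12 * 3_600_000, w);
    println!("bin width {w} ms, offset {off}, {cnt} bins");
}
```

Snapping to a fixed set of widths keeps bin edges on a common time grid, which is what allows cached bins to be reused.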