diff --git a/apidoc/src/backends.md b/apidoc/src/backends.md
index ba74b3c..4375f45 100644
--- a/apidoc/src/backends.md
+++ b/apidoc/src/backends.md
@@ -5,3 +5,14 @@ The list of available backends can be queried:
 ```bash
 curl "https://data-api.psi.ch/api/4/backend/list"
 ```
+
+Example output:
+```json
+{
+  "backends_available": [
+    {"name": "sf-archiver"},
+    {"name": "sls-archiver"},
+    {"name": "sf-databuffer"}
+  ]
+}
+```
diff --git a/apidoc/src/bins.md b/apidoc/src/bins.md
index f7f893d..189fa89 100644
--- a/apidoc/src/bins.md
+++ b/apidoc/src/bins.md
@@ -28,6 +28,62 @@
-The server may return more than `binCount` bins, and it will choose a `binWidth`
-supported widths to best possibly match the requested width.
+The server may return more than `binCount` bins, and it will choose a `binWidth`
+from the supported widths to best match the requested width.
 
+Example response, truncated to 4 bins:
+```json
+{
+  "avgs": [
+    0.010802877135574818,
+    0.010565019212663174,
+    0.01061472948640585,
+    0.010656529106199741
+  ],
+  "counts": [
+    3000,
+    2999,
+    3000,
+    3000
+  ],
+  "maxs": [
+    0.017492100596427917,
+    0.016716860234737396,
+    0.01769270747900009,
+    0.01670699194073677
+  ],
+  "mins": [
+    0.005227561108767986,
+    0.0040797283872962,
+    0.004329073242843151,
+    0.004934651777148247
+  ],
+  "ts1Ms": [
+    0,
+    300000,
+    600000,
+    900000
+  ],
+  "ts1Ns": [
+    0,
+    0,
+    0,
+    0
+  ],
+  "ts2Ms": [
+    300000,
+    600000,
+    900000,
+    1200000
+  ],
+  "ts2Ns": [
+    0,
+    0,
+    0,
+    0
+  ],
+  "tsAnchor": 1726394700
+}
+```
+
+
 ## As framed JSON stream
 
-To download larger amounts data as JSON it is recommended to use the
-`json-framed` content encoding.
+To download larger amounts of data as JSON, it is recommended to use the
+`json-framed` content encoding.
diff --git a/apidoc/src/events.md b/apidoc/src/events.md
index 0906520..47e79af 100644
--- a/apidoc/src/events.md
+++ b/apidoc/src/events.md
@@ -9,6 +9,15 @@ curl "https://data-api.psi.ch/api/4/events?backend=sf-databuffer&channelName=S10
 Parameters:
 - `backend`: the backend that the channel exists in, e.g. `sf-databuffer`.
 - `channelName`: the name of the channel.
+- `seriesId`: can be given instead of a channel name; the series id (unique within a backend)
+  as returned by the channel search (see the example below).
+  Note that it is an error to provide both `channelName` and `seriesId`.
 - `begDate`: start of the time range, inclusive. In ISO format e.g. `2024-02-15T12:41:00Z`.
 - `endDate`: end of the time range, exclusive.
-- `oneBeforeRange`: if set to `true` the reponse will in addition also contain the most recent event before the given range.
+- `oneBeforeRange`: if set to `true` the response will additionally contain the most recent event before the given range.
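+
+For example, querying by series id instead of channel name (illustrative sketch; the
+`seriesId` value is a placeholder for an id returned by the channel search):
+```bash
+curl "https://data-api.psi.ch/api/4/events?backend=sf-archiver&seriesId=6173407188734815544&begDate=2024-02-15T12:41:00Z&endDate=2024-02-15T12:43:00Z"
+```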
diff --git a/apidoc/src/search.md b/apidoc/src/search.md
index f39d318..dc28ffe 100644
--- a/apidoc/src/search.md
+++ b/apidoc/src/search.md
@@ -4,8 +4,48 @@
-To search for e.g. DBPM channels in `sf-databuffer` that end in `:Q1` the request
-looks like this:
+To search for e.g. channels in `sf-archiver` with names matching
+`S10CB08-KBOC-HP.*PI-OUT`, the request looks like this:
 
 ```bash
-curl "https://data-api.psi.ch/api/4/search/channel?backend=sf-databuffer&nameRegex=DBPM.*Q1$"
+curl "https://data-api.psi.ch/api/4/search/channel?backend=sf-archiver&nameRegex=S10CB08-KBOC-HP.*PI-OUT"
 ```
 
 Parameters:
 - `icase=true` uses case-insensitive search (default: case-sensitive).
+
+
+Example response:
+```json
+{
+  "channels": [
+    {
+      "backend": "sf-archiver",
+      "name": "S10CB08-KBOC-HPPI1:PI-OUT",
+      "seriesId": 6173407188734815544,
+      "source": "",
+      "type": "f64",
+      "shape": [],
+      "unit": "",
+      "description": ""
+    }
+  ]
+}
+```
+
+The response contains a list of matching channels.
+For each channel, the search returns:
+
+- `backend`
+- `name`
+- `seriesId`: a 63-bit id which uniquely identifies the combination of name, type and shape
+  within a backend. If e.g. the type of a channel changes, a new `seriesId` is therefore
+  generated as well.
+- `type`: the scalar data type, for example `i16`, `u64`, `f32`, `enum`, `string`, `bool`.
+- `shape`: `[]` means scalar, while e.g. `[512]` indicates a 512-element waveform.
+
+Note that `source`, `unit` and `description` are included in the response for historical reasons.
+Unfortunately, this data is often not available, and in general it may also change over time.
+Therefore, `source`, `unit` and `description` are deprecated and will be removed.
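+
+As an illustrative sketch of the `icase` parameter, the same search as above can be made
+case-insensitive like this:
+```bash
+curl "https://data-api.psi.ch/api/4/search/channel?backend=sf-archiver&nameRegex=s10cb08-kboc-hp.*pi-out&icase=true"
+```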
diff --git a/crates/dbconn/src/dbconn.rs b/crates/dbconn/src/dbconn.rs
index 3b9e41f..d2af695 100644
--- a/crates/dbconn/src/dbconn.rs
+++ b/crates/dbconn/src/dbconn.rs
@@ -65,7 +65,7 @@ pub async fn delay_io_medium() {
 }
 
 pub async fn create_connection(db_config: &Database) -> Result<(PgClient, JoinHandle>), Error> {
-    warn!("create_connection\n\n CREATING POSTGRES CONNECTION\n\n");
+    warn!("create_connection creating postgres connection");
     // TODO use a common already running worker pool for these queries:
     let d = db_config;
     let uri = format!("postgresql://{}:{}@{}:{}/{}", d.user, d.pass, d.host, d.port, d.name);
diff --git a/crates/items_0/src/scalar_ops.rs b/crates/items_0/src/scalar_ops.rs
index 95a92af..1886577 100644
--- a/crates/items_0/src/scalar_ops.rs
+++ b/crates/items_0/src/scalar_ops.rs
@@ -64,7 +64,18 @@ impl AsPrimF32 for String {
 }
 
 pub trait ScalarOps:
-    fmt::Debug + Clone + PartialOrd + PartialEq + SubFrId + AsPrimF32 + ByteEstimate + Serialize + Unpin + Send + 'static
+    fmt::Debug
+    + fmt::Display
+    + Clone
+    + PartialOrd
+    + PartialEq
+    + SubFrId
+    + AsPrimF32
+    + ByteEstimate
+    + Serialize
+    + Unpin
+    + Send
+    + 'static
 {
     fn scalar_type_name() -> &'static str;
     fn zero_b() -> Self;
diff --git a/crates/items_0/src/timebin.rs b/crates/items_0/src/timebin.rs
index 1d49f1e..b518412 100644
--- a/crates/items_0/src/timebin.rs
+++ b/crates/items_0/src/timebin.rs
@@ -191,18 +191,17 @@ impl TimeBinnable for Box<dyn TimeBinnable> {
     }
 }
 
-#[allow(unused)]
 impl RangeOverlapInfo for Box<dyn TimeBinnable> {
     fn ends_before(&self, range: &SeriesRange) -> bool {
-        todo!()
+        RangeOverlapInfo::ends_before(self.as_ref(), range)
     }
 
     fn ends_after(&self, range: &SeriesRange) -> bool {
-        todo!()
+        RangeOverlapInfo::ends_after(self.as_ref(), range)
     }
 
     fn starts_after(&self, range: &SeriesRange) -> bool {
-        todo!()
+        RangeOverlapInfo::starts_after(self.as_ref(), range)
     }
 }
diff --git a/crates/items_2/src/binsdim0.rs b/crates/items_2/src/binsdim0.rs
index 64e0dc1..d66deb6 100644
--- a/crates/items_2/src/binsdim0.rs
+++ b/crates/items_2/src/binsdim0.rs
@@ -52,7 +52,7 @@ use std::mem;
 use std::ops::Range;
 
 #[allow(unused)]
-macro_rules! trace4 {
+macro_rules! trace44 {
     ($($arg:tt)*) => ();
     ($($arg:tt)*) => (eprintln!($($arg)*));
 }
@@ -62,10 +62,11 @@ macro_rules! trace4 {
 pub struct BinsDim0<NTY> {
     pub ts1s: VecDeque<u64>,
     pub ts2s: VecDeque<u64>,
-    pub counts: VecDeque<u64>,
+    pub cnts: VecDeque<u64>,
     pub mins: VecDeque<NTY>,
     pub maxs: VecDeque<NTY>,
     pub avgs: VecDeque<f32>,
+    pub lsts: VecDeque<NTY>,
     pub dim0kind: Option<Dim0Kind>,
 }
@@ -88,7 +89,7 @@ where
             self.ts1s.len(),
             self.ts1s.iter().map(|k| k / SEC).collect::<Vec<_>>(),
             self.ts2s.iter().map(|k| k / SEC).collect::<Vec<_>>(),
-            self.counts,
+            self.cnts,
             self.mins,
             self.maxs,
             self.avgs,
@@ -100,8 +101,8 @@
             self.ts1s.len(),
             self.ts1s.front().map(|k| k / SEC),
             self.ts2s.back().map(|k| k / SEC),
-            self.counts.front(),
-            self.counts.back(),
+            self.cnts.front(),
+            self.cnts.back(),
             self.avgs.front(),
             self.avgs.back(),
         )
@@ -109,48 +110,80 @@
     }
 }
 
+trait HasFrontBack<T> {
+    fn len(&self) -> usize;
+    fn front(&self) -> Option<&T>;
+    fn back(&self) -> Option<&T>;
+}
+
+impl<T> HasFrontBack<T> for VecDeque<T> {
+    fn len(&self) -> usize {
+        self.len()
+    }
+
+    fn front(&self) -> Option<&T> {
+        self.front()
+    }
+
+    fn back(&self) -> Option<&T> {
+        self.back()
+    }
+}
+
+struct VecPreview<'a, T> {
+    c: &'a dyn HasFrontBack<T>,
+}
+
+impl<'a, T> VecPreview<'a, T> {
+    fn new(c: &'a dyn HasFrontBack<T>) -> Self {
+        Self { c }
+    }
+}
+
+impl<'a, T> fmt::Display for VecPreview<'a, T>
+where
+    T: fmt::Display,
+{
+    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+        if self.c.len() == 0 {
+            write!(fmt, "()")
+        } else if self.c.len() == 1 {
+            write!(fmt, "{}", self.c.front().unwrap())
+        } else {
+            // Preview the first and last element for multi-element containers.
+            write!(fmt, "{} .. {}", self.c.front().unwrap(), self.c.back().unwrap())
+        }
+    }
+}
{:?}", - self.ts1s.len(), - self.ts1s.front().map(|&k| TsNano::from_ns(k)), - self.ts2s.back().map(|&k| TsNano::from_ns(k)), - self.counts.front(), - self.counts.back(), - self.avgs.front(), - self.avgs.back(), - ) - } + write!( + fmt, + "{self_name} {{ len: {:?}, ts1s: {}, ts2s {}, counts {}, mins {}, maxs {}, avgs {} }}", + self.len(), + VecPreview::new(&self.ts1s), + VecPreview::new(&self.ts2s), + VecPreview::new(&self.cnts), + VecPreview::new(&self.mins), + VecPreview::new(&self.maxs), + VecPreview::new(&self.avgs), + ) } } impl BinsDim0 { - pub fn push(&mut self, ts1: u64, ts2: u64, count: u64, min: NTY, max: NTY, avg: f32) { + pub fn push(&mut self, ts1: u64, ts2: u64, count: u64, min: NTY, max: NTY, avg: f32, lst: NTY) { self.ts1s.push_back(ts1); self.ts2s.push_back(ts2); - self.counts.push_back(count); + self.cnts.push_back(count); self.mins.push_back(min); self.maxs.push_back(max); self.avgs.push_back(avg); + self.lsts.push_back(lst); } pub fn equal_slack(&self, other: &Self) -> bool { @@ -190,10 +223,11 @@ impl BinsDim0 { pub fn drain_into(&mut self, dst: &mut Self, range: Range) -> () { dst.ts1s.extend(self.ts1s.drain(range.clone())); dst.ts2s.extend(self.ts2s.drain(range.clone())); - dst.counts.extend(self.counts.drain(range.clone())); + dst.cnts.extend(self.cnts.drain(range.clone())); dst.mins.extend(self.mins.drain(range.clone())); dst.maxs.extend(self.maxs.drain(range.clone())); dst.avgs.extend(self.avgs.drain(range.clone())); + dst.lsts.extend(self.lsts.drain(range.clone())); } } @@ -220,10 +254,11 @@ impl Empty for BinsDim0 { Self { ts1s: VecDeque::new(), ts2s: VecDeque::new(), - counts: VecDeque::new(), + cnts: VecDeque::new(), mins: VecDeque::new(), maxs: VecDeque::new(), avgs: VecDeque::new(), + lsts: VecDeque::new(), dim0kind: None, } } @@ -257,7 +292,7 @@ impl Resettable for BinsDim0 { fn reset(&mut self) { self.ts1s.clear(); self.ts2s.clear(); - self.counts.clear(); + self.cnts.clear(); self.mins.clear(); self.maxs.clear(); self.avgs.clear(); @@ -266,7 +301,7 @@ impl Resettable for BinsDim0 { impl HasNonemptyFirstBin for BinsDim0 { fn has_nonempty_first_bin(&self) -> bool { - self.counts.front().map_or(false, |x| *x > 0) + self.cnts.front().map_or(false, |x| *x > 0) } } @@ -294,7 +329,7 @@ impl AppendEmptyBin for BinsDim0 { fn append_empty_bin(&mut self, ts1: u64, ts2: u64) { self.ts1s.push_back(ts1); self.ts2s.push_back(ts2); - self.counts.push_back(0); + self.cnts.push_back(0); self.mins.push_back(NTY::zero_b()); self.maxs.push_back(NTY::zero_b()); self.avgs.push_back(0.); @@ -305,7 +340,7 @@ impl AppendAllFrom for BinsDim0 { fn append_all_from(&mut self, src: &mut Self) { self.ts1s.extend(src.ts1s.drain(..)); self.ts2s.extend(src.ts2s.drain(..)); - self.counts.extend(src.counts.drain(..)); + self.cnts.extend(src.cnts.drain(..)); self.mins.extend(src.mins.drain(..)); self.maxs.extend(src.maxs.drain(..)); self.avgs.extend(src.avgs.drain(..)); @@ -414,7 +449,7 @@ where .ts1s .iter() .zip(&item.ts2s) - .zip(&item.counts) + .zip(&item.cnts) .zip(&item.mins) .zip(&item.maxs) .zip(&item.avgs) @@ -514,7 +549,9 @@ where panic!("TODO non-time-weighted binning to be impl"); } } else { - error!("partially filled bin with cnt 0"); + if self.filled_up_to != self.ts1now { + error!("partially filled bin with cnt 0"); + } } } if self.cnt == 0 && !push_empty { @@ -522,7 +559,7 @@ where } else { self.out.ts1s.push_back(self.ts1now.ns()); self.out.ts2s.push_back(self.ts2now.ns()); - self.out.counts.push_back(self.cnt); + self.out.cnts.push_back(self.cnt); 
self.out.mins.push_back(self.min.clone()); self.out.maxs.push_back(self.max.clone()); self.out.avgs.push_back(self.avg as f32); @@ -611,8 +648,9 @@ where .as_any_ref() .downcast_ref::>() { - let mins = self.mins.iter().map(|x| 0).collect(); - let maxs = self.mins.iter().map(|x| 0).collect(); + debug!("boxed_collected_with_enum_fix"); + let mins = self.mins.iter().map(|x| 6).collect(); + let maxs = self.mins.iter().map(|x| 7).collect(); let bins = BinsDim0CollectedResult:: { ts_anchor_sec: self.ts_anchor_sec.clone(), ts1_off_ms: self.ts1_off_ms.clone(), @@ -781,7 +819,7 @@ impl CollectorType for BinsDim0Collector { let vals = self.vals.as_mut().unwrap(); vals.ts1s.append(&mut src.ts1s); vals.ts2s.append(&mut src.ts2s); - vals.counts.append(&mut src.counts); + vals.cnts.append(&mut src.cnts); vals.mins.append(&mut src.mins); vals.maxs.append(&mut src.maxs); vals.avgs.append(&mut src.avgs); @@ -809,7 +847,7 @@ impl CollectorType for BinsDim0Collector { let bin_count_exp = if let Some(r) = &binrange { r.bin_count() as u32 } else { - warn!("no binrange given"); + debug!("no binrange given"); 0 }; let mut vals = if let Some(x) = self.vals.take() { @@ -818,8 +856,8 @@ impl CollectorType for BinsDim0Collector { return Err(Error::with_msg_no_trace("BinsDim0Collector without vals")); }; let bin_count = vals.ts1s.len() as u32; - eprintln!( - "-------------- MAKE MISSING BINS bin_count_exp {} bin_count {}", + debug!( + "result make missing bins bin_count_exp {} bin_count {}", bin_count_exp, bin_count ); let (missing_bins, continue_at, finished_at) = if bin_count < bin_count_exp { @@ -849,7 +887,7 @@ impl CollectorType for BinsDim0Collector { let ts2s = vals.ts2s.make_contiguous(); let (ts_anch, ts1ms, ts1ns) = ts_offs_from_abs(ts1s); let (ts2ms, ts2ns) = ts_offs_from_abs_with_anchor(ts_anch, ts2s); - let counts = vals.counts; + let counts = vals.cnts; let mins = vals.mins; let maxs = vals.maxs; let avgs = vals.avgs; @@ -885,8 +923,8 @@ impl CollectableType for BinsDim0 { #[derive(Debug)] pub struct BinsDim0Aggregator { range: SeriesRange, - count: u64, - minmax: Option<(NTY, NTY)>, + cnt: u64, + minmaxlst: Option<(NTY, NTY, NTY)>, sumc: u64, sum: f32, } @@ -895,8 +933,8 @@ impl BinsDim0Aggregator { pub fn new(range: SeriesRange, _do_time_weight: bool) -> Self { Self { range, - count: 0, - minmax: None, + cnt: 0, + minmaxlst: None, sumc: 0, sum: 0f32, } @@ -914,30 +952,32 @@ impl TimeBinnableTypeAggregator for BinsDim0Aggregator { fn ingest(&mut self, item: &Self::Input) { let beg = self.range.beg_u64(); let end = self.range.end_u64(); - for (((((&ts1, &ts2), &count), min), max), &avg) in item + for ((((((&ts1, &ts2), &count), min), max), &avg), lst) in item .ts1s .iter() .zip(item.ts2s.iter()) - .zip(item.counts.iter()) + .zip(item.cnts.iter()) .zip(item.mins.iter()) .zip(item.maxs.iter()) .zip(item.avgs.iter()) + .zip(item.lsts.iter()) { - if count == 0 { - } else if ts2 <= beg { + if ts2 <= beg { } else if ts1 >= end { } else { - if let Some((cmin, cmax)) = self.minmax.as_mut() { + if let Some((cmin, cmax, clst)) = self.minmaxlst.as_mut() { if min < cmin { *cmin = min.clone(); } if max > cmax { *cmax = max.clone(); } + *clst = lst.clone(); } else { - self.minmax = Some((min.clone(), max.clone())); + self.minmaxlst = Some((min.clone(), max.clone(), lst.clone())); } - self.count += count; + self.cnt += count; + // TODO this works only for equidistant bins edges. 
self.sumc += 1; self.sum += avg; } @@ -945,29 +985,33 @@ impl TimeBinnableTypeAggregator for BinsDim0Aggregator { } fn result_reset(&mut self, range: SeriesRange) -> Self::Output { - let (min, max) = if let Some((min, max)) = self.minmax.take() { - (min, max) + let ret = if let Some((min, max, lst)) = self.minmaxlst.take() { + self.minmaxlst = Some((lst.clone(), lst.clone(), lst.clone())); + let avg = if self.sumc > 0 { + self.sum / self.sumc as f32 + } else { + NTY::zero_b().as_prim_f32_b() + }; + Self::Output { + ts1s: [self.range.beg_u64()].into(), + ts2s: [self.range.end_u64()].into(), + cnts: [self.cnt].into(), + mins: [min].into(), + maxs: [max].into(), + avgs: [avg].into(), + lsts: [lst].into(), + // TODO + dim0kind: None, + } } else { - (NTY::zero_b(), NTY::zero_b()) - }; - let avg = if self.sumc > 0 { - self.sum / self.sumc as f32 - } else { - NTY::zero_b().as_prim_f32_b() - }; - let ret = Self::Output { - ts1s: [self.range.beg_u64()].into(), - ts2s: [self.range.end_u64()].into(), - counts: [self.count].into(), - mins: [min].into(), - maxs: [max].into(), - avgs: [avg].into(), - // TODO - dim0kind: None, + if self.cnt != 0 { + error!("result_reset non-zero cnt but no minmaxlst"); + } + warn!("result_reset missing minmaxlst"); + Self::Output::empty() }; self.range = range; - self.count = 0; - self.minmax = None; + self.cnt = 0; self.sumc = 0; self.sum = 0.; ret @@ -1264,7 +1308,7 @@ impl TimeBinned for BinsDim0 { fn counts(&self) -> &[u64] { // TODO check for contiguous - self.counts.as_slices().0 + self.cnts.as_slices().0 } // TODO is Vec needed? @@ -1288,7 +1332,7 @@ impl TimeBinned for BinsDim0 { if self.ts1s.len() != self.ts2s.len() { write!(&mut msg, "ts1s ≠ ts2s\n").unwrap(); } - for (i, ((count, min), max)) in self.counts.iter().zip(&self.mins).zip(&self.maxs).enumerate() { + for (i, ((count, min), max)) in self.cnts.iter().zip(&self.mins).zip(&self.maxs).enumerate() { if min.as_prim_f32_b() < 1. && *count != 0 { write!(&mut msg, "i {} count {} min {:?} max {:?}\n", i, count, min, max).unwrap(); } @@ -1313,10 +1357,11 @@ impl TimeBinned for BinsDim0 { let ret = BinsDim0:: { ts1s: replace(&mut self.ts1s, VecDeque::new()), ts2s: replace(&mut self.ts2s, VecDeque::new()), - counts: replace(&mut self.counts, VecDeque::new()), + cnts: replace(&mut self.cnts, VecDeque::new()), mins: self.mins.iter().map(AsPrimF32::as_prim_f32_b).collect(), maxs: self.maxs.iter().map(AsPrimF32::as_prim_f32_b).collect(), avgs: replace(&mut self.avgs, VecDeque::new()), + lsts: self.lsts.iter().map(AsPrimF32::as_prim_f32_b).collect(), dim0kind: None, }; Box::new(ret) @@ -1328,7 +1373,7 @@ impl TimeBinned for BinsDim0 { // TODO make it harder to forget new members when the struct may get modified in the future dst.ts1s.extend(self.ts1s.drain(range.clone())); dst.ts2s.extend(self.ts2s.drain(range.clone())); - dst.counts.extend(self.counts.drain(range.clone())); + dst.cnts.extend(self.cnts.drain(range.clone())); dst.mins.extend(self.mins.drain(range.clone())); dst.maxs.extend(self.maxs.drain(range.clone())); dst.avgs.extend(self.avgs.drain(range.clone())); @@ -1359,9 +1404,10 @@ fn bins_timebin_fill_empty_00() { let got = ready.unwrap(); let got: &BinsDim0 = got.as_any_ref().downcast_ref().unwrap(); let mut exp = BinsDim0::empty(); - for i in 0..5 { - exp.push(SEC * 2 * (9 + i), SEC * 2 * (10 + i), 0, 0, 0, 0.); - } + // Currently bins without lst can not exist. 
+ // for i in 0..5 { + // exp.push(SEC * 2 * (9 + i), SEC * 2 * (10 + i), 0, 0, 0, 0., None); + // } assert_eq!(got, &exp); } @@ -1384,9 +1430,10 @@ fn bins_timebin_fill_empty_01() { let got = ready.unwrap(); let got: &BinsDim0 = got.as_any_ref().downcast_ref().unwrap(); let mut exp = BinsDim0::empty(); - for i in 0..5 { - exp.push(SEC * 2 * (9 + i), SEC * 2 * (10 + i), 0, 0, 0, 0.); - } + // Currently bins without lst can not exist. + // for i in 0..5 { + // exp.push(SEC * 2 * (9 + i), SEC * 2 * (10 + i), 0, 0, 0, 0., None); + // } assert_eq!(got, &exp); } @@ -1408,9 +1455,10 @@ fn bins_timebin_push_empty_00() { let got = ready.unwrap(); let got: &BinsDim0 = got.as_any_ref().downcast_ref().unwrap(); let mut exp = BinsDim0::empty(); - for i in 0..1 { - exp.push(SEC * 2 * (9 + i), SEC * 2 * (10 + i), 0, 0, 0, 0.); - } + // Currently bins without lst can not exist. + // for i in 0..1 { + // exp.push(SEC * 2 * (9 + i), SEC * 2 * (10 + i), 0, 0, 0, 0., None); + // } assert_eq!(got, &exp); } @@ -1434,17 +1482,18 @@ fn bins_timebin_push_empty_01() { let got = ready.unwrap(); let got: &BinsDim0 = got.as_any_ref().downcast_ref().unwrap(); let mut exp = BinsDim0::empty(); - for i in 0..3 { - exp.push(SEC * 2 * (9 + i), SEC * 2 * (10 + i), 0, 0, 0, 0.); - } + // Currently bins without lst can not exist. + // for i in 0..3 { + // exp.push(SEC * 2 * (9 + i), SEC * 2 * (10 + i), 0, 0, 0, 0., None); + // } assert_eq!(got, &exp); } #[test] fn bins_timebin_ingest_only_before() { let mut bins = BinsDim0::::empty(); - bins.push(SEC * 2, SEC * 4, 3, 7, 9, 8.1); - bins.push(SEC * 4, SEC * 6, 3, 6, 9, 8.2); + bins.push(SEC * 2, SEC * 4, 3, 7, 9, 8.1, 8); + bins.push(SEC * 4, SEC * 6, 3, 6, 9, 8.2, 8); let binrange = BinnedRangeEnum::Time(BinnedRange { bin_len: TsNano::from_ns(SEC * 2), bin_off: 9, @@ -1460,16 +1509,17 @@ fn bins_timebin_ingest_only_before() { let got = ready.unwrap(); let got: &BinsDim0 = got.as_any_ref().downcast_ref().unwrap(); let mut exp = BinsDim0::empty(); - exp.push(SEC * 18, SEC * 20, 0, 0, 0, 0.); + // Currently bins without lst can not exist. + // exp.push(SEC * 18, SEC * 20, 0, 0, 0, 0., None); assert_eq!(got, &exp); } #[test] fn bins_timebin_ingest_00() { let mut bins = BinsDim0::::empty(); - bins.push(SEC * 20, SEC * 21, 3, 70, 94, 82.); - bins.push(SEC * 21, SEC * 22, 5, 71, 93, 86.); - bins.push(SEC * 23, SEC * 24, 6, 72, 92, 81.); + bins.push(SEC * 20, SEC * 21, 3, 70, 94, 82., 80); + bins.push(SEC * 21, SEC * 22, 5, 71, 93, 86., 81); + bins.push(SEC * 23, SEC * 24, 6, 72, 92, 81., 82); let binrange = BinnedRangeEnum::Time(BinnedRange { bin_len: TsNano::from_ns(SEC * 2), bin_off: 9, @@ -1485,9 +1535,10 @@ fn bins_timebin_ingest_00() { let got = ready.unwrap(); let got: &BinsDim0 = got.as_any_ref().downcast_ref().unwrap(); let mut exp = BinsDim0::empty(); - exp.push(SEC * 18, SEC * 20, 0, 0, 0, 0.); - exp.push(SEC * 20, SEC * 22, 8, 70, 94, 84.); - exp.push(SEC * 22, SEC * 24, 6, 72, 92, 81.); + // Currently bins without lst can not exist. 
+ // exp.push(SEC * 18, SEC * 20, 0, 0, 0, 0., None); + exp.push(SEC * 20, SEC * 22, 8, 70, 94, 84., 82); + exp.push(SEC * 22, SEC * 24, 6, 72, 92, 81., 91); assert_eq!(got, &exp); } @@ -1500,7 +1551,7 @@ fn bins_timebin_ingest_continuous_00() { }); let do_time_weight = true; let mut bins = BinsDim0::::empty(); - bins.push(SEC * 20, SEC * 21, 3, 70, 94, 82.); + bins.push(SEC * 20, SEC * 21, 3, 70, 94, 82., 80); //bins.push(SEC * 21, SEC * 22, 5, 71, 93, 86.); //bins.push(SEC * 23, SEC * 24, 6, 72, 92, 81.); let mut binner = bins @@ -1512,8 +1563,9 @@ fn bins_timebin_ingest_continuous_00() { let got = ready.unwrap(); let got: &BinsDim0 = got.as_any_ref().downcast_ref().unwrap(); let mut exp = BinsDim0::empty(); - exp.push(SEC * 18, SEC * 20, 0, 0, 0, 0.); - exp.push(SEC * 20, SEC * 22, 8, 70, 94, 84.); - exp.push(SEC * 22, SEC * 24, 6, 72, 92, 81.); + // Currently bins without lst can not exist. + // exp.push(SEC * 18, SEC * 20, 0, 0, 0, 0., None); + exp.push(SEC * 20, SEC * 22, 8, 70, 94, 84., 82); + exp.push(SEC * 22, SEC * 24, 6, 72, 92, 81., 91); assert_eq!(got, &exp); } diff --git a/crates/items_2/src/binsxbindim0.rs b/crates/items_2/src/binsxbindim0.rs index b36e28b..a58543b 100644 --- a/crates/items_2/src/binsxbindim0.rs +++ b/crates/items_2/src/binsxbindim0.rs @@ -886,10 +886,11 @@ impl TimeBinned for BinsXbinDim0 { let ret = super::binsdim0::BinsDim0:: { ts1s: replace(&mut self.ts1s, VecDeque::new()), ts2s: replace(&mut self.ts2s, VecDeque::new()), - counts: replace(&mut self.counts, VecDeque::new()), + cnts: replace(&mut self.counts, VecDeque::new()), mins: self.mins.iter().map(AsPrimF32::as_prim_f32_b).collect(), maxs: self.maxs.iter().map(AsPrimF32::as_prim_f32_b).collect(), avgs: replace(&mut self.avgs, VecDeque::new()), + lsts: err::todoval(), dim0kind: None, }; Box::new(ret) @@ -905,6 +906,7 @@ impl TimeBinned for BinsXbinDim0 { dst.mins.extend(self.mins.drain(range.clone())); dst.maxs.extend(self.maxs.drain(range.clone())); dst.avgs.extend(self.avgs.drain(range.clone())); + todo!("handle last_seen"); Ok(()) } else { let type_name = any::type_name::(); diff --git a/crates/items_2/src/eventsdim0.rs b/crates/items_2/src/eventsdim0.rs index 8d333c9..8c1d58d 100644 --- a/crates/items_2/src/eventsdim0.rs +++ b/crates/items_2/src/eventsdim0.rs @@ -57,31 +57,22 @@ use std::fmt; use std::mem; #[allow(unused)] -macro_rules! trace_ingest { - ($($arg:tt)*) => {}; - ($($arg:tt)*) => { trace!($($arg)*); }; -} +macro_rules! trace_init { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) } #[allow(unused)] -macro_rules! trace_ingest_item { - ($($arg:tt)*) => {}; - ($($arg:tt)*) => { trace!($($arg)*); }; -} +macro_rules! trace_ingest { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) } #[allow(unused)] -macro_rules! trace2 { - ($($arg:tt)*) => {}; - ($($arg:tt)*) => { trace!($($arg)*); }; -} +macro_rules! trace_ingest_item { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) } #[allow(unused)] -macro_rules! trace_binning { - ($($arg:tt)*) => { - if false { - trace!($($arg)*); - } - }; -} +macro_rules! trace2 { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) } + +#[allow(unused)] +macro_rules! trace_binning { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) } + +#[allow(unused)] +macro_rules! 
debug_ingest { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) } #[derive(Clone, PartialEq, Serialize, Deserialize)] pub struct EventsDim0NoPulse { @@ -545,12 +536,11 @@ impl items_0::collect_s::CollectableType for EventsDim0 { pub struct EventsDim0Aggregator { range: SeriesRange, count: u64, - minmax: Option<(STY, STY)>, + minmaxlst: Option<(STY, STY, STY)>, sumc: u64, sum: f32, int_ts: u64, last_ts: u64, - last_val: Option, do_time_weight: bool, events_ignored_count: u64, items_seen: usize, @@ -580,14 +570,13 @@ impl TimeAggregatorCommonV0Trait for EventsDim0Aggregator { self.apply_event_unweight(val.clone()); self.count += 1; self.last_ts = ts; - self.last_val = Some(val.clone()); } } fn common_ingest_one_before(&mut self, item: &Self::Input, j: usize) { //trace_ingest!("{self_name} ingest {:6} {:20} {:10?} BEFORE", i1, ts, val); + self.apply_min_max_lst(item.values[j].clone()); self.last_ts = item.tss[j]; - self.last_val = Some(item.values[j].clone()); } fn common_ingest_range(&mut self, item: &Self::Input, r: core::ops::Range) { @@ -598,7 +587,6 @@ impl TimeAggregatorCommonV0Trait for EventsDim0Aggregator { } self.count += 1; self.last_ts = ts; - self.last_val = Some(val.clone()); } } } @@ -609,16 +597,16 @@ impl EventsDim0Aggregator { } pub fn new(range: SeriesRange, do_time_weight: bool) -> Self { + trace_init!("{}::new", Self::type_name()); let int_ts = range.beg_u64(); Self { range, count: 0, - minmax: None, + minmaxlst: None, sumc: 0, sum: 0., int_ts, last_ts: 0, - last_val: None, do_time_weight, events_ignored_count: 0, items_seen: 0, @@ -626,24 +614,24 @@ impl EventsDim0Aggregator { } // TODO reduce clone.. optimize via more traits to factor the trade-offs? - fn apply_min_max(&mut self, val: STY) { + fn apply_min_max_lst(&mut self, val: STY) { trace_ingest!( - "apply_min_max val {:?} last_val {:?} count {} sumc {:?} minmax {:?}", + "apply_min_max_lst val {:?} count {} sumc {:?} minmaxlst {:?}", val, - self.last_val, self.count, self.sumc, - self.minmax, + self.minmaxlst, ); - if let Some((min, max)) = self.minmax.as_mut() { + if let Some((min, max, lst)) = self.minmaxlst.as_mut() { if *min > val { *min = val.clone(); } if *max < val { *max = val.clone(); } + *lst = val.clone(); } else { - self.minmax = Some((val.clone(), val.clone())); + self.minmaxlst = Some((val.clone(), val.clone(), val.clone())); } } @@ -651,7 +639,7 @@ impl EventsDim0Aggregator { error!("TODO check again result_reset_unweight"); err::todo(); let vf = val.as_prim_f32_b(); - self.apply_min_max(val); + self.apply_min_max_lst(val); if vf.is_nan() { } else { self.sum += vf; @@ -660,11 +648,11 @@ impl EventsDim0Aggregator { } fn apply_event_time_weight(&mut self, px: u64) { - if let Some(v) = &self.last_val { + if let Some((_, _, v)) = self.minmaxlst.as_ref() { trace_ingest!("apply_event_time_weight with v {v:?}"); let vf = v.as_prim_f32_b(); let v2 = v.clone(); - self.apply_min_max(v2); + self.apply_min_max_lst(v2); self.sumc += 1; let w = (px - self.int_ts) as f32 * 1e-9; if false { @@ -683,7 +671,7 @@ impl EventsDim0Aggregator { } self.int_ts = px; } else { - debug!("apply_event_time_weight NO VALUE"); + debug_ingest!("apply_event_time_weight NO VALUE"); } } @@ -695,44 +683,47 @@ impl EventsDim0Aggregator { TimeAggregatorCommonV0Func::ingest_time_weight(self, item) } - fn reset_values(&mut self, range: SeriesRange) { + fn reset_values(&mut self, lst: STY, range: SeriesRange) { self.int_ts = range.beg_u64(); - trace_binning!("ON RESET SET int_ts {:10}", self.int_ts); + trace_init!("ON RESET SET int_ts 
{:10}", self.int_ts); self.range = range; self.count = 0; self.sum = 0.; self.sumc = 0; - self.minmax = None; + self.minmaxlst = Some((lst.clone(), lst.clone(), lst)); self.items_seen = 0; } fn result_reset_unweight(&mut self, range: SeriesRange) -> BinsDim0 { - let (min, max) = if let Some((min, max)) = self.minmax.take() { - (min, max) + error!("TODO result_reset_unweight"); + panic!("TODO result_reset_unweight"); + if let Some((min, max, lst)) = self.minmaxlst.take() { + let avg = if self.sumc > 0 { + self.sum / self.sumc as f32 + } else { + STY::zero_b().as_prim_f32_b() + }; + let ret = if self.range.is_time() { + BinsDim0 { + ts1s: [self.range.beg_u64()].into(), + ts2s: [self.range.end_u64()].into(), + cnts: [self.count].into(), + mins: [min].into(), + maxs: [max].into(), + avgs: [avg].into(), + lsts: [lst.clone()].into(), + dim0kind: Some(self.range.dim0kind()), + } + } else { + error!("TODO result_reset_unweight"); + err::todoval() + }; + self.reset_values(lst, range); + ret } else { - (STY::zero_b(), STY::zero_b()) - }; - let avg = if self.sumc > 0 { - self.sum / self.sumc as f32 - } else { - STY::zero_b().as_prim_f32_b() - }; - let ret = if self.range.is_time() { - BinsDim0 { - ts1s: [self.range.beg_u64()].into(), - ts2s: [self.range.end_u64()].into(), - counts: [self.count].into(), - mins: [min].into(), - maxs: [max].into(), - avgs: [avg].into(), - dim0kind: Some(self.range.dim0kind()), - } - } else { - error!("TODO result_reset_unweight"); - err::todoval() - }; - self.reset_values(range); - ret + // TODO add check that nothing is different from initial values, or reset without lst. + BinsDim0::empty() + } } fn result_reset_time_weight(&mut self, range: SeriesRange) -> BinsDim0 { @@ -751,36 +742,33 @@ impl EventsDim0Aggregator { error!("TODO result_reset_time_weight"); err::todoval() } - let (min, max) = if let Some((min, max)) = self.minmax.take() { - (min, max) - } else { - (STY::zero_b(), STY::zero_b()) - }; - let avg = if self.sumc > 0 { - self.sum / (self.range.delta_u64() as f32 * 1e-9) - } else { - if let Some(v) = self.last_val.as_ref() { - v.as_prim_f32_b() + if let Some((min, max, lst)) = self.minmaxlst.take() { + let avg = if self.sumc > 0 { + self.sum / (self.range.delta_u64() as f32 * 1e-9) } else { - STY::zero_b().as_prim_f32_b() - } - }; - let ret = if self.range.is_time() { - BinsDim0 { - ts1s: [range_beg].into(), - ts2s: [range_end].into(), - counts: [self.count].into(), - mins: [min].into(), - maxs: [max].into(), - avgs: [avg].into(), - dim0kind: Some(self.range.dim0kind()), - } + lst.as_prim_f32_b() + }; + let ret = if self.range.is_time() { + BinsDim0 { + ts1s: [range_beg].into(), + ts2s: [range_end].into(), + cnts: [self.count].into(), + mins: [min].into(), + maxs: [max].into(), + avgs: [avg].into(), + lsts: [lst.clone()].into(), + dim0kind: Some(self.range.dim0kind()), + } + } else { + error!("TODO result_reset_time_weight"); + err::todoval() + }; + self.reset_values(lst, range); + ret } else { - error!("TODO result_reset_time_weight"); - err::todoval() - }; - self.reset_values(range); - ret + // TODO add check that nothing is different from initial values, or reset without lst. 
+ BinsDim0::empty() + } } } @@ -793,9 +781,7 @@ impl TimeBinnableTypeAggregator for EventsDim0Aggregator { } fn ingest(&mut self, item: &Self::Input) { - if true { - trace_ingest!("{} ingest {} events", Self::type_name(), item.len()); - } + trace_ingest!("{} ingest {} events", Self::type_name(), item.len()); if false { for (i, &ts) in item.tss.iter().enumerate() { trace_ingest!("{} ingest {:6} {:20}", Self::type_name(), i, ts); @@ -825,6 +811,10 @@ impl TimeBinnable for EventsDim0 { do_time_weight: bool, emit_empty_bins: bool, ) -> Box { + trace_init!( + "<{} as items_0::timebin::TimeBinnable>::time_binner_new", + self.type_name() + ); // TODO get rid of unwrap let ret = EventsDim0TimeBinner::::new(binrange, do_time_weight, emit_empty_bins).unwrap(); Box::new(ret) @@ -1095,11 +1085,11 @@ impl EventsDim0TimeBinner { } pub fn new(binrange: BinnedRangeEnum, do_time_weight: bool, emit_empty_bins: bool) -> Result { - trace!("{}::new binrange {:?}", Self::type_name(), binrange); + trace_init!("{}::new binrange {:?}", Self::type_name(), binrange); let rng = binrange .range_at(0) .ok_or_else(|| Error::with_msg_no_trace("empty binrange"))?; - trace!("{}::new rng {:?}", Self::type_name(), rng); + trace_init!("{}::new rng {:?}", Self::type_name(), rng); let agg = EventsDim0Aggregator::new(rng, do_time_weight); let ret = Self { binrange, @@ -1480,7 +1470,7 @@ fn events_timebin_ingest_continuous_00() { let got = ready.unwrap(); let got: &BinsDim0 = got.as_any_ref().downcast_ref().unwrap(); let mut exp = BinsDim0::empty(); - exp.push(SEC * 18, SEC * 20, 0, 0, 0, 0.); - exp.push(SEC * 20, SEC * 22, 1, 20, 20, 20.); + // exp.push(SEC * 18, SEC * 20, 0, 0, 0, 0., None); + exp.push(SEC * 20, SEC * 22, 1, 20, 20, 20., 20); assert!(f32_iter_cmp_near(got.avgs.clone(), exp.avgs.clone(), 0.0001, 0.0001)); } diff --git a/crates/netpod/src/netpod.rs b/crates/netpod/src/netpod.rs index 8fc76d3..c117c69 100644 --- a/crates/netpod/src/netpod.rs +++ b/crates/netpod/src/netpod.rs @@ -702,6 +702,12 @@ impl Default for EnumVariant { } } +impl fmt::Display for EnumVariant { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!(fmt, "{}({})", self.ix, self.name) + } +} + impl AppendToUrl for ScalarType { fn append_to_url(&self, url: &mut Url) { let mut g = url.query_pairs_mut(); @@ -2141,6 +2147,16 @@ const TIME_BIN_THRESHOLDS: [u64; 26] = [ DAY * 64, ]; +const TIME_BIN_LEN_CACHE_OPTS: [DtMs; 2] = [ + // + DtMs(1000 * 10), + DtMs(1000 * 60 * 60), +]; + +pub fn time_bin_len_cache_opts() -> &'static [DtMs] { + &TIME_BIN_LEN_CACHE_OPTS +} + const PULSE_BIN_THRESHOLDS: [u64; 25] = [ 10, 20, 40, 80, 100, 200, 400, 800, 1000, 2000, 4000, 8000, 10000, 20000, 40000, 80000, 100000, 200000, 400000, 800000, 1000000, 2000000, 4000000, 8000000, 10000000, @@ -2485,7 +2501,7 @@ where }*/ let beg = self.bin_len.times(self.bin_off).as_u64(); let end = self.bin_len.times(self.bin_off + self.bin_cnt).as_u64(); - warn!("TODO make generic for pulse"); + debug!("TODO make generic for pulse"); NanoRange { beg, end } } @@ -3715,6 +3731,13 @@ impl ChannelTypeConfigGen { ChannelTypeConfigGen::SfDatabuffer(x) => x.shape(), } } + + pub fn series(&self) -> Option { + match self { + ChannelTypeConfigGen::Scylla(ch_conf) => Some(ch_conf.series()), + ChannelTypeConfigGen::SfDatabuffer(sf_ch_fetch_info) => None, + } + } } impl From for ChannelTypeConfigGen { diff --git a/crates/netpod/src/query.rs b/crates/netpod/src/query.rs index 5f9913e..fdab566 100644 --- a/crates/netpod/src/query.rs +++ b/crates/netpod/src/query.rs @@ -30,6 +30,7 @@ pub 
enum CacheUsage { Use, Ignore, Recreate, + V0NoCache, } impl CacheUsage { @@ -38,6 +39,7 @@ impl CacheUsage { CacheUsage::Use => "use", CacheUsage::Ignore => "ignore", CacheUsage::Recreate => "recreate", + CacheUsage::V0NoCache => "v0nocache", } .into() } @@ -53,6 +55,8 @@ impl CacheUsage { Ok(Some(CacheUsage::Ignore)) } else if k == "recreate" { Ok(Some(CacheUsage::Recreate)) + } else if k == "v0nocache" { + Ok(Some(CacheUsage::V0NoCache)) } else { Err(Error::with_msg(format!("unexpected cacheUsage {:?}", k)))? } @@ -67,6 +71,8 @@ impl CacheUsage { CacheUsage::Recreate } else if s == "use" { CacheUsage::Use + } else if s == "v0nocache" { + CacheUsage::V0NoCache } else { return Err(Error::with_msg(format!("can not interpret cache usage string: {}", s))); }; @@ -78,6 +84,7 @@ impl CacheUsage { CacheUsage::Use => true, CacheUsage::Ignore => false, CacheUsage::Recreate => true, + CacheUsage::V0NoCache => false, } } @@ -86,6 +93,7 @@ impl CacheUsage { CacheUsage::Use => true, CacheUsage::Ignore => false, CacheUsage::Recreate => false, + CacheUsage::V0NoCache => false, } } } diff --git a/crates/nodenet/src/scylla.rs b/crates/nodenet/src/scylla.rs index a3d1fd2..e9ddd8d 100644 --- a/crates/nodenet/src/scylla.rs +++ b/crates/nodenet/src/scylla.rs @@ -163,13 +163,13 @@ impl Stream for ScyllaEventsReadStream { } else if let Some(fut) = self.stream.as_mut() { match fut.poll_next_unpin(cx) { Ready(Some(x)) => { - let x = try_map_sitemty_data!(x, |x| match x { - ChannelEvents::Events(x) => { - let x = x.to_dim0_f32_for_binning(); - Ok(ChannelEvents::Events(x)) - } - ChannelEvents::Status(x) => Ok(ChannelEvents::Status(x)), - }); + // let x = try_map_sitemty_data!(x, |x| match x { + // ChannelEvents::Events(x) => { + // let x = x.to_dim0_f32_for_binning(); + // Ok(ChannelEvents::Events(x)) + // } + // ChannelEvents::Status(x) => Ok(ChannelEvents::Status(x)), + // }); Ready(Some(x)) } Ready(None) => Ready(None), diff --git a/crates/query/src/api4/binned.rs b/crates/query/src/api4/binned.rs index a637852..f50c1c6 100644 --- a/crates/query/src/api4/binned.rs +++ b/crates/query/src/api4/binned.rs @@ -134,8 +134,8 @@ impl BinnedQuery { &self.transform } - pub fn cache_usage(&self) -> CacheUsage { - self.cache_usage.as_ref().map_or(CacheUsage::Ignore, |x| x.clone()) + pub fn cache_usage(&self) -> Option { + self.cache_usage.clone() } pub fn disk_stats_every(&self) -> ByteSize { diff --git a/crates/scyllaconn/src/bincache.rs b/crates/scyllaconn/src/bincache.rs index 38bbbd0..78ede96 100644 --- a/crates/scyllaconn/src/bincache.rs +++ b/crates/scyllaconn/src/bincache.rs @@ -245,14 +245,15 @@ pub async fn worker_write( scy: &ScySession, ) -> Result<(), streams::timebin::cached::reader::Error> { let mut msp_last = u64::MAX; - for (((((&ts1, &ts2), &cnt), &min), &max), &avg) in bins + for ((((((&ts1, &ts2), &cnt), &min), &max), &avg), &lst) in bins .ts1s .iter() .zip(bins.ts2s.iter()) - .zip(bins.counts.iter()) + .zip(bins.cnts.iter()) .zip(bins.mins.iter()) .zip(bins.maxs.iter()) .zip(bins.avgs.iter()) + .zip(bins.lsts.iter()) { let bin_len = DtMs::from_ms_u64((ts2 - ts1) / 1000000); let div = streams::timebin::cached::reader::part_len(bin_len).ns(); @@ -267,6 +268,7 @@ pub async fn worker_write( min, max, avg, + lst, ); // trace!("cache write {:?}", params); scy.execute(stmts_cache.st_write_f32(), params) @@ -296,7 +298,7 @@ pub async fn worker_read( .execute_iter(stmts_cache.st_read_f32().clone(), params) .await .map_err(|e| streams::timebin::cached::reader::Error::Scylla(e.to_string()))?; - let mut it = 
res.into_typed::<(i32, i64, f32, f32, f32)>(); + let mut it = res.into_typed::<(i32, i64, f32, f32, f32, f32)>(); let mut bins = BinsDim0::empty(); while let Some(x) = it.next().await { let row = x.map_err(|e| streams::timebin::cached::reader::Error::Scylla(e.to_string()))?; @@ -305,9 +307,10 @@ pub async fn worker_read( let min = row.2; let max = row.3; let avg = row.4; + let lst = row.5; let ts1 = bin_len.ns() * off + div * msp; let ts2 = ts1 + bin_len.ns(); - bins.push(ts1, ts2, cnt, min, max, avg); + bins.push(ts1, ts2, cnt, min, max, avg, lst); } Ok(bins) } diff --git a/crates/scyllaconn/src/conn.rs b/crates/scyllaconn/src/conn.rs index 8e1b31c..951341f 100644 --- a/crates/scyllaconn/src/conn.rs +++ b/crates/scyllaconn/src/conn.rs @@ -17,7 +17,7 @@ pub async fn create_scy_session(scyconf: &ScyllaConfig) -> Result Result { - warn!("create_connection\n\n CREATING SCYLLA CONNECTION\n\n"); + warn!("creating scylla connection"); let scy = scylla::SessionBuilder::new() .known_nodes(&scyconf.hosts) .default_execution_profile_handle( diff --git a/crates/scyllaconn/src/events2/prepare.rs b/crates/scyllaconn/src/events2/prepare.rs index 4e27d93..4de5902 100644 --- a/crates/scyllaconn/src/events2/prepare.rs +++ b/crates/scyllaconn/src/events2/prepare.rs @@ -262,8 +262,8 @@ impl StmtsCache { .prepare(format!( concat!( "insert into {}.{}binned_scalar_f32", - " (series, bin_len_ms, ts_msp, off, count, min, max, avg)", - " values (?, ?, ?, ?, ?, ?, ?, ?)" + " (series, bin_len_ms, ts_msp, off, count, min, max, avg, lst)", + " values (?, ?, ?, ?, ?, ?, ?, ?, ?)" ), ks, rt.table_prefix() @@ -272,7 +272,7 @@ impl StmtsCache { let st_read_f32 = scy .prepare(format!( concat!( - "select off, count, min, max, avg", + "select off, count, min, max, avg, lst", " from {}.{}binned_scalar_f32", " where series = ?", " and bin_len_ms = ?", diff --git a/crates/scyllaconn/src/schema.rs b/crates/scyllaconn/src/schema.rs new file mode 100644 index 0000000..feeebe8 --- /dev/null +++ b/crates/scyllaconn/src/schema.rs @@ -0,0 +1,15 @@ +use err::thiserror; +use err::ThisError; +use netpod::ttl::RetentionTime; +use netpod::ScyllaConfig; +use scylla::Session as ScySession; + +#[derive(Debug, ThisError)] +#[cstm(name = "ScyllaSchema")] +pub enum Error { + Scylla, +} + +pub async fn schema(rt: RetentionTime, scyco: &ScyllaConfig, scy: &ScySession) -> Result<(), Error> { + todo!() +} diff --git a/crates/scyllaconn/src/scyllaconn.rs b/crates/scyllaconn/src/scyllaconn.rs index a5c1763..4b1d11b 100644 --- a/crates/scyllaconn/src/scyllaconn.rs +++ b/crates/scyllaconn/src/scyllaconn.rs @@ -5,6 +5,7 @@ pub mod errconv; pub mod events; pub mod events2; pub mod range; +pub mod schema; pub mod status; pub mod worker; diff --git a/crates/scyllaconn/src/worker.rs b/crates/scyllaconn/src/worker.rs index 2534f97..84d6be0 100644 --- a/crates/scyllaconn/src/worker.rs +++ b/crates/scyllaconn/src/worker.rs @@ -33,6 +33,7 @@ pub enum Error { Toplist(#[from] crate::accounting::toplist::Error), MissingKeyspaceConfig, CacheWriteF32(#[from] streams::timebin::cached::reader::Error), + Schema(#[from] crate::schema::Error), } #[derive(Debug)] @@ -212,6 +213,9 @@ impl ScyllaWorker { .await .map_err(Error::ScyllaConnection)?; let scy = Arc::new(scy); + crate::schema::schema(RetentionTime::Short, &self.scyconf_st, &scy).await?; + crate::schema::schema(RetentionTime::Medium, &self.scyconf_mt, &scy).await?; + crate::schema::schema(RetentionTime::Long, &self.scyconf_lt, &scy).await?; let kss = [ self.scyconf_st.keyspace.as_str(), 
            self.scyconf_mt.keyspace.as_str(),
diff --git a/crates/streams/src/test/timebin.rs b/crates/streams/src/test/timebin.rs
index 7ff161d..3d75b73 100644
--- a/crates/streams/src/test/timebin.rs
+++ b/crates/streams/src/test/timebin.rs
@@ -64,14 +64,15 @@ fn time_bin_00() -> Result<(), Error> {
         let bins = BinsDim0::empty();
         d.push_back(bins);
         let mut bins = BinsDim0::empty();
-        bins.push(SEC * 0, SEC * 1, 0, 0.0, 0.0, 0.0);
-        bins.push(SEC * 1, SEC * 2, 2, 0.0535830, 100.0589, 50.05624);
-        bins.push(SEC * 2, SEC * 3, 2, 200.06143, 300.07645, 250.06894);
-        bins.push(SEC * 3, SEC * 4, 2, 400.08554, 500.05222, 450.06888);
-        bins.push(SEC * 4, SEC * 5, 2, 600.0025, 700.09094, 650.04675);
+        // Currently can not construct bins without minmaxlst
+        // bins.push(SEC * 0, SEC * 1, 0, 0.0, 0.0, 0.0);
+        bins.push(SEC * 1, SEC * 2, 2, 0.0535830, 100.0589, 50.05624, 100.0589);
+        bins.push(SEC * 2, SEC * 3, 2, 200.06143, 300.07645, 250.06894, 300.07645);
+        bins.push(SEC * 3, SEC * 4, 2, 400.08554, 500.05222, 450.06888, 500.05222);
+        bins.push(SEC * 4, SEC * 5, 2, 600.0025, 700.09094, 650.04675, 700.09094);
         d.push_back(bins);
         let mut bins = BinsDim0::empty();
-        bins.push(SEC * 5, SEC * 6, 2, 800.0619, 900.02844, 850.04517);
+        bins.push(SEC * 5, SEC * 6, 2, 800.0619, 900.02844, 850.04517, 900.02844);
         d.push_back(bins);
         d
     };
@@ -342,6 +343,7 @@ fn timebin_multi_stage_00() -> Result<(), Error> {
                 20 + 2 * i as i32,
                 21 + 2 * i as i32,
                 20.5 + 2. * i as f32,
+                21 + 2 * i as i32,
             );
         }
         bins
@@ -356,6 +358,7 @@ fn timebin_multi_stage_00() -> Result<(), Error> {
                 20 + 4 * i as i32,
                 23 + 4 * i as i32,
                 21.5 + 4. * i as f32,
+                23 + 4 * i as i32,
             );
         }
         bins
diff --git a/crates/streams/src/timebin/fromevents.rs b/crates/streams/src/timebin/fromevents.rs
index 71d8d2a..8a05d75 100644
--- a/crates/streams/src/timebin/fromevents.rs
+++ b/crates/streams/src/timebin/fromevents.rs
@@ -7,6 +7,7 @@ use items_0::streamitem::RangeCompletableItem;
 use items_0::streamitem::Sitemty;
 use items_0::streamitem::StreamItem;
 use items_2::binsdim0::BinsDim0;
+use items_2::channelevents::ChannelEvents;
 use netpod::log::*;
 use netpod::BinnedRange;
 use netpod::ChConf;
@@ -40,6 +41,16 @@ impl BinnedFromEvents {
             panic!();
         }
         let stream = read_provider.read(evq, chconf);
+        let stream = stream.map(|x| {
+            let x = items_0::try_map_sitemty_data!(x, |x| match x {
+                ChannelEvents::Events(x) => {
+                    let x = x.to_dim0_f32_for_binning();
+                    Ok(ChannelEvents::Events(x))
+                }
+                ChannelEvents::Status(x) => Ok(ChannelEvents::Status(x)),
+            });
+            x
+        });
         let stream = Box::pin(stream);
         let stream = super::basic::TimeBinnedStream::new(stream, netpod::BinnedRangeEnum::Time(range), do_time_weight);
         let stream = stream.map(|item| match item {
diff --git a/crates/streams/src/timebin/fromlayers.rs b/crates/streams/src/timebin/fromlayers.rs
index 4bb7ced..81d053a 100644
--- a/crates/streams/src/timebin/fromlayers.rs
+++ b/crates/streams/src/timebin/fromlayers.rs
@@ -40,6 +40,8 @@ pub enum Error {
     GapFill(#[from] super::gapfill::Error),
     BinnedFromEvents(#[from] super::fromevents::Error),
     SfDatabufferNotSupported,
+    #[error("FinerGridMismatch({0}, {1})")]
+    FinerGridMismatch(DtMs, DtMs),
 }
 
 type BoxedInput = Pin>> + Send>>;
@@ -75,7 +77,7 @@ impl TimeBinnedFromLayers {
         cache_read_provider: Arc,
         events_read_provider: Arc,
     ) -> Result {
-        info!(
+        debug!(
             "{}::new {:?} {:?} {:?}",
             Self::type_name(),
             series,
@@ -84,7 +86,7 @@
         );
         let bin_len = DtMs::from_ms_u64(range.bin_len.ms());
         if bin_len_layers.contains(&bin_len) {
-            info!("{}::new bin_len in layers {:?}", Self::type_name(), range);
+            debug!("{}::new bin_len in layers {:?}", Self::type_name(), range);
             let inp = super::gapfill::GapFill::new(
                 "FromLayers".into(),
                 ch_conf.clone(),
@@ -114,8 +116,11 @@
         } else {
             match find_next_finer_bin_len(bin_len, &bin_len_layers) {
                 Some(finer) => {
+                    if bin_len.ms() % finer.ms() != 0 {
+                        return Err(Error::FinerGridMismatch(bin_len, finer));
+                    }
                     let range_finer = BinnedRange::from_nano_range(range.to_nano_range(), finer);
-                    warn!(
+                    debug!(
                         "{}::new next finer from bins {:?} {:?}",
                         Self::type_name(),
                         finer,
@@ -154,7 +159,7 @@
                     Ok(ret)
                 }
                 None => {
-                    warn!("{}::new next finer from events", Self::type_name());
+                    debug!("{}::new next finer from events", Self::type_name());
                     let series_range = SeriesRange::TimeRange(range.to_nano_range());
                     let one_before_range = true;
                     let select = EventsSubQuerySelect::new(
@@ -183,7 +188,7 @@
                         open_bytes,
                         inp: Box::pin(inp),
                     };
-                    warn!("{}::new setup from events", Self::type_name());
+                    debug!("{}::new setup from events", Self::type_name());
                     Ok(ret)
                 }
                 ChannelTypeConfigGen::SfDatabuffer(_) => return Err(Error::SfDatabufferNotSupported),
diff --git a/crates/streams/src/timebin/gapfill.rs b/crates/streams/src/timebin/gapfill.rs
index 4c80fb8..a7e4e89 100644
--- a/crates/streams/src/timebin/gapfill.rs
+++ b/crates/streams/src/timebin/gapfill.rs
@@ -305,7 +305,7 @@ impl GapFill {
         }
         let aa = &self.bins_for_cache_write;
         if aa.len() >= 2 {
-            for (i, (&c1, &_c2)) in aa.counts.iter().rev().zip(aa.counts.iter().rev().skip(1)).enumerate() {
+            for (i, (&c1, &_c2)) in aa.cnts.iter().rev().zip(aa.cnts.iter().rev().skip(1)).enumerate() {
                 if c1 != 0 {
                     let n = aa.len() - (1 + i);
                     debug_cache!("{} cache_write_on_end consider {} for write", self.dbgname, n);
@@ -322,7 +322,7 @@ impl GapFill {
     fn cache_write_intermediate(mut self: Pin<&mut Self>) -> Result<(), Error> {
         let aa = &self.bins_for_cache_write;
         if aa.len() >= 2 {
-            for (i, (&c1, &_c2)) in aa.counts.iter().rev().zip(aa.counts.iter().rev().skip(1)).enumerate() {
+            for (i, (&c1, &_c2)) in aa.cnts.iter().rev().zip(aa.cnts.iter().rev().skip(1)).enumerate() {
                 if c1 != 0 {
                     let n = aa.len() - (1 + i);
                     debug_cache!("{} cache_write_intermediate consider {} for write", self.dbgname, n);
@@ -462,9 +462,9 @@ impl Stream for GapFill {
                             beg: j.ns(),
                             end: self.range.full_range().end(),
                         };
-                        warn!(
-                            "----- RECEIVED SOMETHING, BUT NOT ALL, setup rest from finer {} {} {}",
-                            self.range, j, range
+                        debug!(
+                            "{} received something but not all, setup rest from finer {} {} {}",
+                            self.dbgname, self.range, j, range
                         );
                         match self.as_mut().setup_inp_finer(range, false) {
                             Ok(()) => {
@@ -473,14 +473,14 @@ impl Stream for GapFill {
                             Err(e) => Ready(Some(Err(::err::Error::from_string(e)))),
                         }
                     } else {
-                        info!("----- RECEIVED EVERYTHING");
+                        debug!("{} received everything", self.dbgname);
                         Ready(None)
                     }
                 } else {
                     let range = self.range.to_nano_range();
-                    warn!(
-                        "----- RECEIVED NOTHING SO FAR AT ALL, setup full range from finer {} {}",
-                        self.range, range
+                    debug!(
+                        "{} received nothing at all, setup full range from finer {} {}",
+                        self.dbgname, self.range, range
                     );
                     match self.as_mut().setup_inp_finer(range, false) {
                         Ok(()) => {
@@ -495,10 +495,10 @@ impl Stream for GapFill {
                 } else {
                     self.done = true;
                     if self.inp_finer_range_final_cnt == self.inp_finer_range_final_max {
-                        trace_handle!("{} RANGE FINAL ALL", self.dbgname);
+                        trace_handle!("{} range final all", self.dbgname);
Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)))) } else { - trace_handle!("{} SUBSTREAMS NOT FINAL", self.dbgname); + trace_handle!("{} substreams not final", self.dbgname); continue; } }; diff --git a/crates/streams/src/timebinnedjson.rs b/crates/streams/src/timebinnedjson.rs index ddd6756..2cfe4bb 100644 --- a/crates/streams/src/timebinnedjson.rs +++ b/crates/streams/src/timebinnedjson.rs @@ -52,7 +52,6 @@ fn assert_stream_send<'u, R>(stream: impl 'u + Send + Stream) -> impl stream } -// TODO factor out, it is use now also from GapFill. pub async fn timebinnable_stream( range: NanoRange, one_before_range: bool, @@ -92,6 +91,7 @@ pub async fn timebinnable_stream( on_sitemty_data!(k, |k| { let k: Box = Box::new(k); // trace!("got len {}", k.len()); + let k = k.to_dim0_f32_for_binning(); let k = tr.0.transform(k); Ok(StreamItem::DataItem(RangeCompletableItem::Data(k))) }) @@ -316,19 +316,24 @@ async fn timebinned_stream( events_read_provider: Option>, ) -> Result>> + Send>>, Error> { use netpod::query::CacheUsage; - match (query.cache_usage(), cache_read_provider, events_read_provider) { - (CacheUsage::Use | CacheUsage::Recreate, Some(cache_read_provider), Some(events_read_provider)) => { - let series = if let Some(x) = query.channel().series() { - x - } else { - return Err(Error::with_msg_no_trace( - "cached time binned only available given a series id", - )); - }; - info!("--- CACHING PATH ---"); - info!("{query:?}"); - info!("subgrids {:?}", query.subgrids()); - let range = binned_range.binned_range_time().to_nano_range(); + let cache_usage = query.cache_usage().unwrap_or(CacheUsage::Use); + match ( + ch_conf.series(), + cache_usage.clone(), + cache_read_provider, + events_read_provider, + ) { + ( + Some(series), + CacheUsage::Use | CacheUsage::Recreate | CacheUsage::Ignore, + Some(cache_read_provider), + Some(events_read_provider), + ) => { + debug!( + "timebinned_stream caching {:?} subgrids {:?}", + query, + query.subgrids() + ); let do_time_weight = true; let bin_len_layers = if let Some(subgrids) = query.subgrids() { subgrids @@ -336,16 +341,11 @@ async fn timebinned_stream( .map(|&x| DtMs::from_ms_u64(1000 * x.as_secs())) .collect() } else { - vec![ - DtMs::from_ms_u64(1000 * 10), - DtMs::from_ms_u64(1000 * 60 * 60), - // DtMs::from_ms_u64(1000 * 60 * 60 * 12), - // DtMs::from_ms_u64(1000 * 10), - ] + netpod::time_bin_len_cache_opts().to_vec() }; let stream = crate::timebin::TimeBinnedFromLayers::new( ch_conf, - query.cache_usage(), + cache_usage, query.transform().clone(), EventsSubQuerySettings::from(&query), query.log_level().into(), @@ -369,10 +369,8 @@ async fn timebinned_stream( } _ => { let range = binned_range.binned_range_time().to_nano_range(); - let do_time_weight = true; let one_before_range = true; - let stream = timebinnable_stream( range, one_before_range, @@ -419,7 +417,7 @@ pub async fn timebinned_json( let deadline = Instant::now() + query .timeout_content() - .unwrap_or(Duration::from_millis(5000)) + .unwrap_or(Duration::from_millis(3000)) .min(Duration::from_millis(5000)) .max(Duration::from_millis(200)); let binned_range = query.covering_range()?; @@ -439,18 +437,19 @@ pub async fn timebinned_json( let stream = timebinned_to_collectable(stream); let collected = Collect::new(stream, deadline, collect_max, bytes_max, None, Some(binned_range)); let collected: BoxFuture<_> = Box::pin(collected); - let collected = collected.await?; - info!("timebinned_json collected type_name {:?}", collected.type_name()); - let collected = if let 
Some(bins) = collected + let collres = collected.await?; + info!("timebinned_json collected type_name {:?}", collres.type_name()); + let collres = if let Some(bins) = collres .as_any_ref() .downcast_ref::>() { - info!("MATCHED"); - bins.boxed_collected_with_enum_fix() + warn!("unexpected binned enum"); + // bins.boxed_collected_with_enum_fix() + collres } else { - collected + collres }; - let jsval = serde_json::to_value(&collected)?; + let jsval = serde_json::to_value(&collres)?; Ok(jsval) } @@ -461,8 +460,9 @@ fn take_collector_result(coll: &mut Box) -> O .as_any_ref() .downcast_ref::>() { - info!("MATCHED ENUM"); - bins.boxed_collected_with_enum_fix() + warn!("unexpected binned enum"); + // bins.boxed_collected_with_enum_fix() + collres } else { collres };