WIP on request by pulse id

This commit is contained in:
Dominik Werder
2023-03-07 16:26:56 +01:00
parent 7bb847b93e
commit 617c21cdc3
8 changed files with 530 additions and 475 deletions

View File

@@ -69,7 +69,7 @@ pub async fn get_binned(
};
let agg_kind = AggKind::DimXBins1;
let range = NanoRange::from_date_time(beg_date, end_date);
let mut query = BinnedQuery::new(channel, range, bin_count, Some(agg_kind));
let mut query = BinnedQuery::new(channel, range, bin_count);
query.set_cache_usage(cache_usage);
query.set_disk_stats_every(ByteSize(1024 * disk_stats_every_kb));
let hp = HostPort { host: host, port: port };

View File

@@ -35,6 +35,7 @@ use items_0::TimeBinnable;
use items_0::TimeBinner;
use netpod::log::*;
use netpod::timeunits::*;
use netpod::transform::Transform;
use netpod::AggKind;
use netpod::BinnedRange;
use netpod::NanoRange;
@@ -225,11 +226,11 @@ pub trait TimeBinnableTypeAggregator: Send {
pub fn empty_events_dyn_ev(
scalar_type: &ScalarType,
shape: &Shape,
agg_kind: &AggKind,
transform: &Transform,
) -> Result<Box<dyn Events>, Error> {
let ret: Box<dyn Events> = match shape {
Shape::Scalar => match agg_kind {
AggKind::Plain | AggKind::TimeWeightedScalar => {
Shape::Scalar => match transform {
_ if true => {
use ScalarType::*;
type K<T> = eventsdim0::EventsDim0<T>;
match scalar_type {
@@ -247,14 +248,14 @@ pub fn empty_events_dyn_ev(
STRING => Box::new(K::<String>::empty()),
}
}
AggKind::PulseIdDiff => Box::new(eventsdim0::EventsDim0::<i64>::empty()),
AggKind::DimXBins1 | AggKind::DimXBinsN(..) | AggKind::EventBlobs => {
error!("TODO empty_events_dyn_ev {agg_kind:?} {scalar_type:?} {shape:?}");
_ if true => Box::new(eventsdim0::EventsDim0::<i64>::empty()),
_ => {
error!("TODO empty_events_dyn_ev {transform:?} {scalar_type:?} {shape:?}");
err::todoval()
}
},
Shape::Wave(..) => match agg_kind {
AggKind::Plain | AggKind::TimeWeightedScalar => {
Shape::Wave(..) => match transform {
_ if true => {
use ScalarType::*;
type K<T> = eventsdim1::EventsDim1<T>;
match scalar_type {
@@ -272,87 +273,23 @@ pub fn empty_events_dyn_ev(
STRING => Box::new(K::<String>::empty()),
}
}
AggKind::PulseIdDiff => Box::new(eventsdim0::EventsDim0::<i64>::empty()),
AggKind::DimXBins1 | AggKind::DimXBinsN(..) | AggKind::EventBlobs => {
error!("TODO empty_events_dyn_ev {agg_kind:?} {scalar_type:?} {shape:?}");
_ if true => Box::new(eventsdim0::EventsDim0::<i64>::empty()),
_ => {
error!("TODO empty_events_dyn_ev {transform:?} {scalar_type:?} {shape:?}");
err::todoval()
}
},
Shape::Image(..) => {
error!("TODO empty_events_dyn_ev {agg_kind:?} {scalar_type:?} {shape:?}");
error!("TODO empty_events_dyn_ev {transform:?} {scalar_type:?} {shape:?}");
err::todoval()
}
};
Ok(ret)
}
pub fn empty_binned_dyn_tb(scalar_type: &ScalarType, shape: &Shape, agg_kind: &AggKind) -> Box<dyn TimeBinnable> {
match shape {
Shape::Scalar => match agg_kind {
AggKind::TimeWeightedScalar => {
use ScalarType::*;
type K<T> = binsdim0::BinsDim0<T>;
match scalar_type {
U8 => Box::new(K::<u8>::empty()),
U16 => Box::new(K::<u16>::empty()),
U32 => Box::new(K::<u32>::empty()),
U64 => Box::new(K::<u64>::empty()),
I8 => Box::new(K::<i8>::empty()),
I16 => Box::new(K::<i16>::empty()),
I32 => Box::new(K::<i32>::empty()),
I64 => Box::new(K::<i64>::empty()),
F32 => Box::new(K::<f32>::empty()),
F64 => Box::new(K::<f64>::empty()),
BOOL | STRING => {
error!("TODO empty_binned_dyn_tb {agg_kind:?} {scalar_type:?} {shape:?}");
err::todoval()
}
}
}
AggKind::Plain
| AggKind::DimXBins1
| AggKind::DimXBinsN(..)
| AggKind::EventBlobs
| AggKind::PulseIdDiff => {
error!("TODO empty_binned_dyn_tb {agg_kind:?} {scalar_type:?} {shape:?}");
err::todoval()
}
},
Shape::Wave(_n) => match agg_kind {
AggKind::DimXBins1 => {
use ScalarType::*;
type K<T> = binsdim0::BinsDim0<T>;
match scalar_type {
U8 => Box::new(K::<u8>::empty()),
U16 => Box::new(K::<u16>::empty()),
U32 => Box::new(K::<u32>::empty()),
U64 => Box::new(K::<u64>::empty()),
I8 => Box::new(K::<i8>::empty()),
I16 => Box::new(K::<i16>::empty()),
I32 => Box::new(K::<i32>::empty()),
I64 => Box::new(K::<i64>::empty()),
F32 => Box::new(K::<f32>::empty()),
F64 => Box::new(K::<f64>::empty()),
BOOL | STRING => {
error!("TODO empty_binned_dyn_tb {agg_kind:?} {scalar_type:?} {shape:?}");
err::todoval()
}
}
}
AggKind::EventBlobs
| AggKind::DimXBinsN(..)
| AggKind::Plain
| AggKind::TimeWeightedScalar
| AggKind::PulseIdDiff => {
error!("TODO empty_binned_dyn_tb {agg_kind:?} {scalar_type:?} {shape:?}");
err::todoval()
}
},
Shape::Image(..) => {
error!("TODO empty_binned_dyn_tb {agg_kind:?} {scalar_type:?} {shape:?}");
err::todoval()
}
}
/// Construct an empty time-binnable container for the given scalar type and
/// shape under the new `Transform`-based API.
///
/// WIP: currently only logs and then panics via `todo!()` — every caller will
/// abort here until the `Transform` dispatch is implemented (the previous
/// `AggKind`-based implementation was removed in this commit).
pub fn empty_binned_dyn_tb(scalar_type: &ScalarType, shape: &Shape, transform: &Transform) -> Box<dyn TimeBinnable> {
error!("TODO empty_binned_dyn_tb");
todo!()
}
fn flush_binned(
@@ -399,6 +336,7 @@ pub async fn binned_collected(
inp: Pin<Box<dyn Stream<Item = Sitemty<ChannelEvents>> + Send>>,
) -> Result<Box<dyn ToJsonResult>, Error> {
event!(Level::TRACE, "binned_collected");
let transform = Transform::default_time_binned();
let edges = binrange.edges();
if edges.len() < 2 {
return Err(format!("binned_collected but edges.len() {}", edges.len()).into());
@@ -414,7 +352,7 @@ pub async fn binned_collected(
let mut did_range_complete = false;
let mut coll = None;
let mut binner = None;
let empty_item = empty_events_dyn_ev(&scalar_type, &shape, &AggKind::TimeWeightedScalar)?;
let empty_item = empty_events_dyn_ev(&scalar_type, &shape, &transform)?;
let tmp_item = Ok(StreamItem::DataItem(RangeCompletableItem::Data(ChannelEvents::Events(
empty_item,
))));
@@ -502,7 +440,7 @@ pub async fn binned_collected(
}
None => {
error!("binned_collected nothing collected");
let item = empty_binned_dyn_tb(&scalar_type, &shape, &AggKind::DimXBins1);
let item = empty_binned_dyn_tb(&scalar_type, &shape, &transform);
let ret = item.to_box_to_json_result();
tokio::time::sleep(Duration::from_millis(2000)).await;
Ok(ret)

View File

@@ -13,7 +13,6 @@ use futures_util::{Stream, StreamExt};
use serde::{Deserialize, Serialize};
use serde_json::Value as JsVal;
use std::collections::{BTreeMap, VecDeque};
use std::fmt;
use std::iter::FromIterator;
use std::net::SocketAddr;
use std::path::PathBuf;
@@ -21,6 +20,7 @@ use std::pin::Pin;
use std::str::FromStr;
use std::task::{Context, Poll};
use std::time::Duration;
use std::{fmt, ops};
use timeunits::*;
use url::Url;
@@ -757,6 +757,30 @@ impl NanoRange {
}
}
/// A range of pulse ids with `beg`/`end` endpoints, the pulse-axis analogue
/// of `NanoRange`.
// NOTE(review): endpoint semantics (half-open vs inclusive) are not
// established anywhere in this chunk — presumably half-open to match
// `NanoRange`; confirm.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct PulseRange {
pub beg: u64,
pub end: u64,
}
/// A request range over either axis of a series: wall-clock time
/// (`NanoRange`) or pulse id (`PulseRange`). Queries carry this instead of a
/// bare `NanoRange` so that data can be requested by pulse id as well.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum SeriesRange {
TimeRange(NanoRange),
PulseRange(PulseRange),
}
/// Allows `NanoRange` to be passed wherever an `Into<SeriesRange>` is accepted.
impl From<NanoRange> for SeriesRange {
fn from(k: NanoRange) -> Self {
Self::TimeRange(k)
}
}
/// Allows `PulseRange` to be passed wherever an `Into<SeriesRange>` is accepted.
impl From<PulseRange> for SeriesRange {
fn from(k: PulseRange) -> Self {
Self::PulseRange(k)
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum ByteOrder {
Little,
@@ -1087,13 +1111,32 @@ pub mod timeunits {
pub const DAY: u64 = HOUR * 24;
}
const BIN_T_LEN_OPTIONS_0: [u64; 3] = [
//
//SEC,
MIN * 1,
HOUR * 1,
DAY,
];
/// Abstraction over the dimension-0 index of a series: either a timestamp
/// (`TsNano`) or a pulse id (`PulseId`). Lets the binning-grid computations
/// be written once for both axes.
pub trait Dim0Index: Clone + fmt::Debug + ops::Add + ops::Sub + PartialOrd {
/// Scale this index value by `x` (used e.g. to compute patch length as
/// bin length times bin count).
fn times(&self, x: u64) -> Self;
/// Divide this index value by `n`.
fn div_n(&self, n: u64) -> Self;
/// Raw underlying value.
fn as_u64(&self) -> u64;
/// Build a `SeriesRange` of the matching variant from two endpoints.
fn series_range(a: Self, b: Self) -> SeriesRange;
/// Candidate bin lengths for pre-binning on this axis.
fn prebin_bin_len_opts() -> &'static [Self];
/// Patch length corresponding to bin-length option index `i`.
fn prebin_patch_len_for(i: usize) -> Self;
/// Assemble the axis-specific variant of `PreBinnedPatchRangeEnum`.
fn to_pre_binned_patch_range_enum(
bin_len: Self,
bin_count: u64,
patch_offset: u64,
patch_count: u64,
) -> PreBinnedPatchRangeEnum;
}
/// Nanosecond timestamp newtype used as the time-axis `Dim0Index`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TsNano(u64);
/// Pulse-id newtype used as the pulse-axis `Dim0Index`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PulseId(u64);
// TODO(WIP): `Dim0Index` declares required methods (`times`, `div_n`, …) that
// this empty impl does not provide — it will not compile until implemented.
impl Dim0Index for TsNano {}
// TODO(WIP): empty stub impl — the required `Dim0Index` methods are missing.
impl Dim0Index for PulseId {}
const PREBIN_TIME_BIN_LEN_VAR0: [TsNano; 3] = [TsNano(MIN * 1), TsNano(HOUR * 1), TsNano(DAY)];
const PATCH_T_LEN_OPTIONS_SCALAR: [u64; 3] = [
//
@@ -1111,7 +1154,7 @@ const PATCH_T_LEN_OPTIONS_WAVE: [u64; 3] = [
DAY * 32,
];
const BIN_THRESHOLDS: [u64; 39] = [
const TIME_BIN_THRESHOLDS: [u64; 39] = [
MU,
MU * 2,
MU * 5,
@@ -1153,130 +1196,115 @@ const BIN_THRESHOLDS: [u64; 39] = [
DAY * 64,
];
#[derive(Clone, Serialize, Deserialize)]
pub struct PreBinnedPatchGridSpec {
bin_t_len: u64,
patch_t_len: u64,
/// Candidate bin lengths (in pulse-id counts) used when binning by pulse id,
/// analogous to `TIME_BIN_THRESHOLDS` for time-based binning.
// Fix: the array literal contains 25 elements, so the declared length must be
// 25 — the original `[u64; 10]` annotation does not compile.
const PULSE_BIN_THRESHOLDS: [u64; 25] = [
    10, 20, 40, 80, 100, 200, 400, 800, 1000, 2000, 4000, 8000, 10000, 20000, 40000, 80000, 100000, 200000, 400000,
    800000, 1000000, 2000000, 4000000, 8000000, 10000000,
];
/// Returns the `i`-th time-bin-length candidate wrapped as `TsNano`.
/// Panics if `i` is out of bounds of `TIME_BIN_THRESHOLDS`.
const fn time_bin_threshold_at(i: usize) -> TsNano {
TsNano(TIME_BIN_THRESHOLDS[i])
}
impl PreBinnedPatchGridSpec {
pub fn new(bin_t_len: u64, patch_t_len: u64) -> Self {
if !Self::is_valid_bin_t_len(bin_t_len) {
panic!("PreBinnedPatchGridSpec invalid bin_t_len {}", bin_t_len);
/// Returns the `i`-th pulse-bin-length candidate wrapped as `PulseId`.
/// Panics if `i` is out of bounds of `PULSE_BIN_THRESHOLDS`.
const fn pulse_bin_threshold_at(i: usize) -> PulseId {
PulseId(PULSE_BIN_THRESHOLDS[i])
}
/// Identifies one patch on the binning grid at a certain resolution.
/// A patch consists of `bin_count` consecutive bins.
/// In total, a given `PreBinnedPatchCoord` spans a time range from `patch_beg` to `patch_end`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PreBinnedPatchCoord<T>
where
T: Dim0Index,
{
bin_len: T,
bin_count: u64,
patch_offset: u64,
}
impl<T> PreBinnedPatchCoord<T>
where
T: Dim0Index,
{
pub fn new(bin_len: T, bin_count: u64, patch_offset: u64) -> Self {
Self {
bin_len,
bin_count,
patch_offset,
}
Self { bin_t_len, patch_t_len }
}
pub fn bin_len(&self) -> T {
self.bin_len
}
pub fn bin_t_len(&self) -> u64 {
self.bin_t_len
pub fn patch_len(&self) -> T {
self.bin_len().times(self.bin_count)
}
pub fn is_valid_bin_t_len(bin_t_len: u64) -> bool {
for &j in BIN_T_LEN_OPTIONS_0.iter() {
if bin_t_len == j {
return true;
}
pub fn patch_beg(&self) -> T {
self.bin_len().times(self.bin_count).times(self.patch_offset)
}
pub fn patch_end(&self) -> T {
self.bin_len().times(self.bin_count).times(1 + self.patch_offset)
}
pub fn series_range(&self) -> SeriesRange {
T::series_range(self.patch_beg(), self.patch_end())
}
pub fn bin_count(&self) -> u64 {
self.bin_count
}
pub fn patch_offset(&self) -> u64 {
self.patch_offset
}
pub fn edges(&self) -> Vec<T> {
let mut ret = Vec::new();
let mut t = self.patch_beg();
ret.push(t);
for _ in 0..self.bin_count() {
t += self.bin_t_len();
ret.push(t);
}
return false;
ret
}
pub fn patch_t_len(&self) -> u64 {
self.patch_t_len
pub fn next(&self) -> Self {
Self::new(self.bin_len, self.bin_count, 1 + self.patch_offset)
}
}
impl std::fmt::Debug for PreBinnedPatchGridSpec {
fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
fmt.debug_struct("PreBinnedPatchGridSpec")
.field("bin_t_len", &(self.bin_t_len / SEC))
.field("patch_t_len", &(self.patch_t_len() / SEC))
.finish_non_exhaustive()
impl<T> AppendToUrl for PreBinnedPatchCoord<T>
where
T: Dim0Index,
{
fn append_to_url(&self, url: &mut Url) {
error!("TODO AppendToUrl for PreBinnedPatchCoord");
err::todo();
// TODO must also emit the type of the series index
let mut g = url.query_pairs_mut();
g.append_pair("patchTlen", &format!("{}", 4242));
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PreBinnedPatchRange {
pub grid_spec: PreBinnedPatchGridSpec,
pub offset: u64,
pub count: u64,
pub struct PreBinnedPatchRange<T>
where
T: Dim0Index,
{
first: PreBinnedPatchCoord<T>,
patch_count: u64,
}
fn get_patch_t_len(bin_t_len: u64) -> u64 {
// TODO mechanism to select different patch lengths for different channels.
let shape = Shape::Scalar;
match shape {
Shape::Scalar => {
for (i1, &j) in BIN_T_LEN_OPTIONS_0.iter().enumerate() {
if bin_t_len == j {
return PATCH_T_LEN_OPTIONS_SCALAR[i1];
}
}
}
Shape::Wave(..) => {
for (i1, &j) in BIN_T_LEN_OPTIONS_0.iter().enumerate() {
if bin_t_len == j {
return PATCH_T_LEN_OPTIONS_WAVE[i1];
}
}
}
Shape::Image(..) => {
for (i1, &j) in BIN_T_LEN_OPTIONS_0.iter().enumerate() {
if bin_t_len == j {
return PATCH_T_LEN_OPTIONS_WAVE[i1];
}
}
}
}
panic!()
}
impl PreBinnedPatchRange {
/// Cover at least the given range with at least as many as the requested number of bins.
pub fn covering_range(range: NanoRange, min_bin_count: u32) -> Result<Option<Self>, Error> {
let bin_t_len_options = &BIN_T_LEN_OPTIONS_0;
if min_bin_count < 1 {
Err(Error::with_msg("min_bin_count < 1"))?;
}
if min_bin_count > 20000 {
Err(Error::with_msg(format!("min_bin_count > 20000: {}", min_bin_count)))?;
}
let dt = range.delta();
if dt > DAY * 200 {
Err(Error::with_msg("dt > DAY * 200"))?;
}
let bs = dt / min_bin_count as u64;
let mut i1 = bin_t_len_options.len();
loop {
if i1 == 0 {
break Ok(None);
} else {
i1 -= 1;
let t = bin_t_len_options[i1];
if t <= bs {
let bin_t_len = t;
let patch_t_len = get_patch_t_len(bin_t_len);
if !PreBinnedPatchGridSpec::is_valid_bin_t_len(bin_t_len) {
return Err(Error::with_msg_no_trace(format!("not a valid bin_t_len {}", bin_t_len)));
}
let grid_spec = PreBinnedPatchGridSpec { bin_t_len, patch_t_len };
let pl = patch_t_len;
let ts1 = range.beg / pl * pl;
let ts2 = (range.end + pl - 1) / pl * pl;
let count = (ts2 - ts1) / pl;
let offset = ts1 / pl;
let ret = Self {
grid_spec,
count,
offset,
};
break Ok(Some(ret));
}
}
}
}
impl<T> PreBinnedPatchRange<T>
where
T: Dim0Index,
{
pub fn edges(&self) -> Vec<u64> {
let mut ret = vec![];
let mut ret = Vec::new();
let mut t = self.grid_spec.patch_t_len() * self.offset;
ret.push(t);
let bin_count = self.grid_spec.patch_t_len() / self.grid_spec.bin_t_len() * self.count;
@@ -1309,152 +1337,75 @@ impl PreBinnedPatchRange {
}
}
/// Identifies one patch on the binning grid at a certain resolution.
/// A patch consists of `bin_count` consecutive bins.
/// In total, a given `PreBinnedPatchCoord` spans a time range from `patch_beg` to `patch_end`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PreBinnedPatchCoord {
spec: PreBinnedPatchGridSpec,
ix: u64,
pub enum PreBinnedPatchRangeEnum {
Time(PreBinnedPatchRange<TsNano>),
Pulse(PreBinnedPatchRange<PulseId>),
}
impl PreBinnedPatchCoord {
pub fn bin_t_len(&self) -> u64 {
self.spec.bin_t_len
}
pub fn patch_t_len(&self) -> u64 {
self.spec.patch_t_len()
}
pub fn patch_beg(&self) -> u64 {
self.spec.patch_t_len() * self.ix
}
pub fn patch_end(&self) -> u64 {
self.spec.patch_t_len() * (self.ix + 1)
}
pub fn patch_range(&self) -> NanoRange {
NanoRange {
beg: self.patch_beg(),
end: self.patch_end(),
impl PreBinnedPatchRangeEnum {
fn covering_range_ty<T>(a: T, b: T, min_bin_count: u32) -> Result<Self, Error>
where
T: Dim0Index,
{
let opts = T::prebin_bin_len_opts();
if min_bin_count < 1 {
Err(Error::with_msg("min_bin_count < 1"))?;
}
}
pub fn bin_count(&self) -> u32 {
(self.spec.patch_t_len() / self.spec.bin_t_len) as u32
}
pub fn spec(&self) -> &PreBinnedPatchGridSpec {
&self.spec
}
pub fn ix(&self) -> u64 {
self.ix
}
pub fn new(bin_t_len: u64, patch_t_len: u64, patch_ix: u64) -> Self {
Self {
spec: PreBinnedPatchGridSpec::new(bin_t_len, patch_t_len),
ix: patch_ix,
if min_bin_count > 20000 {
Err(Error::with_msg(format!("min_bin_count > 20000: {}", min_bin_count)))?;
}
}
pub fn edges(&self) -> Vec<u64> {
let mut ret = vec![];
let mut t = self.patch_beg();
ret.push(t);
for _ in 0..self.bin_count() {
t += self.bin_t_len();
ret.push(t);
}
ret
}
}
impl AppendToUrl for PreBinnedPatchCoord {
fn append_to_url(&self, url: &mut Url) {
let mut g = url.query_pairs_mut();
g.append_pair("patchTlen", &format!("{}", self.spec.patch_t_len() / SEC));
g.append_pair("binTlen", &format!("{}", self.spec.bin_t_len() / SEC));
g.append_pair("patchIx", &format!("{}", self.ix()));
}
}
pub struct PreBinnedPatchIterator {
range: PreBinnedPatchRange,
ix: u64,
}
impl PreBinnedPatchIterator {
pub fn from_range(range: PreBinnedPatchRange) -> Self {
Self { range, ix: 0 }
}
}
impl Iterator for PreBinnedPatchIterator {
type Item = PreBinnedPatchCoord;
fn next(&mut self) -> Option<Self::Item> {
if self.ix >= self.range.count {
None
} else {
let ret = Self::Item {
spec: self.range.grid_spec.clone(),
ix: self.range.offset + self.ix,
};
self.ix += 1;
Some(ret)
}
}
}
#[derive(Clone, Serialize, Deserialize)]
pub struct BinnedGridSpec {
bin_t_len: u64,
}
impl BinnedGridSpec {
pub fn new(bin_t_len: u64) -> Self {
if !Self::is_valid_bin_t_len(bin_t_len) {
panic!("BinnedGridSpec::new invalid bin_t_len {}", bin_t_len);
}
Self { bin_t_len }
}
pub fn bin_t_len(&self) -> u64 {
self.bin_t_len
}
pub fn is_valid_bin_t_len(bin_t_len: u64) -> bool {
for &j in BIN_T_LEN_OPTIONS_0.iter() {
if bin_t_len == j {
return true;
let du = b - a;
let max_bin_len = du.div_n(min_bin_count);
for (i1, bl) in opts.enumerate().rev() {
if bl <= du {
let patch_len = bl.prebin_patch_len_for(i1);
let bin_count = patch_len.div_v(bl);
let patch_off_1 = a.div_v(&patch_len);
let patch_off_2 = b.div_v(&patch_len.add(patch_len).sub(1));
//patch_off_2.sub(patch_off_1);
let patch_count = patch_off_2 - patch_off_1;
let ret = T::to_pre_binned_patch_range_enum(bl, bin_count, patch_off_1, patch_count);
return Ok(ret);
}
}
return false;
Err(Error::with_msg_no_trace("can not find matching pre-binned grid"))
}
}
impl fmt::Debug for BinnedGridSpec {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
if self.bin_t_len < SEC * 90 {
write!(fmt, "BinnedGridSpec {{ bin_t_len: {:?} ms }}", self.bin_t_len / MS,)
} else {
write!(fmt, "BinnedGridSpec {{ bin_t_len: {:?} s }}", self.bin_t_len / SEC,)
/// Cover at least the given range with at least as many as the requested number of bins.
pub fn covering_range(range: SeriesRange, min_bin_count: u32) -> Result<Self, Error> {
match range {
SeriesRange::TimeRange(k) => Self::covering_range_ty(TsNano(k.beg), TsNano(k.end), min_bin_count),
SeriesRange::PulseRange(k) => Self::covering_range_ty(PulseId(k.beg), PulseId(k.end), min_bin_count),
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BinnedRange {
grid_spec: BinnedGridSpec,
offset: u64,
bin_count: u64,
pub struct BinnedRange<T>
where
T: Dim0Index,
{
bin_len: T,
bin_off: u64,
bin_cnt: u64,
}
impl BinnedRange {
impl<T> fmt::Debug for BinnedRange<T>
where
    T: Dim0Index,
{
    /// Manual `Debug` impl rendering the three grid fields.
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        fmt.debug_struct("BinnedRange")
            .field("bin_len", &self.bin_len)
            .field("bin_off", &self.bin_off)
            .field("bin_cnt", &self.bin_cnt)
            // `.finish()` is required: without it the expression is a
            // `&mut DebugStruct`, not the `fmt::Result` this function returns.
            .finish()
    }
}
impl<T> BinnedRange<T>
where
T: Dim0Index,
{
pub fn covering_range(range: NanoRange, min_bin_count: u32) -> Result<Self, Error> {
let thresholds = &BIN_THRESHOLDS;
if min_bin_count < 1 {
@@ -1528,6 +1479,49 @@ impl BinnedRange {
}
}
/// Axis-dispatching wrapper for a binned range: time or pulse-id based.
// NOTE(review): both variants currently wrap `PreBinnedPatchRange<_>`, which
// looks like a copy-paste from `PreBinnedPatchRangeEnum` a few screens up —
// confirm whether `BinnedRange<_>` was intended here.
pub enum BinnedRangeEnum {
Time(PreBinnedPatchRange<TsNano>),
Pulse(PreBinnedPatchRange<PulseId>),
}
impl BinnedRangeEnum {
// Generic worker: picks the coarsest bin length (from the axis-specific
// option table) that still yields at least `min_bin_count` bins between
// `a` and `b`, then derives the covering patch range.
// NOTE(review): this body is byte-identical to
// `PreBinnedPatchRangeEnum::covering_range_ty` above — likely WIP
// duplication. Also: `to_pre_binned_patch_range_enum` returns a
// `PreBinnedPatchRangeEnum`, not `Self`; `max_bin_len` is computed but
// never used; and `opts.enumerate()` on a slice needs `.iter()`. Confirm
// before relying on this compiling.
fn covering_range_ty<T>(a: T, b: T, min_bin_count: u32) -> Result<Self, Error>
where
T: Dim0Index,
{
let opts = T::prebin_bin_len_opts();
// Reject degenerate and excessive bin-count requests up front.
if min_bin_count < 1 {
Err(Error::with_msg("min_bin_count < 1"))?;
}
if min_bin_count > 20000 {
Err(Error::with_msg(format!("min_bin_count > 20000: {}", min_bin_count)))?;
}
// Span of the requested range along this axis.
let du = b - a;
let max_bin_len = du.div_n(min_bin_count);
// Walk the bin-length options from coarsest to finest; take the first
// that fits inside the span.
for (i1, bl) in opts.enumerate().rev() {
if bl <= du {
let patch_len = bl.prebin_patch_len_for(i1);
let bin_count = patch_len.div_v(bl);
// Align patch offsets so patches fully cover [a, b).
let patch_off_1 = a.div_v(&patch_len);
let patch_off_2 = b.div_v(&patch_len.add(patch_len).sub(1));
//patch_off_2.sub(patch_off_1);
let patch_count = patch_off_2 - patch_off_1;
let ret = T::to_pre_binned_patch_range_enum(bl, bin_count, patch_off_1, patch_count);
return Ok(ret);
}
}
Err(Error::with_msg_no_trace("can not find matching pre-binned grid"))
}
/// Cover at least the given range with at least as many as the requested number of bins.
pub fn covering_range(range: SeriesRange, min_bin_count: u32) -> Result<Self, Error> {
// Dispatch on the axis and wrap the raw endpoints in their newtypes.
match range {
SeriesRange::TimeRange(k) => Self::covering_range_ty(TsNano(k.beg), TsNano(k.end), min_bin_count),
SeriesRange::PulseRange(k) => Self::covering_range_ty(PulseId(k.beg), PulseId(k.end), min_bin_count),
}
}
}
#[cfg(test)]
mod test_binned_range {
use super::*;

View File

@@ -14,6 +14,8 @@ use crate::FromUrl;
use crate::HasBackend;
use crate::HasTimeout;
use crate::NanoRange;
use crate::PulseRange;
use crate::SeriesRange;
use crate::ToNanos;
use chrono::DateTime;
use chrono::TimeZone;
@@ -81,14 +83,134 @@ impl fmt::Display for CacheUsage {
}
}
/// URL-query representation of a time range; parsed from either
/// `begDate`/`endDate` (RFC 3339 dates) or `begNs`/`endNs` (raw nanoseconds).
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct TimeRangeQuery {
range: NanoRange,
}
impl FromUrl for TimeRangeQuery {
fn from_url(url: &Url) -> Result<Self, Error> {
let pairs = get_url_query_pairs(url);
Self::from_pairs(&pairs)
}
fn from_pairs(pairs: &BTreeMap<String, String>) -> Result<Self, Error> {
if let (Some(beg), Some(end)) = (pairs.get("begDate"), pairs.get("endDate")) {
let ret = Self {
range: NanoRange {
beg: beg.parse::<DateTime<Utc>>()?.to_nanos(),
end: end.parse::<DateTime<Utc>>()?.to_nanos(),
},
};
Ok(ret)
} else if let (Some(beg), Some(end)) = (pairs.get("begNs"), pairs.get("endNs")) {
let ret = Self {
range: NanoRange {
beg: beg.parse()?,
end: end.parse()?,
},
};
Ok(ret)
} else {
Err(Error::with_public_msg("missing date range"))
}
}
}
impl AppendToUrl for TimeRangeQuery {
    /// Serializes the range as `begDate`/`endDate` query parameters in UTC.
    fn append_to_url(&self, url: &mut Url) {
        let date_fmt = "%Y-%m-%dT%H:%M:%S.%6fZ";
        // Format both endpoints before borrowing the query-pair serializer.
        let beg = Utc.timestamp_nanos(self.range.beg as i64).format(date_fmt).to_string();
        let end = Utc.timestamp_nanos(self.range.end as i64).format(date_fmt).to_string();
        let mut g = url.query_pairs_mut();
        g.append_pair("begDate", &beg);
        g.append_pair("endDate", &end);
    }
}
impl From<TimeRangeQuery> for NanoRange {
fn from(k: TimeRangeQuery) -> Self {
Self {
beg: k.range.beg,
end: k.range.end,
}
}
}
impl From<&NanoRange> for TimeRangeQuery {
    /// Wraps a borrowed range into a query by copying both endpoints.
    fn from(k: &NanoRange) -> Self {
        let range = NanoRange { beg: k.beg, end: k.end };
        Self { range }
    }
}
impl From<&PulseRange> for PulseRangeQuery {
    /// Wraps a borrowed pulse range into a query by copying both endpoints.
    fn from(k: &PulseRange) -> Self {
        let range = PulseRange { beg: k.beg, end: k.end };
        Self { range }
    }
}
/// URL-query representation of a pulse-id range; parsed from the
/// `begPulse`/`endPulse` query parameters.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PulseRangeQuery {
range: PulseRange,
}
impl FromUrl for PulseRangeQuery {
fn from_url(url: &Url) -> Result<Self, Error> {
let pairs = get_url_query_pairs(url);
Self::from_pairs(&pairs)
}
fn from_pairs(pairs: &BTreeMap<String, String>) -> Result<Self, Error> {
if let (Some(beg), Some(end)) = (pairs.get("begPulse"), pairs.get("endPulse")) {
let ret = Self {
range: PulseRange {
beg: beg.parse()?,
end: end.parse()?,
},
};
Ok(ret)
} else {
Err(Error::with_public_msg("missing pulse range"))
}
}
}
impl AppendToUrl for PulseRangeQuery {
    /// Serializes the range as `begPulse`/`endPulse` query parameters.
    fn append_to_url(&self, url: &mut Url) {
        let beg = self.range.beg.to_string();
        let end = self.range.end.to_string();
        let mut g = url.query_pairs_mut();
        g.append_pair("begPulse", &beg);
        g.append_pair("endPulse", &end);
    }
}
impl From<PulseRangeQuery> for PulseRange {
fn from(k: PulseRangeQuery) -> Self {
Self {
beg: k.range.beg,
end: k.range.end,
}
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PlainEventsQuery {
channel: Channel,
range: NanoRange,
#[serde(default, skip_serializing_if = "Option::is_none")]
agg_kind: Option<AggKind>,
#[serde(default, skip_serializing_if = "Option::is_none")]
transform: Option<Transform>,
range: SeriesRange,
#[serde(default, skip_serializing_if = "is_false", rename = "oneBeforeRange")]
one_before_range: bool,
#[serde(
default = "Transform::default_events",
skip_serializing_if = "Transform::is_default_events"
)]
transform: Transform,
#[serde(default, skip_serializing_if = "Option::is_none", with = "humantime_serde")]
timeout: Option<Duration>,
#[serde(default, skip_serializing_if = "Option::is_none")]
@@ -106,20 +228,17 @@ pub struct PlainEventsQuery {
}
impl PlainEventsQuery {
pub fn new(
channel: Channel,
range: NanoRange,
agg_kind: Option<AggKind>,
timeout: Option<Duration>,
events_max: Option<u64>,
) -> Self {
pub fn new<R>(channel: Channel, range: R) -> Self
where
R: Into<SeriesRange>,
{
Self {
channel,
range,
agg_kind,
transform: None,
timeout,
events_max,
range: range.into(),
one_before_range: false,
transform: Transform::default_events(),
timeout: Some(Duration::from_millis(4000)),
events_max: Some(10000),
event_delay: None,
stream_batch_len: None,
buf_len_disk_io: None,
@@ -132,23 +251,16 @@ impl PlainEventsQuery {
&self.channel
}
pub fn range(&self) -> &NanoRange {
pub fn range(&self) -> &SeriesRange {
&self.range
}
pub fn agg_kind(&self) -> &Option<AggKind> {
&self.agg_kind
}
pub fn agg_kind_value(&self) -> AggKind {
self.agg_kind.as_ref().map_or(AggKind::Plain, |x| x.clone())
}
pub fn one_before_range(&self) -> bool {
match &self.agg_kind {
Some(k) => k.need_expand(),
None => false,
}
self.one_before_range
}
pub fn transform(&self) -> &Transform {
&self.transform
}
pub fn buf_len_disk_io(&self) -> usize {
@@ -206,17 +318,19 @@ impl FromUrl for PlainEventsQuery {
Self::from_pairs(&pairs)
}
fn from_pairs(pairs: &std::collections::BTreeMap<String, String>) -> Result<Self, Error> {
let beg_date = pairs.get("begDate").ok_or(Error::with_public_msg("missing begDate"))?;
let end_date = pairs.get("endDate").ok_or(Error::with_public_msg("missing endDate"))?;
fn from_pairs(pairs: &BTreeMap<String, String>) -> Result<Self, Error> {
let range = if let Ok(x) = TimeRangeQuery::from_pairs(pairs) {
SeriesRange::TimeRange(x.into())
} else if let Ok(x) = PulseRangeQuery::from_pairs(pairs) {
SeriesRange::PulseRange(x.into())
} else {
return Err(Error::with_msg_no_trace("no series range in url"));
};
let ret = Self {
channel: Channel::from_pairs(pairs)?,
range: NanoRange {
beg: beg_date.parse::<DateTime<Utc>>()?.to_nanos(),
end: end_date.parse::<DateTime<Utc>>()?.to_nanos(),
},
agg_kind: agg_kind_from_binning_scheme(pairs)?,
transform: Some(Transform::from_pairs(pairs)?),
range,
one_before_range: pairs.get("oneBeforeRange").map_or("false", |x| x.as_ref()) == "true",
transform: Transform::from_pairs(pairs)?,
timeout: pairs
.get("timeout")
.map(|x| x.parse::<u64>().map(Duration::from_millis).ok())
@@ -250,23 +364,19 @@ impl FromUrl for PlainEventsQuery {
impl AppendToUrl for PlainEventsQuery {
fn append_to_url(&self, url: &mut Url) {
let date_fmt = "%Y-%m-%dT%H:%M:%S.%6fZ";
match &self.range {
SeriesRange::TimeRange(k) => TimeRangeQuery::from(k).append_to_url(url),
SeriesRange::PulseRange(_) => todo!(),
}
self.channel.append_to_url(url);
if let Some(x) = &self.transform {
x.append_to_url(url);
}
if let Some(x) = &self.agg_kind {
binning_scheme_append_to_url(x, url);
{
let mut g = url.query_pairs_mut();
if self.one_before_range() {
g.append_pair("oneBeforeRange", "true");
}
}
self.transform.append_to_url(url);
let mut g = url.query_pairs_mut();
g.append_pair(
"begDate",
&Utc.timestamp_nanos(self.range.beg as i64).format(date_fmt).to_string(),
);
g.append_pair(
"endDate",
&Utc.timestamp_nanos(self.range.end as i64).format(date_fmt).to_string(),
);
if let Some(x) = &self.timeout {
g.append_pair("timeout", &format!("{}", x.as_millis()));
}
@@ -294,10 +404,13 @@ impl AppendToUrl for PlainEventsQuery {
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct BinnedQuery {
channel: Channel,
range: NanoRange,
range: SeriesRange,
bin_count: u32,
#[serde(default, skip_serializing_if = "Option::is_none")]
agg_kind: Option<AggKind>,
#[serde(
default = "Transform::default_time_binned",
skip_serializing_if = "Transform::is_default_time_binned"
)]
transform: Transform,
#[serde(default, skip_serializing_if = "Option::is_none")]
cache_usage: Option<CacheUsage>,
#[serde(default, skip_serializing_if = "Option::is_none")]
@@ -311,12 +424,12 @@ pub struct BinnedQuery {
}
impl BinnedQuery {
pub fn new(channel: Channel, range: NanoRange, bin_count: u32, agg_kind: Option<AggKind>) -> Self {
pub fn new(channel: Channel, range: SeriesRange, bin_count: u32) -> Self {
Self {
channel,
range,
bin_count,
agg_kind,
transform: Transform::default_time_binned(),
cache_usage: None,
bins_max: None,
buf_len_disk_io: None,
@@ -325,7 +438,7 @@ impl BinnedQuery {
}
}
pub fn range(&self) -> &NanoRange {
pub fn range(&self) -> &SeriesRange {
&self.range
}
@@ -337,11 +450,8 @@ impl BinnedQuery {
self.bin_count
}
pub fn agg_kind(&self) -> AggKind {
match &self.agg_kind {
Some(x) => x.clone(),
None => AggKind::TimeWeightedScalar,
}
pub fn transform(&self) -> &Transform {
&self.transform
}
pub fn cache_usage(&self) -> CacheUsage {
@@ -421,20 +531,22 @@ impl FromUrl for BinnedQuery {
}
fn from_pairs(pairs: &BTreeMap<String, String>) -> Result<Self, Error> {
let beg_date = pairs.get("begDate").ok_or(Error::with_msg("missing begDate"))?;
let end_date = pairs.get("endDate").ok_or(Error::with_msg("missing endDate"))?;
let range = if let Ok(x) = TimeRangeQuery::from_pairs(pairs) {
SeriesRange::TimeRange(x.into())
} else if let Ok(x) = PulseRangeQuery::from_pairs(pairs) {
SeriesRange::PulseRange(x.into())
} else {
return Err(Error::with_msg_no_trace("no series range in url"));
};
let ret = Self {
channel: Channel::from_pairs(&pairs)?,
range: NanoRange {
beg: beg_date.parse::<DateTime<Utc>>()?.to_nanos(),
end: end_date.parse::<DateTime<Utc>>()?.to_nanos(),
},
range,
bin_count: pairs
.get("binCount")
.ok_or(Error::with_msg("missing binCount"))?
.parse()
.map_err(|e| Error::with_msg(format!("can not parse binCount {:?}", e)))?,
agg_kind: agg_kind_from_binning_scheme(&pairs)?,
transform: Transform::from_pairs(pairs)?,
cache_usage: CacheUsage::from_pairs(&pairs)?,
buf_len_disk_io: pairs
.get("bufLenDiskIo")
@@ -462,40 +574,31 @@ impl FromUrl for BinnedQuery {
impl AppendToUrl for BinnedQuery {
fn append_to_url(&self, url: &mut Url) {
let date_fmt = "%Y-%m-%dT%H:%M:%S.%6fZ";
match &self.range {
SeriesRange::TimeRange(k) => TimeRangeQuery::from(k).append_to_url(url),
SeriesRange::PulseRange(k) => PulseRangeQuery::from(k).append_to_url(url),
}
self.channel.append_to_url(url);
{
self.channel.append_to_url(url);
let mut g = url.query_pairs_mut();
if let Some(x) = &self.cache_usage {
g.append_pair("cacheUsage", &x.query_param_value());
}
g.append_pair(
"begDate",
&Utc.timestamp_nanos(self.range.beg as i64).format(date_fmt).to_string(),
);
g.append_pair(
"endDate",
&Utc.timestamp_nanos(self.range.end as i64).format(date_fmt).to_string(),
);
g.append_pair("binCount", &format!("{}", self.bin_count));
}
if let Some(x) = &self.agg_kind {
binning_scheme_append_to_url(x, url);
self.transform.append_to_url(url);
let mut g = url.query_pairs_mut();
if let Some(x) = &self.cache_usage {
g.append_pair("cacheUsage", &x.query_param_value());
}
{
let mut g = url.query_pairs_mut();
if let Some(x) = &self.timeout {
g.append_pair("timeout", &format!("{}", x.as_millis()));
}
if let Some(x) = self.bins_max {
g.append_pair("binsMax", &format!("{}", x));
}
if let Some(x) = self.buf_len_disk_io {
g.append_pair("bufLenDiskIo", &format!("{}", x));
}
if let Some(x) = &self.disk_stats_every {
g.append_pair("diskStatsEveryKb", &format!("{}", x.bytes() / 1024));
}
if let Some(x) = &self.timeout {
g.append_pair("timeout", &format!("{}", x.as_millis()));
}
if let Some(x) = self.bins_max {
g.append_pair("binsMax", &format!("{}", x));
}
if let Some(x) = self.buf_len_disk_io {
g.append_pair("bufLenDiskIo", &format!("{}", x));
}
if let Some(x) = &self.disk_stats_every {
g.append_pair("diskStatsEveryKb", &format!("{}", x.bytes() / 1024));
}
}
}

View File

@@ -1,16 +1,14 @@
use crate::get_url_query_pairs;
use crate::log::*;
use crate::AppendToUrl;
use crate::BinnedRange;
use crate::FromUrl;
use crate::NanoRange;
use err::Error;
use serde::Deserialize;
use serde::Serialize;
use std::collections::BTreeMap;
use url::Url;
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum EventTransform {
EventBlobsVerbatim,
EventBlobsUncompressed,
@@ -20,14 +18,14 @@ pub enum EventTransform {
PulseIdDiff,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum TimeBinningTransform {
None,
TimeWeighted(BinnedRange),
Unweighted(BinnedRange),
TimeWeighted,
Unweighted,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct Transform {
event: EventTransform,
time_binning: TimeBinningTransform,
@@ -37,6 +35,28 @@ impl Transform {
fn url_prefix() -> &'static str {
"transform"
}
/// Default transform for plain event queries: deliver full values with no
/// time binning applied.
pub fn default_events() -> Self {
Self {
event: EventTransform::ValueFull,
time_binning: TimeBinningTransform::None,
}
}
/// Default transform for binned queries: min/max/avg/dev event aggregation
/// combined with time-weighted binning.
pub fn default_time_binned() -> Self {
Self {
event: EventTransform::MinMaxAvgDev,
time_binning: TimeBinningTransform::TimeWeighted,
}
}
/// True if `self` equals `default_events()`; used by serde's
/// `skip_serializing_if` on query structs to omit the default transform.
pub fn is_default_events(&self) -> bool {
self == &Self::default_events()
}
/// True if `self` equals `default_time_binned()`; used by serde's
/// `skip_serializing_if` on `BinnedQuery` to omit the default transform.
pub fn is_default_time_binned(&self) -> bool {
self == &Self::default_time_binned()
}
}
impl FromUrl for Transform {
@@ -62,13 +82,7 @@ impl FromUrl for Transform {
} else if s == "timeWeightedScalar" {
Transform {
event: EventTransform::MinMaxAvgDev,
time_binning: TimeBinningTransform::TimeWeighted(BinnedRange::covering_range(
NanoRange {
beg: 20000000000,
end: 30000000000,
},
20,
)?),
time_binning: TimeBinningTransform::TimeWeighted,
}
} else if s == "unweightedScalar" {
Transform {

View File

@@ -13,6 +13,7 @@ use netpod::log::*;
use netpod::query::CacheUsage;
use netpod::query::PlainEventsQuery;
use netpod::timeunits::*;
use netpod::transform::Transform;
use netpod::AggKind;
use netpod::ChannelTyped;
use netpod::PreBinnedPatchCoord;
@@ -33,7 +34,6 @@ pub async fn read_cached_scylla(
series: u64,
chn: &ChannelTyped,
coord: &PreBinnedPatchCoord,
_agg_kind: AggKind,
scy: &ScySession,
) -> Result<Option<Box<dyn TimeBinned>>, Error> {
let vals = (
@@ -206,7 +206,8 @@ pub async fn fetch_uncached_data(
series: u64,
chn: ChannelTyped,
coord: PreBinnedPatchCoord,
agg_kind: AggKind,
one_before_range: bool,
transform: Transform,
cache_usage: CacheUsage,
scy: Arc<ScySession>,
) -> Result<Option<(Box<dyn TimeBinned>, bool)>, Error> {
@@ -226,13 +227,16 @@ pub async fn fetch_uncached_data(
&chn,
coord.clone(),
range,
agg_kind,
one_before_range,
transform,
cache_usage.clone(),
scy.clone(),
)
.await
}
Ok(None) => fetch_uncached_binned_events(series, &chn, coord.clone(), agg_kind, scy.clone()).await,
Ok(None) => {
fetch_uncached_binned_events(series, &chn, coord.clone(), one_before_range, transform, scy.clone()).await
}
Err(e) => Err(e),
}?;
if true || complete {
@@ -265,7 +269,8 @@ pub fn fetch_uncached_data_box(
series: u64,
chn: &ChannelTyped,
coord: &PreBinnedPatchCoord,
agg_kind: AggKind,
one_before_range: bool,
transform: Transform,
cache_usage: CacheUsage,
scy: Arc<ScySession>,
) -> Pin<Box<dyn Future<Output = Result<Option<(Box<dyn TimeBinned>, bool)>, Error>> + Send>> {
@@ -273,7 +278,8 @@ pub fn fetch_uncached_data_box(
series,
chn.clone(),
coord.clone(),
agg_kind,
one_before_range,
transform,
cache_usage,
scy,
))
@@ -284,7 +290,8 @@ pub async fn fetch_uncached_higher_res_prebinned(
chn: &ChannelTyped,
coord: PreBinnedPatchCoord,
range: PreBinnedPatchRange,
agg_kind: AggKind,
one_before_range: bool,
transform: Transform,
cache_usage: CacheUsage,
scy: Arc<ScySession>,
) -> Result<(Box<dyn TimeBinned>, bool), Error> {
@@ -292,7 +299,7 @@ pub async fn fetch_uncached_higher_res_prebinned(
// TODO refine the AggKind scheme or introduce a new BinningOpts type and get time-weight from there.
let do_time_weight = true;
// We must produce some result with correct types even if upstream delivers nothing at all.
let bin0 = empty_binned_dyn_tb(&chn.scalar_type, &chn.shape, &agg_kind);
let bin0 = empty_binned_dyn_tb(&chn.scalar_type, &chn.shape, &transform);
let mut time_binner = bin0.time_binner_new(edges.clone(), do_time_weight);
let mut complete = true;
let patch_it = PreBinnedPatchIterator::from_range(range.clone());
@@ -304,7 +311,8 @@ pub async fn fetch_uncached_higher_res_prebinned(
series,
chn,
&patch_coord,
agg_kind.clone(),
one_before_range,
transform.clone(),
cache_usage.clone(),
scy.clone(),
)
@@ -358,7 +366,8 @@ pub async fn fetch_uncached_binned_events(
series: u64,
chn: &ChannelTyped,
coord: PreBinnedPatchCoord,
agg_kind: AggKind,
one_before_range: bool,
transform: Transform,
scy: Arc<ScySession>,
) -> Result<(Box<dyn TimeBinned>, bool), Error> {
let edges = coord.edges();
@@ -367,7 +376,7 @@ pub async fn fetch_uncached_binned_events(
// We must produce some result with correct types even if upstream delivers nothing at all.
//let bin0 = empty_events_dyn_tb(&chn.scalar_type, &chn.shape, &agg_kind);
//let mut time_binner = bin0.time_binner_new(edges.clone(), do_time_weight);
let mut time_binner = empty_events_dyn_ev(&chn.scalar_type, &chn.shape, &agg_kind)?
let mut time_binner = empty_events_dyn_ev(&chn.scalar_type, &chn.shape, &transform)?
.as_time_binnable()
.time_binner_new(edges.clone(), do_time_weight);
// TODO handle deadline better
@@ -376,19 +385,11 @@ pub async fn fetch_uncached_binned_events(
let deadline = deadline
.checked_add(Duration::from_millis(6000))
.ok_or_else(|| Error::with_msg_no_trace(format!("deadline overflow")))?;
let do_one_before_range = agg_kind.need_expand();
let evq = PlainEventsQuery::new(
chn.channel.clone(),
coord.patch_range(),
Some(agg_kind),
// TODO take from query
Some(Duration::from_millis(8000)),
None,
);
let evq = PlainEventsQuery::new(chn.channel.clone(), coord.patch_range());
let mut events_dyn = EventsStreamScylla::new(
series,
evq.range().clone(),
do_one_before_range,
one_before_range,
chn.scalar_type.clone(),
chn.shape.clone(),
true,
@@ -466,22 +467,20 @@ pub async fn pre_binned_value_stream_with_scy(
series: u64,
chn: &ChannelTyped,
coord: &PreBinnedPatchCoord,
agg_kind: AggKind,
one_before_range: bool,
transform: Transform,
cache_usage: CacheUsage,
scy: Arc<ScySession>,
) -> Result<(Box<dyn TimeBinned>, bool), Error> {
trace!("pre_binned_value_stream_with_scy {chn:?} {coord:?}");
if let (Some(item), CacheUsage::Use) = (
read_cached_scylla(series, chn, coord, agg_kind.clone(), &scy).await?,
&cache_usage,
) {
if let (Some(item), CacheUsage::Use) = (read_cached_scylla(series, chn, coord, &scy).await?, &cache_usage) {
info!("+++++++++++++ GOOD READ");
Ok((item, true))
} else {
if let CacheUsage::Use = &cache_usage {
warn!("--+--+--+--+--+--+ NOT YET CACHED");
}
let res = fetch_uncached_data_box(series, chn, coord, agg_kind, cache_usage, scy).await?;
let res = fetch_uncached_data_box(series, chn, coord, one_before_range, transform, cache_usage, scy).await?;
let (bin, complete) =
res.ok_or_else(|| Error::with_msg_no_trace(format!("pre_binned_value_stream_with_scy got None bin")))?;
Ok((bin, complete))
@@ -497,7 +496,8 @@ pub async fn pre_binned_value_stream(
scy: Arc<ScySession>,
) -> Result<Pin<Box<dyn Stream<Item = Result<Box<dyn TimeBinned>, Error>> + Send>>, Error> {
trace!("pre_binned_value_stream series {series} {chn:?} {coord:?}");
let res = pre_binned_value_stream_with_scy(series, chn, coord, agg_kind, cache_usage, scy).await?;
let res =
pre_binned_value_stream_with_scy(series, chn, coord, one_before_range, transform, cache_usage, scy).await?;
error!("TODO pre_binned_value_stream");
err::todo();
Ok(Box::pin(futures_util::stream::iter([Ok(res.0)])))

View File

@@ -1,4 +1,5 @@
use crate::errconv::ErrConv;
use crate::ScyllaSeriesRange;
use err::Error;
use futures_util::Future;
use futures_util::FutureExt;
@@ -12,7 +13,6 @@ use items_2::channelevents::ChannelEvents;
use items_2::eventsdim0::EventsDim0;
use items_2::eventsdim1::EventsDim1;
use netpod::log::*;
use netpod::NanoRange;
use netpod::ScalarType;
use netpod::Shape;
use scylla::Session as ScySession;
@@ -25,7 +25,7 @@ use std::task::Poll;
async fn find_ts_msp(
series: u64,
range: NanoRange,
range: ScyllaSeriesRange,
scy: Arc<ScySession>,
) -> Result<(VecDeque<u64>, VecDeque<u64>), Error> {
trace!("find_ts_msp series {} {:?}", series, range);
@@ -131,7 +131,7 @@ impl_scaty_array!(Vec<bool>, bool, Vec<bool>, "events_array_bool");
struct ReadNextValuesOpts {
series: u64,
ts_msp: u64,
range: NanoRange,
range: ScyllaSeriesRange,
fwd: bool,
with_values: bool,
scy: Arc<ScySession>,
@@ -277,7 +277,7 @@ struct ReadValues {
series: u64,
scalar_type: ScalarType,
shape: Shape,
range: NanoRange,
range: ScyllaSeriesRange,
ts_msps: VecDeque<u64>,
fwd: bool,
with_values: bool,
@@ -291,7 +291,7 @@ impl ReadValues {
series: u64,
scalar_type: ScalarType,
shape: Shape,
range: NanoRange,
range: ScyllaSeriesRange,
ts_msps: VecDeque<u64>,
fwd: bool,
with_values: bool,
@@ -397,7 +397,7 @@ pub struct EventsStreamScylla {
series: u64,
scalar_type: ScalarType,
shape: Shape,
range: NanoRange,
range: ScyllaSeriesRange,
do_one_before_range: bool,
ts_msp_bck: VecDeque<u64>,
ts_msp_fwd: VecDeque<u64>,
@@ -411,7 +411,7 @@ pub struct EventsStreamScylla {
impl EventsStreamScylla {
pub fn new(
series: u64,
range: NanoRange,
range: ScyllaSeriesRange,
do_one_before_range: bool,
scalar_type: ScalarType,
shape: Shape,

View File

@@ -11,6 +11,12 @@ use scylla::statement::Consistency;
use scylla::Session as ScySession;
use std::sync::Arc;
#[derive(Debug, Clone)]
/// Time range used for Scylla series event lookups.
/// NOTE(review): this type replaces `NanoRange` in the scylla reader API;
/// fields are presumably nanoseconds since epoch with `beg` inclusive and
/// `end` exclusive, mirroring `NanoRange` — TODO confirm against callers.
pub struct ScyllaSeriesRange {
    // Begin of the range — TODO confirm unit (ns) and inclusivity.
    beg: u64,
    // End of the range — TODO confirm unit (ns) and exclusivity.
    end: u64,
}
pub async fn create_scy_session(scyconf: &ScyllaConfig) -> Result<Arc<ScySession>, Error> {
let scy = scylla::SessionBuilder::new()
.known_nodes(&scyconf.hosts)