WIP streams crate checks
This commit is contained in:
+24
-18
@@ -2,24 +2,30 @@ use chrono::Utc;
|
|||||||
use err::Error;
|
use err::Error;
|
||||||
use netpod::log::*;
|
use netpod::log::*;
|
||||||
use netpod::timeunits::SEC;
|
use netpod::timeunits::SEC;
|
||||||
use netpod::{AggKind, Channel, Cluster, NodeConfigCached, PreBinnedPatchCoord};
|
use netpod::AggKind;
|
||||||
use serde::{Deserialize, Serialize};
|
use netpod::Channel;
|
||||||
|
use netpod::Cluster;
|
||||||
|
use netpod::NodeConfigCached;
|
||||||
|
use netpod::PreBinnedPatchCoordEnum;
|
||||||
|
use serde::Deserialize;
|
||||||
|
use serde::Serialize;
|
||||||
use std::collections::VecDeque;
|
use std::collections::VecDeque;
|
||||||
use std::io;
|
use std::io;
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
use std::time::{Duration, Instant};
|
use std::time::Duration;
|
||||||
|
use std::time::Instant;
|
||||||
use tiny_keccak::Hasher;
|
use tiny_keccak::Hasher;
|
||||||
|
|
||||||
// For file-based caching, this determined the node where the cache file is located.
|
// For file-based caching, this determined the node where the cache file is located.
|
||||||
// No longer needed for scylla-based caching.
|
// No longer needed for scylla-based caching.
|
||||||
pub fn node_ix_for_patch(patch_coord: &PreBinnedPatchCoord, channel: &Channel, cluster: &Cluster) -> u32 {
|
pub fn node_ix_for_patch(patch_coord: &PreBinnedPatchCoordEnum, channel: &Channel, cluster: &Cluster) -> u32 {
|
||||||
let mut hash = tiny_keccak::Sha3::v256();
|
let mut hash = tiny_keccak::Sha3::v256();
|
||||||
hash.update(channel.backend.as_bytes());
|
hash.update(channel.backend.as_bytes());
|
||||||
hash.update(channel.name.as_bytes());
|
hash.update(channel.name.as_bytes());
|
||||||
hash.update(&patch_coord.patch_beg().to_le_bytes());
|
/*hash.update(&patch_coord.patch_beg().to_le_bytes());
|
||||||
hash.update(&patch_coord.patch_end().to_le_bytes());
|
hash.update(&patch_coord.patch_end().to_le_bytes());
|
||||||
hash.update(&patch_coord.bin_t_len().to_le_bytes());
|
hash.update(&patch_coord.bin_t_len().to_le_bytes());
|
||||||
hash.update(&patch_coord.patch_t_len().to_le_bytes());
|
hash.update(&patch_coord.patch_t_len().to_le_bytes());*/
|
||||||
let mut out = [0; 32];
|
let mut out = [0; 32];
|
||||||
hash.finalize(&mut out);
|
hash.finalize(&mut out);
|
||||||
let a = [out[0], out[1], out[2], out[3]];
|
let a = [out[0], out[1], out[2], out[3]];
|
||||||
@@ -31,12 +37,12 @@ pub fn node_ix_for_patch(patch_coord: &PreBinnedPatchCoord, channel: &Channel, c
|
|||||||
pub struct CacheFileDesc {
|
pub struct CacheFileDesc {
|
||||||
// What identifies a cached file?
|
// What identifies a cached file?
|
||||||
channel: Channel,
|
channel: Channel,
|
||||||
patch: PreBinnedPatchCoord,
|
patch: PreBinnedPatchCoordEnum,
|
||||||
agg_kind: AggKind,
|
agg_kind: AggKind,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl CacheFileDesc {
|
impl CacheFileDesc {
|
||||||
pub fn new(channel: Channel, patch: PreBinnedPatchCoord, agg_kind: AggKind) -> Self {
|
pub fn new(channel: Channel, patch: PreBinnedPatchCoordEnum, agg_kind: AggKind) -> Self {
|
||||||
Self {
|
Self {
|
||||||
channel,
|
channel,
|
||||||
patch,
|
patch,
|
||||||
@@ -50,9 +56,9 @@ impl CacheFileDesc {
|
|||||||
h.update(self.channel.backend.as_bytes());
|
h.update(self.channel.backend.as_bytes());
|
||||||
h.update(self.channel.name.as_bytes());
|
h.update(self.channel.name.as_bytes());
|
||||||
h.update(format!("{}", self.agg_kind).as_bytes());
|
h.update(format!("{}", self.agg_kind).as_bytes());
|
||||||
h.update(&self.patch.spec().bin_t_len().to_le_bytes());
|
//h.update(&self.patch.spec().bin_t_len().to_le_bytes());
|
||||||
h.update(&self.patch.spec().patch_t_len().to_le_bytes());
|
//h.update(&self.patch.spec().patch_t_len().to_le_bytes());
|
||||||
h.update(&self.patch.ix().to_le_bytes());
|
//h.update(&self.patch.ix().to_le_bytes());
|
||||||
let mut buf = [0; 32];
|
let mut buf = [0; 32];
|
||||||
h.finalize(&mut buf);
|
h.finalize(&mut buf);
|
||||||
hex::encode(&buf)
|
hex::encode(&buf)
|
||||||
@@ -79,12 +85,12 @@ impl CacheFileDesc {
|
|||||||
.join(&hc[3..6])
|
.join(&hc[3..6])
|
||||||
.join(&self.channel.name)
|
.join(&self.channel.name)
|
||||||
.join(format!("{}", self.agg_kind))
|
.join(format!("{}", self.agg_kind))
|
||||||
.join(format!(
|
/*.join(format!(
|
||||||
"{:010}-{:010}",
|
"{:010}-{:010}",
|
||||||
self.patch.spec().bin_t_len() / SEC,
|
self.patch.spec().bin_t_len() / SEC,
|
||||||
self.patch.spec().patch_t_len() / SEC
|
self.patch.spec().patch_t_len() / SEC
|
||||||
))
|
))
|
||||||
.join(format!("{}-{:012}", &hash[0..6], self.patch.ix()))
|
.join(format!("{}-{:012}", &hash[0..6], self.patch.ix()))*/
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -96,7 +102,7 @@ pub struct WrittenPbCache {
|
|||||||
// TODO only used for old archiver
|
// TODO only used for old archiver
|
||||||
pub async fn write_pb_cache_min_max_avg_scalar<T>(
|
pub async fn write_pb_cache_min_max_avg_scalar<T>(
|
||||||
values: T,
|
values: T,
|
||||||
patch: PreBinnedPatchCoord,
|
patch: PreBinnedPatchCoordEnum,
|
||||||
agg_kind: AggKind,
|
agg_kind: AggKind,
|
||||||
channel: Channel,
|
channel: Channel,
|
||||||
node_config: NodeConfigCached,
|
node_config: NodeConfigCached,
|
||||||
|
|||||||
+8
-3
@@ -309,7 +309,10 @@ impl EventsDynStream {
|
|||||||
let st = &scalar_type;
|
let st = &scalar_type;
|
||||||
let sh = &shape;
|
let sh = &shape;
|
||||||
let ag = &agg_kind;
|
let ag = &agg_kind;
|
||||||
let events_out = items_2::empty_events_dyn_ev(st, sh, ag)?;
|
// TODO do we need/want the empty item from here?
|
||||||
|
error!("TODO feed through transform?");
|
||||||
|
err::todo();
|
||||||
|
let events_out = items_2::empty::empty_events_dyn_ev(st, sh)?;
|
||||||
let scalar_conv = make_scalar_conv(st, sh, ag)?;
|
let scalar_conv = make_scalar_conv(st, sh, ag)?;
|
||||||
let emit_threshold = match &shape {
|
let emit_threshold = match &shape {
|
||||||
Shape::Scalar => 2048,
|
Shape::Scalar => 2048,
|
||||||
@@ -333,8 +336,10 @@ impl EventsDynStream {
|
|||||||
fn replace_events_out(&mut self) -> Result<Box<dyn Events>, Error> {
|
fn replace_events_out(&mut self) -> Result<Box<dyn Events>, Error> {
|
||||||
let st = &self.scalar_type;
|
let st = &self.scalar_type;
|
||||||
let sh = &self.shape;
|
let sh = &self.shape;
|
||||||
let ag = &self.agg_kind;
|
// TODO do we need/want the empty item from here?
|
||||||
let empty = items_2::empty_events_dyn_ev(st, sh, ag)?;
|
error!("TODO feed through transform?");
|
||||||
|
err::todo();
|
||||||
|
let empty = items_2::empty::empty_events_dyn_ev(st, sh)?;
|
||||||
let evs = mem::replace(&mut self.events_out, empty);
|
let evs = mem::replace(&mut self.events_out, empty);
|
||||||
Ok(evs)
|
Ok(evs)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -76,7 +76,7 @@ pub async fn make_event_pipe(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
let entry_res = match extract_matching_config_entry(&range, &channel_config) {
|
let entry_res = match extract_matching_config_entry(&(&range).try_into()?, &channel_config) {
|
||||||
Ok(k) => k,
|
Ok(k) => k,
|
||||||
Err(e) => return Err(e)?,
|
Err(e) => return Err(e)?,
|
||||||
};
|
};
|
||||||
@@ -105,7 +105,7 @@ pub async fn make_event_pipe(
|
|||||||
);
|
);
|
||||||
let event_chunker_conf = EventChunkerConf::new(ByteSize::kb(1024));
|
let event_chunker_conf = EventChunkerConf::new(ByteSize::kb(1024));
|
||||||
let event_blobs = EventChunkerMultifile::new(
|
let event_blobs = EventChunkerMultifile::new(
|
||||||
range.clone(),
|
(&range).try_into()?,
|
||||||
channel_config.clone(),
|
channel_config.clone(),
|
||||||
node_config.node.clone(),
|
node_config.node.clone(),
|
||||||
node_config.ix,
|
node_config.ix,
|
||||||
@@ -115,10 +115,11 @@ pub async fn make_event_pipe(
|
|||||||
true,
|
true,
|
||||||
);
|
);
|
||||||
let shape = entry.to_shape()?;
|
let shape = entry.to_shape()?;
|
||||||
|
error!("TODO replace AggKind in the called code");
|
||||||
let pipe = make_num_pipeline_stream_evs(
|
let pipe = make_num_pipeline_stream_evs(
|
||||||
entry.scalar_type.clone(),
|
entry.scalar_type.clone(),
|
||||||
shape.clone(),
|
shape.clone(),
|
||||||
evq.agg_kind_value(),
|
AggKind::TimeWeightedScalar,
|
||||||
event_blobs,
|
event_blobs,
|
||||||
);
|
);
|
||||||
Ok(pipe)
|
Ok(pipe)
|
||||||
@@ -234,13 +235,13 @@ pub async fn make_event_blobs_pipe(
|
|||||||
}
|
}
|
||||||
let expand = evq.one_before_range();
|
let expand = evq.one_before_range();
|
||||||
let range = evq.range();
|
let range = evq.range();
|
||||||
let entry = get_applicable_entry(evq.range(), evq.channel().clone(), node_config).await?;
|
let entry = get_applicable_entry(&evq.range().try_into()?, evq.channel().clone(), node_config).await?;
|
||||||
let event_chunker_conf = EventChunkerConf::new(ByteSize::kb(1024));
|
let event_chunker_conf = EventChunkerConf::new(ByteSize::kb(1024));
|
||||||
type ItemType = Sitemty<EventFull>;
|
type ItemType = Sitemty<EventFull>;
|
||||||
// TODO should depend on host config
|
// TODO should depend on host config
|
||||||
let pipe = if true {
|
let pipe = if true {
|
||||||
let event_blobs = make_remote_event_blobs_stream(
|
let event_blobs = make_remote_event_blobs_stream(
|
||||||
range.clone(),
|
range.try_into()?,
|
||||||
evq.channel().clone(),
|
evq.channel().clone(),
|
||||||
&entry,
|
&entry,
|
||||||
expand,
|
expand,
|
||||||
@@ -257,7 +258,7 @@ pub async fn make_event_blobs_pipe(
|
|||||||
Box::pin(event_blobs) as _
|
Box::pin(event_blobs) as _
|
||||||
} else {
|
} else {
|
||||||
let event_blobs = make_local_event_blobs_stream(
|
let event_blobs = make_local_event_blobs_stream(
|
||||||
range.clone(),
|
range.try_into()?,
|
||||||
evq.channel().clone(),
|
evq.channel().clone(),
|
||||||
&entry,
|
&entry,
|
||||||
expand,
|
expand,
|
||||||
|
|||||||
@@ -813,6 +813,17 @@ impl SeriesRange {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl TryFrom<&SeriesRange> for NanoRange {
|
||||||
|
type Error = Error;
|
||||||
|
|
||||||
|
fn try_from(val: &SeriesRange) -> Result<NanoRange, Self::Error> {
|
||||||
|
match val {
|
||||||
|
SeriesRange::TimeRange(x) => Ok(x.clone()),
|
||||||
|
SeriesRange::PulseRange(_) => Err(Error::with_msg_no_trace("not a Time range")),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl From<NanoRange> for SeriesRange {
|
impl From<NanoRange> for SeriesRange {
|
||||||
fn from(k: NanoRange) -> Self {
|
fn from(k: NanoRange) -> Self {
|
||||||
Self::TimeRange(k)
|
Self::TimeRange(k)
|
||||||
|
|||||||
+3
-2
@@ -99,7 +99,7 @@ async fn make_channel_events_stream(
|
|||||||
// TODO why both in PlainEventsQuery and as separate parameter? Check other usages.
|
// TODO why both in PlainEventsQuery and as separate parameter? Check other usages.
|
||||||
let do_one_before_range = false;
|
let do_one_before_range = false;
|
||||||
// TODO use better builder pattern with shortcuts for production and dev defaults
|
// TODO use better builder pattern with shortcuts for production and dev defaults
|
||||||
let f = crate::channelconfig::channel_config(evq.range().clone(), evq.channel().clone(), node_config)
|
let f = crate::channelconfig::channel_config(evq.range().try_into()?, evq.channel().clone(), node_config)
|
||||||
.await
|
.await
|
||||||
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
|
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
|
||||||
let scyco = conf;
|
let scyco = conf;
|
||||||
@@ -108,7 +108,8 @@ async fn make_channel_events_stream(
|
|||||||
let scalar_type = f.scalar_type;
|
let scalar_type = f.scalar_type;
|
||||||
let shape = f.shape;
|
let shape = f.shape;
|
||||||
let do_test_stream_error = false;
|
let do_test_stream_error = false;
|
||||||
let with_values = if let AggKind::PulseIdDiff = evq.agg_kind_value() {
|
error!("TODO derive AggKind from Transformed empty [846397]");
|
||||||
|
let with_values = if let AggKind::PulseIdDiff = AggKind::TimeWeightedScalar {
|
||||||
false
|
false
|
||||||
} else {
|
} else {
|
||||||
true
|
true
|
||||||
|
|||||||
Reference in New Issue
Block a user