From b0f71b2143e3012c612b39e9154b58a385d4e598 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Thu, 9 Mar 2023 16:20:28 +0100 Subject: [PATCH] WIP streams crate checks --- disk/src/cache.rs | 42 ++++++++++++++++++++++++------------------ disk/src/decode.rs | 11 ++++++++--- disk/src/raw/conn.rs | 13 +++++++------ netpod/src/netpod.rs | 11 +++++++++++ nodenet/src/conn.rs | 5 +++-- 5 files changed, 53 insertions(+), 29 deletions(-) diff --git a/disk/src/cache.rs b/disk/src/cache.rs index 29304ef..2c652c475 100644 --- a/disk/src/cache.rs +++ b/disk/src/cache.rs @@ -2,24 +2,30 @@ use chrono::Utc; use err::Error; use netpod::log::*; use netpod::timeunits::SEC; -use netpod::{AggKind, Channel, Cluster, NodeConfigCached, PreBinnedPatchCoord}; -use serde::{Deserialize, Serialize}; +use netpod::AggKind; +use netpod::Channel; +use netpod::Cluster; +use netpod::NodeConfigCached; +use netpod::PreBinnedPatchCoordEnum; +use serde::Deserialize; +use serde::Serialize; use std::collections::VecDeque; use std::io; use std::path::PathBuf; -use std::time::{Duration, Instant}; +use std::time::Duration; +use std::time::Instant; use tiny_keccak::Hasher; // For file-based caching, this determined the node where the cache file is located. // No longer needed for scylla-based caching. -pub fn node_ix_for_patch(patch_coord: &PreBinnedPatchCoord, channel: &Channel, cluster: &Cluster) -> u32 { +pub fn node_ix_for_patch(patch_coord: &PreBinnedPatchCoordEnum, channel: &Channel, cluster: &Cluster) -> u32 { let mut hash = tiny_keccak::Sha3::v256(); hash.update(channel.backend.as_bytes()); hash.update(channel.name.as_bytes()); - hash.update(&patch_coord.patch_beg().to_le_bytes()); + /*hash.update(&patch_coord.patch_beg().to_le_bytes()); hash.update(&patch_coord.patch_end().to_le_bytes()); hash.update(&patch_coord.bin_t_len().to_le_bytes()); - hash.update(&patch_coord.patch_t_len().to_le_bytes()); + hash.update(&patch_coord.patch_t_len().to_le_bytes());*/ let mut out = [0; 32]; hash.finalize(&mut out); let a = [out[0], out[1], out[2], out[3]]; @@ -31,12 +37,12 @@ pub fn node_ix_for_patch(patch_coord: &PreBinnedPatchCoord, channel: &Channel, c pub struct CacheFileDesc { // What identifies a cached file? channel: Channel, - patch: PreBinnedPatchCoord, + patch: PreBinnedPatchCoordEnum, agg_kind: AggKind, } impl CacheFileDesc { - pub fn new(channel: Channel, patch: PreBinnedPatchCoord, agg_kind: AggKind) -> Self { + pub fn new(channel: Channel, patch: PreBinnedPatchCoordEnum, agg_kind: AggKind) -> Self { Self { channel, patch, @@ -50,9 +56,9 @@ impl CacheFileDesc { h.update(self.channel.backend.as_bytes()); h.update(self.channel.name.as_bytes()); h.update(format!("{}", self.agg_kind).as_bytes()); - h.update(&self.patch.spec().bin_t_len().to_le_bytes()); - h.update(&self.patch.spec().patch_t_len().to_le_bytes()); - h.update(&self.patch.ix().to_le_bytes()); + //h.update(&self.patch.spec().bin_t_len().to_le_bytes()); + //h.update(&self.patch.spec().patch_t_len().to_le_bytes()); + //h.update(&self.patch.ix().to_le_bytes()); let mut buf = [0; 32]; h.finalize(&mut buf); hex::encode(&buf) @@ -79,12 +85,12 @@ impl CacheFileDesc { .join(&hc[3..6]) .join(&self.channel.name) .join(format!("{}", self.agg_kind)) - .join(format!( - "{:010}-{:010}", - self.patch.spec().bin_t_len() / SEC, - self.patch.spec().patch_t_len() / SEC - )) - .join(format!("{}-{:012}", &hash[0..6], self.patch.ix())) + /*.join(format!( + "{:010}-{:010}", + self.patch.spec().bin_t_len() / SEC, + self.patch.spec().patch_t_len() / SEC + )) + .join(format!("{}-{:012}", &hash[0..6], self.patch.ix()))*/ } } @@ -96,7 +102,7 @@ pub struct WrittenPbCache { // TODO only used for old archiver pub async fn write_pb_cache_min_max_avg_scalar( values: T, - patch: PreBinnedPatchCoord, + patch: PreBinnedPatchCoordEnum, agg_kind: AggKind, channel: Channel, node_config: NodeConfigCached, diff --git a/disk/src/decode.rs b/disk/src/decode.rs index 5fb7c6f..d40c190 100644 --- a/disk/src/decode.rs +++ b/disk/src/decode.rs @@ -309,7 +309,10 @@ impl EventsDynStream { let st = &scalar_type; let sh = &shape; let ag = &agg_kind; - let events_out = items_2::empty_events_dyn_ev(st, sh, ag)?; + // TODO do we need/want the empty item from here? + error!("TODO feed through transform?"); + err::todo(); + let events_out = items_2::empty::empty_events_dyn_ev(st, sh)?; let scalar_conv = make_scalar_conv(st, sh, ag)?; let emit_threshold = match &shape { Shape::Scalar => 2048, @@ -333,8 +336,10 @@ impl EventsDynStream { fn replace_events_out(&mut self) -> Result, Error> { let st = &self.scalar_type; let sh = &self.shape; - let ag = &self.agg_kind; - let empty = items_2::empty_events_dyn_ev(st, sh, ag)?; + // TODO do we need/want the empty item from here? + error!("TODO feed through transform?"); + err::todo(); + let empty = items_2::empty::empty_events_dyn_ev(st, sh)?; let evs = mem::replace(&mut self.events_out, empty); Ok(evs) } diff --git a/disk/src/raw/conn.rs b/disk/src/raw/conn.rs index df4d8a8..ffdcc08 100644 --- a/disk/src/raw/conn.rs +++ b/disk/src/raw/conn.rs @@ -76,7 +76,7 @@ pub async fn make_event_pipe( } } }; - let entry_res = match extract_matching_config_entry(&range, &channel_config) { + let entry_res = match extract_matching_config_entry(&(&range).try_into()?, &channel_config) { Ok(k) => k, Err(e) => return Err(e)?, }; @@ -105,7 +105,7 @@ pub async fn make_event_pipe( ); let event_chunker_conf = EventChunkerConf::new(ByteSize::kb(1024)); let event_blobs = EventChunkerMultifile::new( - range.clone(), + (&range).try_into()?, channel_config.clone(), node_config.node.clone(), node_config.ix, @@ -115,10 +115,11 @@ pub async fn make_event_pipe( true, ); let shape = entry.to_shape()?; + error!("TODO replace AggKind in the called code"); let pipe = make_num_pipeline_stream_evs( entry.scalar_type.clone(), shape.clone(), - evq.agg_kind_value(), + AggKind::TimeWeightedScalar, event_blobs, ); Ok(pipe) @@ -234,13 +235,13 @@ pub async fn make_event_blobs_pipe( } let expand = evq.one_before_range(); let range = evq.range(); - let entry = get_applicable_entry(evq.range(), evq.channel().clone(), node_config).await?; + let entry = get_applicable_entry(&evq.range().try_into()?, evq.channel().clone(), node_config).await?; let event_chunker_conf = EventChunkerConf::new(ByteSize::kb(1024)); type ItemType = Sitemty; // TODO should depend on host config let pipe = if true { let event_blobs = make_remote_event_blobs_stream( - range.clone(), + range.try_into()?, evq.channel().clone(), &entry, expand, @@ -257,7 +258,7 @@ pub async fn make_event_blobs_pipe( Box::pin(event_blobs) as _ } else { let event_blobs = make_local_event_blobs_stream( - range.clone(), + range.try_into()?, evq.channel().clone(), &entry, expand, diff --git a/netpod/src/netpod.rs b/netpod/src/netpod.rs index 7a302a8..0f0647a 100644 --- a/netpod/src/netpod.rs +++ b/netpod/src/netpod.rs @@ -813,6 +813,17 @@ impl SeriesRange { } } +impl TryFrom<&SeriesRange> for NanoRange { + type Error = Error; + + fn try_from(val: &SeriesRange) -> Result { + match val { + SeriesRange::TimeRange(x) => Ok(x.clone()), + SeriesRange::PulseRange(_) => Err(Error::with_msg_no_trace("not a Time range")), + } + } +} + impl From for SeriesRange { fn from(k: NanoRange) -> Self { Self::TimeRange(k) diff --git a/nodenet/src/conn.rs b/nodenet/src/conn.rs index 487cb0a..acfce6b 100644 --- a/nodenet/src/conn.rs +++ b/nodenet/src/conn.rs @@ -99,7 +99,7 @@ async fn make_channel_events_stream( // TODO why both in PlainEventsQuery and as separate parameter? Check other usages. let do_one_before_range = false; // TODO use better builder pattern with shortcuts for production and dev defaults - let f = crate::channelconfig::channel_config(evq.range().clone(), evq.channel().clone(), node_config) + let f = crate::channelconfig::channel_config(evq.range().try_into()?, evq.channel().clone(), node_config) .await .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; let scyco = conf; @@ -108,7 +108,8 @@ async fn make_channel_events_stream( let scalar_type = f.scalar_type; let shape = f.shape; let do_test_stream_error = false; - let with_values = if let AggKind::PulseIdDiff = evq.agg_kind_value() { + error!("TODO derive AggKind from Transformed empty [846397]"); + let with_values = if let AggKind::PulseIdDiff = AggKind::TimeWeightedScalar { false } else { true