From 7e8c6bd6765c48150496128ea698a1526c7be862 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Sun, 24 Nov 2024 22:33:26 +0100 Subject: [PATCH] WIP new container --- crates/disk/src/decode.rs | 5 +++-- crates/scyllaconn/src/bincache.rs | 4 ++-- crates/scyllaconn/src/worker.rs | 8 ++++---- crates/streamio/src/tcprawclient.rs | 1 - 4 files changed, 9 insertions(+), 9 deletions(-) diff --git a/crates/disk/src/decode.rs b/crates/disk/src/decode.rs index 8926166..ebaa208 100644 --- a/crates/disk/src/decode.rs +++ b/crates/disk/src/decode.rs @@ -8,6 +8,7 @@ use items_0::streamitem::Sitemty; use items_0::streamitem::StreamItem; use items_0::Events; use items_0::WithLen; +use items_2::empty::empty_events_dyn_ev; use items_2::eventfull::EventFull; use items_2::eventsdim0::EventsDim0; use items_2::eventsdim1::EventsDim1; @@ -325,7 +326,7 @@ impl EventsDynStream { let sh = &shape; warn!("TODO EventsDynStream::new feed through transform"); // TODO do we need/want the empty item from here? - let events_out = items_2::empty::empty_events_dyn_ev(st, sh)?; + let events_out = empty_events_dyn_ev(st, sh)?; let scalar_conv = make_scalar_conv(st, sh, &agg_kind)?; let emit_threshold = match &shape { Shape::Scalar => 2048, @@ -350,7 +351,7 @@ impl EventsDynStream { let sh = &self.shape; // error!("TODO replace_events_out feed through transform"); // TODO do we need/want the empty item from here? - let empty = items_2::empty::empty_events_dyn_ev(st, sh)?; + let empty = empty_events_dyn_ev(st, sh)?; let evs = mem::replace(&mut self.events_out, empty); Ok(evs) } diff --git a/crates/scyllaconn/src/bincache.rs b/crates/scyllaconn/src/bincache.rs index c5d7b12..67cc150 100644 --- a/crates/scyllaconn/src/bincache.rs +++ b/crates/scyllaconn/src/bincache.rs @@ -42,7 +42,7 @@ impl streams::timebin::CacheReadProvider for ScyllaCacheReadProvider { pub async fn worker_write( series: u64, - bins: ContainerBins, + bins: ContainerBins, stmts_cache: &StmtsCache, scy: &ScySession, ) -> Result<(), streams::timebin::cached::reader::Error> { @@ -77,7 +77,7 @@ pub async fn worker_read( offs: core::ops::Range, stmts_cache: &StmtsCache, scy: &ScySession, -) -> Result, streams::timebin::cached::reader::Error> { +) -> Result, streams::timebin::cached::reader::Error> { let div = streams::timebin::cached::reader::part_len(bin_len).ns(); let params = ( series as i64, diff --git a/crates/scyllaconn/src/worker.rs b/crates/scyllaconn/src/worker.rs index 70eda92..0a05732 100644 --- a/crates/scyllaconn/src/worker.rs +++ b/crates/scyllaconn/src/worker.rs @@ -45,7 +45,7 @@ struct ReadCacheF32 { bin_len: DtMs, msp: u64, offs: core::ops::Range, - tx: Sender, streams::timebin::cached::reader::Error>>, + tx: Sender, streams::timebin::cached::reader::Error>>, } #[derive(Debug)] @@ -66,7 +66,7 @@ enum Job { ), WriteCacheF32( u64, - ContainerBins, + ContainerBins, Sender>, ), ReadCacheF32(ReadCacheF32), @@ -151,7 +151,7 @@ impl ScyllaQueue { pub async fn write_cache_f32( &self, series: u64, - bins: ContainerBins, + bins: ContainerBins, ) -> Result<(), streams::timebin::cached::reader::Error> { let (tx, rx) = async_channel::bounded(1); let job = Job::WriteCacheF32(series, bins, tx); @@ -172,7 +172,7 @@ impl ScyllaQueue { bin_len: DtMs, msp: u64, offs: core::ops::Range, - ) -> Result, streams::timebin::cached::reader::Error> { + ) -> Result, streams::timebin::cached::reader::Error> { let (tx, rx) = async_channel::bounded(1); let job = Job::ReadCacheF32(ReadCacheF32 { series, diff --git a/crates/streamio/src/tcprawclient.rs b/crates/streamio/src/tcprawclient.rs index 03b78d2..8452ba2 100644 --- a/crates/streamio/src/tcprawclient.rs +++ b/crates/streamio/src/tcprawclient.rs @@ -61,7 +61,6 @@ pub async fn x_processed_event_blobs_stream_from_node_tcp( Ok(Box::pin(items)) } -#[allow(unused)] async fn open_event_data_streams_tcp(subq: EventsSubQuery, cluster: &Cluster) -> Result>, Error> where // TODO group bounds in new trait