WIP new container

This commit is contained in:
Dominik Werder
2024-11-24 22:33:26 +01:00
parent dc961e2e13
commit 7e8c6bd676
4 changed files with 9 additions and 9 deletions

View File

@@ -8,6 +8,7 @@ use items_0::streamitem::Sitemty;
use items_0::streamitem::StreamItem;
use items_0::Events;
use items_0::WithLen;
use items_2::empty::empty_events_dyn_ev;
use items_2::eventfull::EventFull;
use items_2::eventsdim0::EventsDim0;
use items_2::eventsdim1::EventsDim1;
@@ -325,7 +326,7 @@ impl EventsDynStream {
let sh = &shape;
warn!("TODO EventsDynStream::new feed through transform");
// TODO do we need/want the empty item from here?
let events_out = items_2::empty::empty_events_dyn_ev(st, sh)?;
let events_out = empty_events_dyn_ev(st, sh)?;
let scalar_conv = make_scalar_conv(st, sh, &agg_kind)?;
let emit_threshold = match &shape {
Shape::Scalar => 2048,
@@ -350,7 +351,7 @@ impl EventsDynStream {
let sh = &self.shape;
// error!("TODO replace_events_out feed through transform");
// TODO do we need/want the empty item from here?
let empty = items_2::empty::empty_events_dyn_ev(st, sh)?;
let empty = empty_events_dyn_ev(st, sh)?;
let evs = mem::replace(&mut self.events_out, empty);
Ok(evs)
}

View File

@@ -42,7 +42,7 @@ impl streams::timebin::CacheReadProvider for ScyllaCacheReadProvider {
pub async fn worker_write(
series: u64,
bins: ContainerBins<f32>,
bins: ContainerBins<f32, f32>,
stmts_cache: &StmtsCache,
scy: &ScySession,
) -> Result<(), streams::timebin::cached::reader::Error> {
@@ -77,7 +77,7 @@ pub async fn worker_read(
offs: core::ops::Range<u32>,
stmts_cache: &StmtsCache,
scy: &ScySession,
) -> Result<ContainerBins<f32>, streams::timebin::cached::reader::Error> {
) -> Result<ContainerBins<f32, f32>, streams::timebin::cached::reader::Error> {
let div = streams::timebin::cached::reader::part_len(bin_len).ns();
let params = (
series as i64,

View File

@@ -45,7 +45,7 @@ struct ReadCacheF32 {
bin_len: DtMs,
msp: u64,
offs: core::ops::Range<u32>,
tx: Sender<Result<ContainerBins<f32>, streams::timebin::cached::reader::Error>>,
tx: Sender<Result<ContainerBins<f32, f32>, streams::timebin::cached::reader::Error>>,
}
#[derive(Debug)]
@@ -66,7 +66,7 @@ enum Job {
),
WriteCacheF32(
u64,
ContainerBins<f32>,
ContainerBins<f32, f32>,
Sender<Result<(), streams::timebin::cached::reader::Error>>,
),
ReadCacheF32(ReadCacheF32),
@@ -151,7 +151,7 @@ impl ScyllaQueue {
pub async fn write_cache_f32(
&self,
series: u64,
bins: ContainerBins<f32>,
bins: ContainerBins<f32, f32>,
) -> Result<(), streams::timebin::cached::reader::Error> {
let (tx, rx) = async_channel::bounded(1);
let job = Job::WriteCacheF32(series, bins, tx);
@@ -172,7 +172,7 @@ impl ScyllaQueue {
bin_len: DtMs,
msp: u64,
offs: core::ops::Range<u32>,
) -> Result<ContainerBins<f32>, streams::timebin::cached::reader::Error> {
) -> Result<ContainerBins<f32, f32>, streams::timebin::cached::reader::Error> {
let (tx, rx) = async_channel::bounded(1);
let job = Job::ReadCacheF32(ReadCacheF32 {
series,

View File

@@ -61,7 +61,6 @@ pub async fn x_processed_event_blobs_stream_from_node_tcp(
Ok(Box::pin(items))
}
#[allow(unused)]
async fn open_event_data_streams_tcp<T>(subq: EventsSubQuery, cluster: &Cluster) -> Result<Vec<BoxedStream<T>>, Error>
where
// TODO group bounds in new trait