Factor out query type

This commit is contained in:
Dominik Werder
2023-03-20 07:13:59 +01:00
parent 1baa78bd3a
commit c0bdc854ff
37 changed files with 595 additions and 500 deletions

View File

@@ -310,9 +310,8 @@ impl EventsDynStream {
let st = &scalar_type;
let sh = &shape;
let ag = &agg_kind;
// TODO do we need/want the empty item from here?
error!("TODO feed through transform?");
err::todo();
// TODO do we need/want the empty item from here?
let events_out = items_2::empty::empty_events_dyn_ev(st, sh)?;
let scalar_conv = make_scalar_conv(st, sh, ag)?;
let emit_threshold = match &shape {
@@ -337,9 +336,8 @@ impl EventsDynStream {
fn replace_events_out(&mut self) -> Result<Box<dyn Events>, Error> {
let st = &self.scalar_type;
let sh = &self.shape;
// TODO do we need/want the empty item from here?
error!("TODO feed through transform?");
err::todo();
// TODO do we need/want the empty item from here?
let empty = items_2::empty::empty_events_dyn_ev(st, sh)?;
let evs = mem::replace(&mut self.events_out, empty);
Ok(evs)

View File

@@ -6,9 +6,9 @@ use items_0::streamitem::Sitemty;
use items_2::eventfull::EventFull;
use items_2::merger::Merger;
use netpod::log::*;
use netpod::query::PlainEventsQuery;
use netpod::Cluster;
use netpod::PerfOpts;
use query::api4::events::PlainEventsQuery;
use std::future::Future;
use std::pin::Pin;
use std::task::Context;
@@ -99,7 +99,7 @@ impl Stream for MergedBlobsFromRemotes {
if c1 == self.tcp_establish_futs.len() {
let inps = self.nodein.iter_mut().map(|k| k.take().unwrap()).collect();
// TODO set out_max_len dynamically
let s1 = Merger::new(inps, 128);
let s1 = Merger::new(inps, 1);
self.merged = Some(Box::pin(s1));
}
continue 'outer;

View File

@@ -9,7 +9,6 @@ use items_0::streamitem::StreamItem;
use items_2::channelevents::ChannelEvents;
use items_2::eventfull::EventFull;
use netpod::log::*;
use netpod::query::PlainEventsQuery;
use netpod::range::evrange::NanoRange;
use netpod::AggKind;
use netpod::ByteSize;
@@ -22,6 +21,7 @@ use parse::channelconfig::extract_matching_config_entry;
use parse::channelconfig::read_local_config;
use parse::channelconfig::ConfigEntry;
use parse::channelconfig::MatchingConfigEntry;
use query::api4::events::PlainEventsQuery;
use std::pin::Pin;
use streams::eventchunker::EventChunkerConf;
@@ -109,7 +109,7 @@ pub async fn make_event_pipe(
let out_max_len = if node_config.node_config.cluster.is_central_storage {
1
} else {
128
1
};
let event_blobs = EventChunkerMultifile::new(
(&range).try_into()?,
@@ -184,7 +184,7 @@ pub fn make_local_event_blobs_stream(
let out_max_len = if node_config.node_config.cluster.is_central_storage {
1
} else {
128
1
};
let event_blobs = EventChunkerMultifile::new(
range,
@@ -230,7 +230,7 @@ pub fn make_remote_event_blobs_stream(
let out_max_len = if node_config.node_config.cluster.is_central_storage {
1
} else {
128
1
};
let event_blobs = EventChunkerMultifile::new(
range,
@@ -263,13 +263,14 @@ pub async fn make_event_blobs_pipe(
let event_chunker_conf = EventChunkerConf::new(ByteSize::kb(1024));
type ItemType = Sitemty<EventFull>;
// TODO should depend on host config
let pipe = if true {
let event_blobs = make_remote_event_blobs_stream(
let do_local = node_config.node_config.cluster.is_central_storage;
let pipe = if do_local {
let event_blobs = make_local_event_blobs_stream(
range.try_into()?,
evq.channel().clone(),
&entry,
expand,
true,
false,
event_chunker_conf,
DiskIoTune::default(),
node_config,
@@ -281,7 +282,7 @@ pub async fn make_event_blobs_pipe(
pipe*/
Box::pin(event_blobs) as _
} else {
let event_blobs = make_local_event_blobs_stream(
let event_blobs = make_remote_event_blobs_stream(
range.try_into()?,
evq.channel().clone(),
&entry,