diff --git a/daqbufp2/src/client.rs b/daqbufp2/src/client.rs index 3602805..81cf981 100644 --- a/daqbufp2/src/client.rs +++ b/daqbufp2/src/client.rs @@ -69,7 +69,7 @@ pub async fn get_binned( }; let agg_kind = AggKind::DimXBins1; let range = NanoRange::from_date_time(beg_date, end_date); - let mut query = BinnedQuery::new(channel, range, bin_count, agg_kind); + let mut query = BinnedQuery::new(channel, range, bin_count, Some(agg_kind)); query.set_cache_usage(cache_usage); query.set_disk_stats_every(ByteSize(1024 * disk_stats_every_kb)); let hp = HostPort { host: host, port: port }; diff --git a/daqbufp2/src/test/api4/binnedjson.rs b/daqbufp2/src/test/api4/binnedjson.rs index 582d88f..46b1da4 100644 --- a/daqbufp2/src/test/api4/binnedjson.rs +++ b/daqbufp2/src/test/api4/binnedjson.rs @@ -270,7 +270,7 @@ async fn get_binned_json( let beg_date: DateTime = beg_date.parse()?; let end_date: DateTime = end_date.parse()?; let range = NanoRange::from_date_time(beg_date, end_date); - let query = BinnedQuery::new(channel, range, bin_count, AggKind::TimeWeightedScalar); + let query = BinnedQuery::new(channel, range, bin_count, Some(AggKind::TimeWeightedScalar)); let hp = HostPort::from_node(node0); let mut url = Url::parse(&format!("http://{}:{}/api/4/binned", hp.host, hp.port))?; query.append_to_url(&mut url); diff --git a/daqbufp2/src/test/api4/eventsjson.rs b/daqbufp2/src/test/api4/eventsjson.rs index 9588b99..f08a090 100644 --- a/daqbufp2/src/test/api4/eventsjson.rs +++ b/daqbufp2/src/test/api4/eventsjson.rs @@ -1,5 +1,3 @@ -use std::time::Duration; - use crate::err::ErrConv; use crate::nodes::require_test_hosts_running; use crate::test::f32_iter_cmp_near; @@ -105,14 +103,7 @@ async fn events_plain_json( let beg_date: DateTime = beg_date.parse()?; let end_date: DateTime = end_date.parse()?; let range = NanoRange::from_date_time(beg_date, end_date); - let query = PlainEventsQuery::new( - channel, - range, - AggKind::TimeWeightedScalar, - Duration::from_millis(10000), - None, - true, - ); + let query = PlainEventsQuery::new(channel, range, Some(AggKind::TimeWeightedScalar), None, None); let hp = HostPort::from_node(node0); let mut url = Url::parse(&format!("http://{}:{}/api/4/events", hp.host, hp.port))?; query.append_to_url(&mut url); diff --git a/daqbufp2/src/test/binnedbinary.rs b/daqbufp2/src/test/binnedbinary.rs index b4bf1e6..467264c 100644 --- a/daqbufp2/src/test/binnedbinary.rs +++ b/daqbufp2/src/test/binnedbinary.rs @@ -116,9 +116,9 @@ where series: None, }; let range = NanoRange::from_date_time(beg_date, end_date); - let mut query = BinnedQuery::new(channel, range, bin_count, agg_kind); + let mut query = BinnedQuery::new(channel, range, bin_count, Some(agg_kind)); query.set_cache_usage(CacheUsage::Ignore); - query.set_disk_io_buffer_size(1024 * 16); + query.set_buf_len_disk_io(1024 * 16); let hp = HostPort::from_node(node0); let mut url = Url::parse(&format!("http://{}:{}/api/4/binned", hp.host, hp.port))?; query.append_to_url(&mut url); diff --git a/daqbufp2/src/test/events.rs b/daqbufp2/src/test/events.rs index ac67cd4..d5a0b7f 100644 --- a/daqbufp2/src/test/events.rs +++ b/daqbufp2/src/test/events.rs @@ -24,7 +24,6 @@ use netpod::APP_OCTET; use serde_json::Value as JsonValue; use std::fmt::Debug; use std::future::ready; -use std::time::Duration; use streams::frames::inmem::InMemoryFrameAsyncReadStream; use tokio::io::AsyncRead; use url::Url; @@ -88,14 +87,7 @@ async fn get_plain_events_binary( series: None, }; let range = NanoRange::from_date_time(beg_date, end_date); - let query = 
PlainEventsQuery::new( - channel, - range, - AggKind::TimeWeightedScalar, - Duration::from_millis(10000), - None, - true, - ); + let query = PlainEventsQuery::new(channel, range, Some(AggKind::TimeWeightedScalar), None, None); let hp = HostPort::from_node(node0); let mut url = Url::parse(&format!("http://{}:{}/api/4/events", hp.host, hp.port))?; query.append_to_url(&mut url); @@ -268,14 +260,7 @@ pub async fn get_plain_events_json( let beg_date: DateTime = beg_date.parse()?; let end_date: DateTime = end_date.parse()?; let range = NanoRange::from_date_time(beg_date, end_date); - let query = PlainEventsQuery::new( - channel, - range, - AggKind::TimeWeightedScalar, - Duration::from_millis(10000), - None, - true, - ); + let query = PlainEventsQuery::new(channel, range, Some(AggKind::TimeWeightedScalar), None, None); let hp = HostPort::from_node(node0); let mut url = Url::parse(&format!("http://{}:{}/api/4/events", hp.host, hp.port))?; query.append_to_url(&mut url); diff --git a/daqbufp2/src/test/timeweightedjson.rs b/daqbufp2/src/test/timeweightedjson.rs index 8a7ed8a..7859d77 100644 --- a/daqbufp2/src/test/timeweightedjson.rs +++ b/daqbufp2/src/test/timeweightedjson.rs @@ -102,7 +102,7 @@ async fn get_json_common( series: None, }; let range = NanoRange::from_date_time(beg_date, end_date); - let mut query = BinnedQuery::new(channel, range, bin_count, agg_kind); + let mut query = BinnedQuery::new(channel, range, bin_count, Some(agg_kind)); query.set_timeout(Duration::from_millis(40000)); query.set_cache_usage(CacheUsage::Ignore); let mut url = Url::parse(&format!("http://{}:{}/api/4/binned", node0.host, node0.port))?; diff --git a/disk/src/raw/conn.rs b/disk/src/raw/conn.rs index eb11557..b532649 100644 --- a/disk/src/raw/conn.rs +++ b/disk/src/raw/conn.rs @@ -93,7 +93,7 @@ pub async fn make_event_pipe( channel: evq.channel().clone(), keyspace: entry.ks as u8, time_bin_size: entry.bs, - shape: shape, + shape, scalar_type: entry.scalar_type.clone(), byte_order: entry.byte_order.clone(), array: entry.is_array, @@ -101,7 +101,7 @@ pub async fn make_event_pipe( }; trace!( "make_event_pipe need_expand {need_expand} {evq:?}", - need_expand = evq.agg_kind().need_expand() + need_expand = evq.one_before_range() ); let event_chunker_conf = EventChunkerConf::new(ByteSize::kb(1024)); let event_blobs = EventChunkerMultifile::new( @@ -111,14 +111,14 @@ pub async fn make_event_pipe( node_config.ix, DiskIoTune::default(), event_chunker_conf, - evq.agg_kind().need_expand(), + evq.one_before_range(), true, ); let shape = entry.to_shape()?; let pipe = make_num_pipeline_stream_evs( entry.scalar_type.clone(), shape.clone(), - evq.agg_kind().clone(), + evq.agg_kind_value(), event_blobs, ); Ok(pipe) @@ -232,7 +232,7 @@ pub async fn make_event_blobs_pipe( Err(e) => return Err(e)?, } } - let expand = evq.agg_kind().need_expand(); + let expand = evq.one_before_range(); let range = evq.range(); let entry = get_applicable_entry(evq.range(), evq.channel().clone(), node_config).await?; let event_chunker_conf = EventChunkerConf::new(ByteSize::kb(1024)); diff --git a/httpret/src/api1.rs b/httpret/src/api1.rs index 1025d93..6d7c5a9 100644 --- a/httpret/src/api1.rs +++ b/httpret/src/api1.rs @@ -706,7 +706,7 @@ impl DataApiPython3DataStream { } match &b.shapes[i1] { _ => { - let empty_blob=Vec::new(); + let empty_blob = Vec::new(); let blob = b.blobs[i1].as_ref().unwrap_or(&empty_blob); let l1 = 17 + blob.len() as u32; d.put_u32(l1); @@ -793,11 +793,11 @@ impl Stream for DataApiPython3DataStream { let evq = 
PlainEventsQuery::new( channel, self.range.clone(), - netpod::AggKind::EventBlobs, - Duration::from_millis(10000), + Some(netpod::AggKind::EventBlobs), + Some(Duration::from_millis(600000)), None, - true, ); + info!("query for event blobs retrieval: evq {evq:?}"); warn!("fix magic inmem_bufcap"); let perf_opts = PerfOpts::default(); // TODO is this a good to place decide this? @@ -809,7 +809,7 @@ impl Stream for DataApiPython3DataStream { evq.range().clone(), evq.channel().clone(), &entry, - evq.agg_kind().need_expand(), + evq.one_before_range(), self.do_decompress, event_chunker_conf, self.disk_io_tune.clone(), diff --git a/items_0/src/items_0.rs b/items_0/src/items_0.rs index 8f42a64..5bdc346 100644 --- a/items_0/src/items_0.rs +++ b/items_0/src/items_0.rs @@ -154,6 +154,11 @@ pub trait Events: fn take_new_events_until_ts(&mut self, ts_end: u64) -> Box; fn move_into_fresh(&mut self, ts_end: u64) -> Box; fn move_into_existing(&mut self, tgt: &mut Box, ts_end: u64) -> Result<(), ()>; + fn new_empty(&self) -> Box; + fn drain_into(&mut self, dst: &mut Box, range: (usize, usize)) -> Result<(), ()>; + fn find_lowest_index_gt(&self, ts: u64) -> Option; + fn find_lowest_index_ge(&self, ts: u64) -> Option; + fn find_highest_index_lt(&self, ts: u64) -> Option; fn clone_dyn(&self) -> Box; fn partial_eq_dyn(&self, other: &dyn Events) -> bool; fn serde_id(&self) -> &'static str; diff --git a/items_2/src/channelevents.rs b/items_2/src/channelevents.rs index a719f13..d426578 100644 --- a/items_2/src/channelevents.rs +++ b/items_2/src/channelevents.rs @@ -504,28 +504,6 @@ impl crate::merger::Mergeable for ChannelEvents { } } - fn is_compatible_target(&self, tgt: &Self) -> bool { - use ChannelEvents::*; - match self { - Events(_) => { - // TODO better to delegate this to inner type? - if let Events(_) = tgt { - true - } else { - false - } - } - Status(_) => { - // TODO better to delegate this to inner type? - if let Status(_) = tgt { - true - } else { - false - } - } - } - } - fn move_into_fresh(&mut self, ts_end: u64) -> Self { match self { ChannelEvents::Events(k) => ChannelEvents::Events(k.move_into_fresh(ts_end)), @@ -545,6 +523,69 @@ impl crate::merger::Mergeable for ChannelEvents { }, } } + + fn new_empty(&self) -> Self { + match self { + ChannelEvents::Events(k) => ChannelEvents::Events(k.new_empty()), + ChannelEvents::Status(k) => ChannelEvents::Status(k.clone()), + } + } + + fn drain_into(&mut self, dst: &mut Self, range: (usize, usize)) -> Result<(), merger::MergeError> { + match self { + ChannelEvents::Events(k) => match dst { + ChannelEvents::Events(j) => k.drain_into(j, range), + ChannelEvents::Status(_) => Err(merger::MergeError::NotCompatible), + }, + ChannelEvents::Status(k) => match dst { + ChannelEvents::Events(_) => Err(merger::MergeError::NotCompatible), + ChannelEvents::Status(j) => { + // TODO must have some empty-value for the status container. 
+ *j = k.clone(); + Ok(()) + } + }, + } + } + + fn find_lowest_index_gt(&self, ts: u64) -> Option { + match self { + ChannelEvents::Events(k) => k.find_lowest_index_gt(ts), + ChannelEvents::Status(k) => { + if k.ts > ts { + Some(0) + } else { + None + } + } + } + } + + fn find_lowest_index_ge(&self, ts: u64) -> Option { + match self { + ChannelEvents::Events(k) => k.find_lowest_index_ge(ts), + ChannelEvents::Status(k) => { + if k.ts >= ts { + Some(0) + } else { + None + } + } + } + } + + fn find_highest_index_lt(&self, ts: u64) -> Option { + match self { + ChannelEvents::Events(k) => k.find_highest_index_lt(ts), + ChannelEvents::Status(k) => { + if k.ts < ts { + Some(0) + } else { + None + } + } + } + } } impl Collectable for ChannelEvents { diff --git a/items_2/src/eventsdim0.rs b/items_2/src/eventsdim0.rs index 5da5fba..2274626 100644 --- a/items_2/src/eventsdim0.rs +++ b/items_2/src/eventsdim0.rs @@ -382,7 +382,6 @@ impl items_0::collect_s::CollectorType for EventsDim0Collector Events for EventsDim0 { } } + fn new_empty(&self) -> Box { + Box::new(Self::empty()) + } + + fn drain_into(&mut self, dst: &mut Box, range: (usize, usize)) -> Result<(), ()> { + // TODO as_any and as_any_mut are declared on unrelated traits. Simplify. + if let Some(dst) = dst.as_mut().as_any_mut().downcast_mut::() { + // TODO make it harder to forget new members when the struct may get modified in the future + let r = range.0..range.1; + dst.tss.extend(self.tss.drain(r.clone())); + dst.pulses.extend(self.pulses.drain(r.clone())); + dst.values.extend(self.values.drain(r.clone())); + Ok(()) + } else { + error!("downcast to EventsDim0 FAILED"); + Err(()) + } + } + + fn find_lowest_index_gt(&self, ts: u64) -> Option { + for (i, &m) in self.tss.iter().enumerate() { + if m > ts { + return Some(i); + } + } + None + } + + fn find_lowest_index_ge(&self, ts: u64) -> Option { + for (i, &m) in self.tss.iter().enumerate() { + if m >= ts { + return Some(i); + } + } + None + } + + fn find_highest_index_lt(&self, ts: u64) -> Option { + for (i, &m) in self.tss.iter().enumerate().rev() { + if m < ts { + return Some(i); + } + } + None + } + fn ts_min(&self) -> Option { self.tss.front().map(|&x| x) } diff --git a/items_2/src/eventsdim1.rs b/items_2/src/eventsdim1.rs index b485b48..21b8dd6 100644 --- a/items_2/src/eventsdim1.rs +++ b/items_2/src/eventsdim1.rs @@ -756,6 +756,52 @@ impl Events for EventsDim1 { } } + fn new_empty(&self) -> Box { + Box::new(Self::empty()) + } + + fn drain_into(&mut self, dst: &mut Box, range: (usize, usize)) -> Result<(), ()> { + // TODO as_any and as_any_mut are declared on unrelated traits. Simplify. 
+ if let Some(dst) = dst.as_mut().as_any_mut().downcast_mut::() { + // TODO make it harder to forget new members when the struct may get modified in the future + let r = range.0..range.1; + dst.tss.extend(self.tss.drain(r.clone())); + dst.pulses.extend(self.pulses.drain(r.clone())); + dst.values.extend(self.values.drain(r.clone())); + Ok(()) + } else { + error!("downcast to EventsDim0 FAILED"); + Err(()) + } + } + + fn find_lowest_index_gt(&self, ts: u64) -> Option { + for (i, &m) in self.tss.iter().enumerate() { + if m > ts { + return Some(i); + } + } + None + } + + fn find_lowest_index_ge(&self, ts: u64) -> Option { + for (i, &m) in self.tss.iter().enumerate() { + if m >= ts { + return Some(i); + } + } + None + } + + fn find_highest_index_lt(&self, ts: u64) -> Option { + for (i, &m) in self.tss.iter().enumerate().rev() { + if m < ts { + return Some(i); + } + } + None + } + fn ts_min(&self) -> Option { self.tss.front().map(|&x| x) } diff --git a/items_2/src/items_2.rs b/items_2/src/items_2.rs index 982f8c4..6b3bbe5 100644 --- a/items_2/src/items_2.rs +++ b/items_2/src/items_2.rs @@ -51,7 +51,6 @@ pub fn bool_is_false(x: &bool) -> bool { pub fn ts_offs_from_abs(tss: &[u64]) -> (u64, VecDeque, VecDeque) { let ts_anchor_sec = tss.first().map_or(0, |&k| k) / SEC; - info!("ts_offs_from_abs ts_anchor_sec {ts_anchor_sec}"); let ts_anchor_ns = ts_anchor_sec * SEC; let ts_off_ms: VecDeque<_> = tss.iter().map(|&k| (k - ts_anchor_ns) / MS).collect(); let ts_off_ns = tss @@ -74,12 +73,7 @@ pub fn ts_offs_from_abs_with_anchor(ts_anchor_sec: u64, tss: &[u64]) -> (VecDequ } pub fn pulse_offs_from_abs(pulse: &[u64]) -> (u64, VecDeque) { - error!("pulse_offs_from_abs {} DATA", pulse.len()); - for x in pulse { - error!("{x}"); - } let pulse_anchor = pulse.first().map_or(0, |&k| k) / 10000 * 10000; - info!("pulse_offs_from_abs pulse_anchor {pulse_anchor}"); let pulse_off = pulse.iter().map(|&k| k - pulse_anchor).collect(); (pulse_anchor, pulse_off) } @@ -188,11 +182,6 @@ impl crate::merger::Mergeable for Box { self.as_ref().ts_max() } - fn is_compatible_target(&self, _tgt: &Self) -> bool { - // TODO currently unused - todo!() - } - fn move_into_fresh(&mut self, ts_end: u64) -> Self { self.as_mut().move_into_fresh(ts_end) } @@ -202,6 +191,28 @@ impl crate::merger::Mergeable for Box { .move_into_existing(tgt, ts_end) .map_err(|()| merger::MergeError::NotCompatible) } + + fn new_empty(&self) -> Self { + self.as_ref().new_empty() + } + + fn drain_into(&mut self, dst: &mut Self, range: (usize, usize)) -> Result<(), merger::MergeError> { + self.as_mut() + .drain_into(dst, range) + .map_err(|()| merger::MergeError::NotCompatible) + } + + fn find_lowest_index_gt(&self, ts: u64) -> Option { + self.as_ref().find_lowest_index_gt(ts) + } + + fn find_lowest_index_ge(&self, ts: u64) -> Option { + self.as_ref().find_lowest_index_ge(ts) + } + + fn find_highest_index_lt(&self, ts: u64) -> Option { + self.as_ref().find_highest_index_lt(ts) + } } // TODO rename to `Typed` diff --git a/items_2/src/merger.rs b/items_2/src/merger.rs index 4031fe2..59376fb 100644 --- a/items_2/src/merger.rs +++ b/items_2/src/merger.rs @@ -5,7 +5,7 @@ use items::{RangeCompletableItem, Sitemty, StreamItem}; use netpod::log::*; use std::collections::VecDeque; use std::fmt; -use std::ops::ControlFlow; +use std::ops::{ControlFlow, RangeBounds}; use std::pin::Pin; use std::task::{Context, Poll}; @@ -43,12 +43,17 @@ pub trait Mergeable: fmt::Debug + Unpin { fn len(&self) -> usize; fn ts_min(&self) -> Option; fn ts_max(&self) -> Option; - // TODO remove, 
useless. - fn is_compatible_target(&self, tgt: &Rhs) -> bool; - // TODO rename to `append_*` to make it clear that they simply append, but not re-sort. + // TODO remove, superseded. fn move_into_fresh(&mut self, ts_end: u64) -> Rhs; fn move_into_existing(&mut self, tgt: &mut Rhs, ts_end: u64) -> Result<(), MergeError>; + + // TODO: split the logic into: make fresh container, and a single drain_into method. Or is there any advantage in having both? + fn new_empty(&self) -> Self; + fn drain_into(&mut self, dst: &mut Self, range: (usize, usize)) -> Result<(), MergeError>; + fn find_lowest_index_gt(&self, ts: u64) -> Option; + fn find_lowest_index_ge(&self, ts: u64) -> Option; + fn find_highest_index_lt(&self, ts: u64) -> Option; } type MergeInp = Pin> + Send>>; @@ -107,6 +112,19 @@ where } } + fn drain_into_upto(src: &mut T, dst: &mut T, upto: u64) -> Result<(), MergeError> { + match src.find_lowest_index_gt(upto) { + Some(ilgt) => { + src.drain_into(dst, (0, ilgt))?; + } + None => { + // TODO should not be here. + src.drain_into(dst, (0, src.len()))?; + } + } + Ok(()) + } + fn take_into_output_all(&mut self, src: &mut T) -> Result<(), MergeError> { // TODO optimize the case when some large batch should be added to some existing small batch already in out. // TODO maybe use two output slots? @@ -116,14 +134,15 @@ where fn take_into_output_upto(&mut self, src: &mut T, upto: u64) -> Result<(), MergeError> { // TODO optimize the case when some large batch should be added to some existing small batch already in out. // TODO maybe use two output slots? - if self.out.is_none() { - trace2!("move into fresh"); - self.out = Some(src.move_into_fresh(upto)); - Ok(()) + if let Some(out) = self.out.as_mut() { + Self::drain_into_upto(src, out, upto)?; } else { - let out = self.out.as_mut().unwrap(); - src.move_into_existing(out, upto) + trace2!("move into fresh"); + let mut fresh = src.new_empty(); + Self::drain_into_upto(src, &mut fresh, upto)?; + self.out = Some(fresh); } + Ok(()) } fn process(mut self: Pin<&mut Self>, _cx: &mut Context) -> Result, Error> { @@ -159,7 +178,7 @@ where } } } - info!("tslows {tslows:?}"); + trace4!("tslows {tslows:?}"); if let Some((il0, _tl0)) = tslows[0] { if let Some((_il1, tl1)) = tslows[1] { // There is a second input, take only up to the second highest timestamp @@ -167,6 +186,7 @@ where if let Some(th0) = item.ts_max() { if th0 <= tl1 { // Can take the whole item + // TODO gather stats about this case. Should be never for databuffer, and often for scylla. 
let mut item = self.items[il0].take().unwrap(); trace3!("Take all from item {item:?}"); match self.take_into_output_all(&mut item) { diff --git a/netpod/src/netpod.rs b/netpod/src/netpod.rs index d9fcca6..e901f2b 100644 --- a/netpod/src/netpod.rs +++ b/netpod/src/netpod.rs @@ -1747,17 +1747,17 @@ impl EventDataReadStats { #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] pub struct RangeFilterStats { - pub events_pre: u64, - pub events_post: u64, - pub events_unordered: u64, + pub items_no_prune_high: u64, + pub items_all_prune_high: u64, + pub items_part_prune_high: u64, } impl RangeFilterStats { pub fn new() -> Self { Self { - events_pre: 0, - events_post: 0, - events_unordered: 0, + items_no_prune_high: 0, + items_all_prune_high: 0, + items_part_prune_high: 0, } } } @@ -1827,7 +1827,7 @@ impl PerfOpts { } } -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct ByteSize(pub u32); impl ByteSize { diff --git a/netpod/src/query.rs b/netpod/src/query.rs index 116be68..63d0191 100644 --- a/netpod/src/query.rs +++ b/netpod/src/query.rs @@ -25,7 +25,7 @@ use std::fmt; use std::time::Duration; use url::Url; -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub enum CacheUsage { Use, Ignore, @@ -42,19 +42,22 @@ impl CacheUsage { .into() } - pub fn from_pairs(pairs: &BTreeMap) -> Result { - let ret = pairs.get("cacheUsage").map_or(Ok::<_, Error>(CacheUsage::Use), |k| { - if k == "use" { - Ok(CacheUsage::Use) - } else if k == "ignore" { - Ok(CacheUsage::Ignore) - } else if k == "recreate" { - Ok(CacheUsage::Recreate) - } else { - Err(Error::with_msg(format!("unexpected cacheUsage {:?}", k)))? - } - })?; - Ok(ret) + // Missing query parameter is not an error + pub fn from_pairs(pairs: &BTreeMap) -> Result, Error> { + pairs + .get("cacheUsage") + .map(|k| { + if k == "use" { + Ok(Some(CacheUsage::Use)) + } else if k == "ignore" { + Ok(Some(CacheUsage::Ignore)) + } else if k == "recreate" { + Ok(Some(CacheUsage::Recreate)) + } else { + Err(Error::with_msg(format!("unexpected cacheUsage {:?}", k)))? 
+ } + }) + .unwrap_or(Ok(None)) } pub fn from_string(s: &str) -> Result { @@ -81,18 +84,18 @@ impl fmt::Display for CacheUsage { pub struct PlainEventsQuery { channel: Channel, range: NanoRange, - agg_kind: AggKind, - timeout: Duration, + #[serde(default, skip_serializing_if = "Option::is_none")] + agg_kind: Option, + #[serde(default, skip_serializing_if = "Option::is_none", with = "humantime_serde")] + timeout: Option, #[serde(default, skip_serializing_if = "Option::is_none")] events_max: Option, #[serde(default, skip_serializing_if = "Option::is_none", with = "humantime_serde")] event_delay: Option, #[serde(default, skip_serializing_if = "Option::is_none")] stream_batch_len: Option, - #[serde(default, skip_serializing_if = "is_false")] - report_error: bool, - #[serde(default, skip_serializing_if = "is_false")] - do_log: bool, + #[serde(default, skip_serializing_if = "Option::is_none")] + buf_len_disk_io: Option, #[serde(default, skip_serializing_if = "is_false")] do_test_main_error: bool, #[serde(default, skip_serializing_if = "is_false")] @@ -103,10 +106,9 @@ impl PlainEventsQuery { pub fn new( channel: Channel, range: NanoRange, - agg_kind: AggKind, - timeout: Duration, + agg_kind: Option, + timeout: Option, events_max: Option, - do_log: bool, ) -> Self { Self { channel, @@ -116,8 +118,7 @@ impl PlainEventsQuery { events_max, event_delay: None, stream_batch_len: None, - report_error: false, - do_log, + buf_len_disk_io: None, do_test_main_error: false, do_test_stream_error: false, } @@ -131,20 +132,27 @@ impl PlainEventsQuery { &self.range } - pub fn agg_kind(&self) -> &AggKind { + pub fn agg_kind(&self) -> &Option { &self.agg_kind } - pub fn report_error(&self) -> bool { - self.report_error + pub fn agg_kind_value(&self) -> AggKind { + self.agg_kind.as_ref().map_or(AggKind::Plain, |x| x.clone()) } - pub fn disk_io_buffer_size(&self) -> usize { - 1024 * 8 + pub fn one_before_range(&self) -> bool { + match &self.agg_kind { + Some(k) => k.need_expand(), + None => false, + } + } + + pub fn buf_len_disk_io(&self) -> usize { + self.buf_len_disk_io.unwrap_or(1024 * 8) } pub fn timeout(&self) -> Duration { - self.timeout + self.timeout.unwrap_or(Duration::from_millis(10000)) } pub fn events_max(&self) -> u64 { @@ -155,10 +163,6 @@ impl PlainEventsQuery { &self.event_delay } - pub fn do_log(&self) -> bool { - self.do_log - } - pub fn do_test_main_error(&self) -> bool { self.do_test_main_error } @@ -171,10 +175,6 @@ impl PlainEventsQuery { self.channel.series = Some(series); } - pub fn set_timeout(&mut self, k: Duration) { - self.timeout = k; - } - pub fn set_do_test_main_error(&mut self, k: bool) { self.do_test_main_error = k; } @@ -182,19 +182,6 @@ impl PlainEventsQuery { pub fn set_do_test_stream_error(&mut self, k: bool) { self.do_test_stream_error = k; } - - // TODO remove again. 
- pub fn adjust_for_events_query(&mut self) { - match &self.agg_kind { - AggKind::EventBlobs => {} - AggKind::DimXBins1 => {} - AggKind::DimXBinsN(_) => {} - AggKind::Plain => {} - AggKind::TimeWeightedScalar => { - self.agg_kind = AggKind::Plain; - } - } - } } impl HasBackend for PlainEventsQuery { @@ -205,7 +192,7 @@ impl HasBackend for PlainEventsQuery { impl HasTimeout for PlainEventsQuery { fn timeout(&self) -> Duration { - self.timeout.clone() + self.timeout() } } @@ -224,13 +211,11 @@ impl FromUrl for PlainEventsQuery { beg: beg_date.parse::>()?.to_nanos(), end: end_date.parse::>()?.to_nanos(), }, - agg_kind: agg_kind_from_binning_scheme(&pairs).unwrap_or(AggKind::Plain), + agg_kind: agg_kind_from_binning_scheme(&pairs)?, timeout: pairs .get("timeout") - .map_or("10000", |k| k) - .parse::() - .map(|k| Duration::from_millis(k)) - .map_err(|e| Error::with_public_msg(format!("can not parse timeout {:?}", e)))?, + .map(|x| x.parse::().map(Duration::from_millis).ok()) + .unwrap_or(None), events_max: pairs .get("eventsMax") .map_or(Ok(None), |k| k.parse().map(|k| Some(k)))?, @@ -240,26 +225,19 @@ impl FromUrl for PlainEventsQuery { stream_batch_len: pairs .get("streamBatchLen") .map_or(Ok(None), |k| k.parse().map(|k| Some(k)))?, - report_error: pairs - .get("reportError") - .map_or("false", |k| k) - .parse() - .map_err(|e| Error::with_public_msg(format!("can not parse reportError {:?}", e)))?, - do_log: pairs - .get("doLog") - .map_or("false", |k| k) - .parse() - .map_err(|e| Error::with_public_msg(format!("can not parse doLog {:?}", e)))?, + buf_len_disk_io: pairs + .get("bufLenDiskIo") + .map_or(Ok(None), |k| k.parse().map(|k| Some(k)))?, do_test_main_error: pairs .get("doTestMainError") .map_or("false", |k| k) .parse() - .map_err(|e| Error::with_public_msg(format!("can not parse doTestMainError {:?}", e)))?, + .map_err(|e| Error::with_public_msg(format!("can not parse doTestMainError: {}", e)))?, do_test_stream_error: pairs .get("doTestStreamError") .map_or("false", |k| k) .parse() - .map_err(|e| Error::with_public_msg(format!("can not parse doTestStreamError {:?}", e)))?, + .map_err(|e| Error::with_public_msg(format!("can not parse doTestStreamError: {}", e)))?, }; Ok(ret) } @@ -267,9 +245,11 @@ impl FromUrl for PlainEventsQuery { impl AppendToUrl for PlainEventsQuery { fn append_to_url(&self, url: &mut Url) { - let date_fmt = "%Y-%m-%dT%H:%M:%S.%3fZ"; + let date_fmt = "%Y-%m-%dT%H:%M:%S.%6fZ"; self.channel.append_to_url(url); - binning_scheme_append_to_url(&self.agg_kind, url); + if let Some(x) = &self.agg_kind { + binning_scheme_append_to_url(x, url); + } let mut g = url.query_pairs_mut(); g.append_pair( "begDate", @@ -279,7 +259,9 @@ impl AppendToUrl for PlainEventsQuery { "endDate", &Utc.timestamp_nanos(self.range.end as i64).format(date_fmt).to_string(), ); - g.append_pair("timeout", &format!("{}", self.timeout.as_millis())); + if let Some(x) = &self.timeout { + g.append_pair("timeout", &format!("{}", x.as_millis())); + } if let Some(x) = self.events_max.as_ref() { g.append_pair("eventsMax", &format!("{}", x)); } @@ -289,41 +271,49 @@ impl AppendToUrl for PlainEventsQuery { if let Some(x) = self.stream_batch_len.as_ref() { g.append_pair("streamBatchLen", &format!("{}", x)); } - if self.do_log { - g.append_pair("doLog", &format!("{}", self.do_log)); + if let Some(x) = self.buf_len_disk_io.as_ref() { + g.append_pair("bufLenDiskIo", &format!("{}", x)); + } + if self.do_test_main_error { + g.append_pair("doTestMainError", "true"); + } + if self.do_test_stream_error { + 
g.append_pair("doTestStreamError", "true"); } } } -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct BinnedQuery { channel: Channel, range: NanoRange, bin_count: u32, - agg_kind: AggKind, - cache_usage: CacheUsage, - disk_io_buffer_size: usize, - disk_stats_every: ByteSize, - report_error: bool, - timeout: Duration, - abort_after_bin_count: u32, - do_log: bool, + #[serde(default, skip_serializing_if = "Option::is_none")] + agg_kind: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + cache_usage: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + bins_max: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + timeout: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + buf_len_disk_io: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + disk_stats_every: Option, } impl BinnedQuery { - pub fn new(channel: Channel, range: NanoRange, bin_count: u32, agg_kind: AggKind) -> Self { + pub fn new(channel: Channel, range: NanoRange, bin_count: u32, agg_kind: Option) -> Self { Self { channel, range, bin_count, agg_kind, - cache_usage: CacheUsage::Use, - disk_io_buffer_size: 1024 * 4, - disk_stats_every: ByteSize(1024 * 1024 * 4), - report_error: false, - timeout: Duration::from_millis(2000), - abort_after_bin_count: 0, - do_log: false, + cache_usage: None, + bins_max: None, + buf_len_disk_io: None, + disk_stats_every: None, + timeout: None, } } @@ -339,36 +329,44 @@ impl BinnedQuery { self.bin_count } - pub fn agg_kind(&self) -> &AggKind { - &self.agg_kind + pub fn agg_kind(&self) -> AggKind { + match &self.agg_kind { + Some(x) => x.clone(), + None => AggKind::TimeWeightedScalar, + } } - pub fn cache_usage(&self) -> &CacheUsage { - &self.cache_usage + pub fn cache_usage(&self) -> CacheUsage { + self.cache_usage.as_ref().map_or(CacheUsage::Use, |x| x.clone()) } - pub fn disk_stats_every(&self) -> &ByteSize { - &self.disk_stats_every + pub fn disk_stats_every(&self) -> ByteSize { + match &self.disk_stats_every { + Some(x) => x.clone(), + None => ByteSize(1024 * 1024 * 4), + } } - pub fn disk_io_buffer_size(&self) -> usize { - self.disk_io_buffer_size + pub fn buf_len_disk_io(&self) -> usize { + match self.buf_len_disk_io { + Some(x) => x, + None => 1024 * 16, + } } - pub fn report_error(&self) -> bool { - self.report_error + pub fn timeout(&self) -> Option { + self.timeout.clone() } - pub fn timeout(&self) -> Duration { - self.timeout + pub fn timeout_value(&self) -> Duration { + match &self.timeout { + Some(x) => x.clone(), + None => Duration::from_millis(10000), + } } - pub fn abort_after_bin_count(&self) -> u32 { - self.abort_after_bin_count - } - - pub fn do_log(&self) -> bool { - self.do_log + pub fn bins_max(&self) -> u32 { + self.bins_max.unwrap_or(1024) } pub fn set_series_id(&mut self, series: u64) { @@ -380,19 +378,19 @@ impl BinnedQuery { } pub fn set_cache_usage(&mut self, k: CacheUsage) { - self.cache_usage = k; + self.cache_usage = Some(k); } pub fn set_disk_stats_every(&mut self, k: ByteSize) { - self.disk_stats_every = k; + self.disk_stats_every = Some(k); } pub fn set_timeout(&mut self, k: Duration) { - self.timeout = k; + self.timeout = Some(k); } - pub fn set_disk_io_buffer_size(&mut self, k: usize) { - self.disk_io_buffer_size = k; + pub fn set_buf_len_disk_io(&mut self, k: usize) { + self.buf_len_disk_io = Some(k); } } @@ -404,7 +402,7 @@ impl HasBackend for BinnedQuery { impl HasTimeout for BinnedQuery { fn timeout(&self) -> Duration { - 
self.timeout.clone() + self.timeout_value() } } @@ -417,10 +415,6 @@ impl FromUrl for BinnedQuery { fn from_pairs(pairs: &BTreeMap) -> Result { let beg_date = pairs.get("begDate").ok_or(Error::with_msg("missing begDate"))?; let end_date = pairs.get("endDate").ok_or(Error::with_msg("missing endDate"))?; - let disk_stats_every = pairs.get("diskStatsEveryKb").map_or("2000", |k| k); - let disk_stats_every = disk_stats_every - .parse() - .map_err(|e| Error::with_msg(format!("can not parse diskStatsEveryKb {:?}", e)))?; let ret = Self { channel: Channel::from_pairs(&pairs)?, range: NanoRange { @@ -432,35 +426,26 @@ impl FromUrl for BinnedQuery { .ok_or(Error::with_msg("missing binCount"))? .parse() .map_err(|e| Error::with_msg(format!("can not parse binCount {:?}", e)))?, - agg_kind: agg_kind_from_binning_scheme(&pairs).unwrap_or(AggKind::TimeWeightedScalar), + agg_kind: agg_kind_from_binning_scheme(&pairs)?, cache_usage: CacheUsage::from_pairs(&pairs)?, - disk_io_buffer_size: pairs - .get("diskIoBufferSize") - .map_or("4096", |k| k) - .parse() - .map_err(|e| Error::with_msg(format!("can not parse diskIoBufferSize {:?}", e)))?, - disk_stats_every: ByteSize::kb(disk_stats_every), - report_error: pairs - .get("reportError") - .map_or("false", |k| k) - .parse() - .map_err(|e| Error::with_msg(format!("can not parse reportError {:?}", e)))?, + buf_len_disk_io: pairs + .get("bufLenDiskIo") + .map_or(Ok(None), |k| k.parse().map(|k| Some(k)))?, + disk_stats_every: pairs + .get("diskStatsEveryKb") + .map(|k| k.parse().ok()) + .unwrap_or(None) + .map(ByteSize::kb), + /*report_error: pairs + .get("reportError") + .map_or("false", |k| k) + .parse() + .map_err(|e| Error::with_msg(format!("can not parse reportError {:?}", e)))?,*/ timeout: pairs .get("timeout") - .map_or("6000", |k| k) - .parse::() - .map(|k| Duration::from_millis(k)) - .map_err(|e| Error::with_msg(format!("can not parse timeout {:?}", e)))?, - abort_after_bin_count: pairs - .get("abortAfterBinCount") - .map_or("0", |k| k) - .parse() - .map_err(|e| Error::with_msg(format!("can not parse abortAfterBinCount {:?}", e)))?, - do_log: pairs - .get("doLog") - .map_or("false", |k| k) - .parse() - .map_err(|e| Error::with_msg(format!("can not parse doLog {:?}", e)))?, + .map(|x| x.parse::().map(Duration::from_millis).ok()) + .unwrap_or(None), + bins_max: pairs.get("binsMax").map_or(Ok(None), |k| k.parse().map(|k| Some(k)))?, }; debug!("BinnedQuery::from_url {:?}", ret); Ok(ret) @@ -469,17 +454,13 @@ impl FromUrl for BinnedQuery { impl AppendToUrl for BinnedQuery { fn append_to_url(&self, url: &mut Url) { - let date_fmt = "%Y-%m-%dT%H:%M:%S.%3fZ"; + let date_fmt = "%Y-%m-%dT%H:%M:%S.%6fZ"; { self.channel.append_to_url(url); let mut g = url.query_pairs_mut(); - match &self.cache_usage { - CacheUsage::Use => {} - _ => { - g.append_pair("cacheUsage", &self.cache_usage.to_string()); - } + if let Some(x) = &self.cache_usage { + g.append_pair("cacheUsage", &x.query_param_value()); } - g.append_pair("binCount", &format!("{}", self.bin_count)); g.append_pair( "begDate", &Utc.timestamp_nanos(self.range.beg as i64).format(date_fmt).to_string(), @@ -488,21 +469,24 @@ impl AppendToUrl for BinnedQuery { "endDate", &Utc.timestamp_nanos(self.range.end as i64).format(date_fmt).to_string(), ); + g.append_pair("binCount", &format!("{}", self.bin_count)); } - { - binning_scheme_append_to_url(&self.agg_kind, url); + if let Some(x) = &self.agg_kind { + binning_scheme_append_to_url(x, url); } { let mut g = url.query_pairs_mut(); - // TODO - 
//g.append_pair("diskIoBufferSize", &format!("{}", self.disk_io_buffer_size)); - //g.append_pair("diskStatsEveryKb", &format!("{}", self.disk_stats_every.bytes() / 1024)); - g.append_pair("timeout", &format!("{}", self.timeout.as_millis())); - if self.abort_after_bin_count > 0 { - g.append_pair("abortAfterBinCount", &format!("{}", self.abort_after_bin_count)); + if let Some(x) = &self.timeout { + g.append_pair("timeout", &format!("{}", x.as_millis())); } - if self.do_log { - g.append_pair("doLog", &format!("{}", self.do_log)); + if let Some(x) = self.bins_max { + g.append_pair("binsMax", &format!("{}", x)); + } + if let Some(x) = self.buf_len_disk_io { + g.append_pair("bufLenDiskIo", &format!("{}", x)); + } + if let Some(x) = &self.disk_stats_every { + g.append_pair("diskStatsEveryKb", &format!("{}", x.bytes() / 1024)); } } } @@ -511,7 +495,9 @@ impl AppendToUrl for BinnedQuery { pub fn binning_scheme_append_to_url(agg_kind: &AggKind, url: &mut Url) { let mut g = url.query_pairs_mut(); match agg_kind { - AggKind::EventBlobs => panic!(), + AggKind::EventBlobs => { + g.append_pair("binningScheme", "eventBlobs"); + } AggKind::TimeWeightedScalar => { g.append_pair("binningScheme", "timeWeightedScalar"); } @@ -528,26 +514,28 @@ pub fn binning_scheme_append_to_url(agg_kind: &AggKind, url: &mut Url) { } } -pub fn agg_kind_from_binning_scheme(pairs: &BTreeMap) -> Result { +// Absent AggKind is not considered an error. +pub fn agg_kind_from_binning_scheme(pairs: &BTreeMap) -> Result, Error> { let key = "binningScheme"; - let s = pairs - .get(key) - .map_or(Err(Error::with_msg(format!("can not find {}", key))), |k| Ok(k))?; - let ret = if s == "eventBlobs" { - AggKind::EventBlobs - } else if s == "fullValue" { - AggKind::Plain - } else if s == "timeWeightedScalar" { - AggKind::TimeWeightedScalar - } else if s == "unweightedScalar" { - AggKind::DimXBins1 - } else if s == "binnedX" { - let u = pairs.get("binnedXcount").map_or("1", |k| k).parse()?; - AggKind::DimXBinsN(u) + if let Some(s) = pairs.get(key) { + let ret = if s == "eventBlobs" { + AggKind::EventBlobs + } else if s == "fullValue" { + AggKind::Plain + } else if s == "timeWeightedScalar" { + AggKind::TimeWeightedScalar + } else if s == "unweightedScalar" { + AggKind::DimXBins1 + } else if s == "binnedX" { + let u = pairs.get("binnedXcount").map_or("1", |k| k).parse()?; + AggKind::DimXBinsN(u) + } else { + return Err(Error::with_msg("can not extract binningScheme")); + }; + Ok(Some(ret)) } else { - return Err(Error::with_msg("can not extract binningScheme")); - }; - Ok(ret) + Ok(None) + } } #[derive(Clone, Debug)] @@ -614,7 +602,7 @@ impl FromUrl for ChannelStateEventsQuery { impl AppendToUrl for ChannelStateEventsQuery { fn append_to_url(&self, url: &mut Url) { - let date_fmt = "%Y-%m-%dT%H:%M:%S.%3fZ"; + let date_fmt = "%Y-%m-%dT%H:%M:%S.%6fZ"; self.channel.append_to_url(url); let mut g = url.query_pairs_mut(); g.append_pair( diff --git a/netpod/src/query/prebinned.rs b/netpod/src/query/prebinned.rs index 53d050e..4137484 100644 --- a/netpod/src/query/prebinned.rs +++ b/netpod/src/query/prebinned.rs @@ -2,7 +2,14 @@ use super::agg_kind_from_binning_scheme; use super::binning_scheme_append_to_url; use super::CacheUsage; use crate::timeunits::SEC; -use crate::{AggKind, AppendToUrl, ByteSize, Channel, FromUrl, PreBinnedPatchCoord, ScalarType, Shape}; +use crate::AggKind; +use crate::AppendToUrl; +use crate::ByteSize; +use crate::Channel; +use crate::FromUrl; +use crate::PreBinnedPatchCoord; +use crate::ScalarType; +use crate::Shape; use 
err::Error; use std::collections::BTreeMap; use url::Url; @@ -10,14 +17,13 @@ use url::Url; #[derive(Clone, Debug)] pub struct PreBinnedQuery { patch: PreBinnedPatchCoord, - agg_kind: AggKind, channel: Channel, scalar_type: ScalarType, shape: Shape, - cache_usage: CacheUsage, - disk_io_buffer_size: usize, - disk_stats_every: ByteSize, - report_error: bool, + agg_kind: Option, + cache_usage: Option, + buf_len_disk_io: Option, + disk_stats_every: Option, } impl PreBinnedQuery { @@ -26,11 +32,10 @@ impl PreBinnedQuery { channel: Channel, scalar_type: ScalarType, shape: Shape, - agg_kind: AggKind, - cache_usage: CacheUsage, - disk_io_buffer_size: usize, - disk_stats_every: ByteSize, - report_error: bool, + agg_kind: Option, + cache_usage: Option, + buf_len_disk_io: Option, + disk_stats_every: Option, ) -> Self { Self { patch, @@ -39,9 +44,8 @@ impl PreBinnedQuery { shape, agg_kind, cache_usage, - disk_io_buffer_size, + buf_len_disk_io, disk_stats_every, - report_error, } } @@ -63,12 +67,6 @@ impl PreBinnedQuery { .get("patchIx") .ok_or_else(|| Error::with_msg("missing patchIx"))? .parse()?; - let disk_stats_every = pairs - .get("diskStatsEveryKb") - .ok_or_else(|| Error::with_msg("missing diskStatsEveryKb"))?; - let disk_stats_every = disk_stats_every - .parse() - .map_err(|e| Error::with_msg(format!("can not parse diskStatsEveryKb {:?}", e)))?; let scalar_type = pairs .get("scalarType") .ok_or_else(|| Error::with_msg("missing scalarType")) @@ -82,19 +80,16 @@ impl PreBinnedQuery { channel: Channel::from_pairs(&pairs)?, scalar_type, shape, - agg_kind: agg_kind_from_binning_scheme(&pairs).unwrap_or(AggKind::DimXBins1), + agg_kind: agg_kind_from_binning_scheme(&pairs)?, cache_usage: CacheUsage::from_pairs(&pairs)?, - disk_io_buffer_size: pairs - .get("diskIoBufferSize") - .map_or("4096", |k| k) - .parse() - .map_err(|e| Error::with_msg(format!("can not parse diskIoBufferSize {:?}", e)))?, - disk_stats_every: ByteSize::kb(disk_stats_every), - report_error: pairs - .get("reportError") - .map_or("false", |k| k) - .parse() - .map_err(|e| Error::with_msg(format!("can not parse reportError {:?}", e)))?, + buf_len_disk_io: pairs + .get("bufLenDiskIo") + .map_or(Ok(None), |k| k.parse().map(|k| Some(k)))?, + disk_stats_every: pairs + .get("diskStatsEveryKb") + .map(|k| k.parse().ok()) + .unwrap_or(None) + .map(ByteSize::kb), }; Ok(ret) } @@ -103,10 +98,6 @@ impl PreBinnedQuery { &self.patch } - pub fn report_error(&self) -> bool { - self.report_error - } - pub fn channel(&self) -> &Channel { &self.channel } @@ -119,35 +110,45 @@ impl PreBinnedQuery { &self.shape } - pub fn agg_kind(&self) -> &AggKind { + pub fn agg_kind(&self) -> &Option { &self.agg_kind } pub fn disk_stats_every(&self) -> ByteSize { - self.disk_stats_every.clone() + match &self.disk_stats_every { + Some(x) => x.clone(), + None => ByteSize(1024 * 1024 * 4), + } } pub fn cache_usage(&self) -> CacheUsage { - self.cache_usage.clone() + self.cache_usage.as_ref().map_or(CacheUsage::Use, |x| x.clone()) } - pub fn disk_io_buffer_size(&self) -> usize { - self.disk_io_buffer_size + pub fn buf_len_disk_io(&self) -> usize { + self.buf_len_disk_io.unwrap_or(1024 * 8) } } impl AppendToUrl for PreBinnedQuery { fn append_to_url(&self, url: &mut Url) { self.patch.append_to_url(url); - binning_scheme_append_to_url(&self.agg_kind, url); self.channel.append_to_url(url); self.shape.append_to_url(url); self.scalar_type.append_to_url(url); + if let Some(x) = &self.agg_kind { + binning_scheme_append_to_url(x, url); + } let mut g = url.query_pairs_mut(); // 
TODO add also impl AppendToUrl for these if applicable: - g.append_pair("cacheUsage", &format!("{}", self.cache_usage.query_param_value())); - g.append_pair("diskIoBufferSize", &format!("{}", self.disk_io_buffer_size)); - g.append_pair("diskStatsEveryKb", &format!("{}", self.disk_stats_every.bytes() / 1024)); - g.append_pair("reportError", &format!("{}", self.report_error())); + if let Some(x) = &self.cache_usage { + g.append_pair("cacheUsage", &x.query_param_value()); + } + if let Some(x) = self.buf_len_disk_io { + g.append_pair("bufLenDiskIo", &format!("{}", x)); + } + if let Some(x) = &self.disk_stats_every { + g.append_pair("diskStatsEveryKb", &format!("{}", x.bytes() / 1024)); + } } } diff --git a/nodenet/src/conn.rs b/nodenet/src/conn.rs index ade0f53..ff26c0b 100644 --- a/nodenet/src/conn.rs +++ b/nodenet/src/conn.rs @@ -236,7 +236,7 @@ async fn events_conn_handler_inner_try( } let mut stream: Pin> + Send>> = - if let AggKind::EventBlobs = evq.agg_kind() { + if let AggKind::EventBlobs = evq.agg_kind_value() { match disk::raw::conn::make_event_blobs_pipe(&evq, node_config).await { Ok(stream) => { let stream = stream.map(|x| Box::new(x) as _); diff --git a/nodenet/src/conn/test.rs b/nodenet/src/conn/test.rs index 3be36e4..96f428d 100644 --- a/nodenet/src/conn/test.rs +++ b/nodenet/src/conn/test.rs @@ -60,7 +60,13 @@ fn raw_data_00() { beg: SEC, end: SEC * 10, }; - let qu = PlainEventsQuery::new(channel, range, AggKind::Plain, Duration::from_millis(10000), None, true); + let qu = PlainEventsQuery::new( + channel, + range, + Some(AggKind::Plain), + Some(Duration::from_millis(10000)), + None, + ); let query = EventQueryJsonStringFrame(serde_json::to_string(&qu).unwrap()); let item = Ok(StreamItem::DataItem(RangeCompletableItem::Data(query))); let frame = make_frame(&item).unwrap(); diff --git a/scyllaconn/src/bincache.rs b/scyllaconn/src/bincache.rs index eddeba2..5d00c32 100644 --- a/scyllaconn/src/bincache.rs +++ b/scyllaconn/src/bincache.rs @@ -372,6 +372,7 @@ pub async fn fetch_uncached_binned_events( .time_binner_new(edges.clone(), do_time_weight); // TODO handle deadline better let deadline = Instant::now(); + // TODO take timeout from query let deadline = deadline .checked_add(Duration::from_millis(6000)) .ok_or_else(|| Error::with_msg_no_trace(format!("deadline overflow")))?; @@ -379,10 +380,10 @@ pub async fn fetch_uncached_binned_events( let evq = PlainEventsQuery::new( chn.channel.clone(), coord.patch_range(), - AggKind::TimeWeightedScalar, - Duration::from_millis(8000), + Some(agg_kind), + // TODO take from query + Some(Duration::from_millis(8000)), None, - true, ); let mut events_dyn = EventsStreamScylla::new( series, diff --git a/streams/src/eventchunker.rs b/streams/src/eventchunker.rs index ef052f0..017d895 100644 --- a/streams/src/eventchunker.rs +++ b/streams/src/eventchunker.rs @@ -205,7 +205,6 @@ impl EventChunker { let _ttl = sl.read_i64::().unwrap(); let ts = sl.read_i64::().unwrap() as u64; let pulse = sl.read_i64::().unwrap() as u64; - info!("SEE {ts:20} {pulse:20} {0}", self.dbgdesc); if ts == self.max_ts { if self.repeated_ts_warn_count < 20 { let msg = format!( diff --git a/streams/src/plaineventsjson.rs b/streams/src/plaineventsjson.rs index b1f212d..4e49c6f 100644 --- a/streams/src/plaineventsjson.rs +++ b/streams/src/plaineventsjson.rs @@ -5,6 +5,7 @@ use futures_util::StreamExt; use items_2::channelevents::ChannelEvents; use netpod::log::*; use netpod::query::PlainEventsQuery; +use netpod::AggKind; use netpod::ChConf; use netpod::Cluster; use 
serde_json::Value as JsonValue; @@ -19,9 +20,11 @@ pub async fn plain_events_json( // TODO remove magic constant let deadline = Instant::now() + query.timeout() + Duration::from_millis(1000); let events_max = query.events_max(); - let mut evquery = query.clone(); - evquery.adjust_for_events_query(); - let empty = items_2::empty_events_dyn_ev(&chconf.scalar_type, &chconf.shape, evquery.agg_kind())?; + let evquery = query.clone(); + info!("plain_events_json evquery {:?}", evquery); + let ev_agg_kind = evquery.agg_kind().as_ref().map_or(AggKind::Plain, |x| x.clone()); + info!("plain_events_json ev_agg_kind {:?}", ev_agg_kind); + let empty = items_2::empty_events_dyn_ev(&chconf.scalar_type, &chconf.shape, &ev_agg_kind)?; info!("plain_events_json with empty item {}", empty.type_name()); let empty = ChannelEvents::Events(empty); let empty = items::sitem_data(empty); @@ -30,6 +33,7 @@ pub async fn plain_events_json( //let inps = open_tcp_streams::<_, Box>(&query, cluster).await?; // TODO propagate also the max-buf-len for the first stage event reader: let stream = items_2::merger::Merger::new(inps, 1024); + let stream = crate::rangefilter2::RangeFilter2::new(stream, query.range().clone(), evquery.one_before_range()); let stream = stream::iter([empty]).chain(stream); let collected = crate::collect::collect(stream, deadline, events_max, Some(query.range().clone()), None).await?; let jsval = serde_json::to_value(&collected)?; diff --git a/streams/src/rangefilter2.rs b/streams/src/rangefilter2.rs index 6fe4223..a28caea 100644 --- a/streams/src/rangefilter2.rs +++ b/streams/src/rangefilter2.rs @@ -1,17 +1,13 @@ use err::Error; use futures_util::Stream; use futures_util::StreamExt; -use items::Appendable; -use items::Clearable; -use items::PushableIndex; use items::RangeCompletableItem; use items::Sitemty; use items::StatsItem; use items::StreamItem; -use items::WithTimestamps; +use items_2::merger::Mergeable; use netpod::log::*; use netpod::NanoRange; -use netpod::Nanos; use netpod::RangeFilterStats; use std::fmt; use std::pin::Pin; @@ -21,64 +17,125 @@ use std::task::Poll; pub struct RangeFilter2 where S: Stream> + Unpin, - ITY: WithTimestamps + PushableIndex + Appendable + Clearable + Unpin, + ITY: Mergeable, { inp: S, range: NanoRange, range_str: String, - expand: bool, - ts_max: u64, + one_before_range: bool, stats: RangeFilterStats, - prerange: Option, - have_pre: bool, + slot1: Option, + slot2: Option, have_range_complete: bool, - emitted_post: bool, data_done: bool, raco_done: bool, done: bool, complete: bool, - items_with_pre: usize, - items_with_post: usize, - items_with_unordered: usize, } impl RangeFilter2 where S: Stream> + Unpin, - ITY: WithTimestamps + PushableIndex + Appendable + Clearable + Unpin, + ITY: Mergeable, { - fn selfname() -> &'static str { + const fn selfname() -> &'static str { "RangeFilter2" } - pub fn new(inp: S, range: NanoRange, expand: bool) -> Self { - trace!("{}::new range: {:?} expand: {:?}", Self::selfname(), range, expand); + pub fn new(inp: S, range: NanoRange, one_before_range: bool) -> Self { + trace!( + "{}::new range: {:?} one_before_range: {:?}", + Self::selfname(), + range, + one_before_range + ); Self { inp, range_str: format!("{:?}", range), range, - expand, - ts_max: 0, + one_before_range, stats: RangeFilterStats::new(), - prerange: None, - have_pre: false, + slot1: None, + slot2: None, have_range_complete: false, - emitted_post: false, data_done: false, raco_done: false, done: false, complete: false, - items_with_pre: 0, - items_with_post: 0, - 
items_with_unordered: 0, } } + + fn prune_high(&mut self, mut item: ITY, ts: u64) -> Result { + let ret = match item.find_highest_index_lt(ts) { + Some(ihlt) => { + let n = item.len(); + if ihlt + 1 == n { + // TODO gather stats, this should be the most common case. + self.stats.items_no_prune_high += 1; + item + } else { + self.stats.items_part_prune_high += 1; + let mut dummy = item.new_empty(); + item.drain_into(&mut dummy, (ihlt + 1, n))?; + item + } + } + None => { + self.stats.items_all_prune_high += 1; + item.new_empty() + } + }; + Ok(ret) + } + + fn handle_item(&mut self, item: ITY) -> Result { + let mut item = self.prune_high(item, self.range.end)?; + let ret = if self.one_before_range { + match item.find_lowest_index_ge(self.range.beg) { + Some(ilge) => { + if ilge == 0 { + if let Some(sl1) = self.slot1.take() { + self.slot2 = Some(item); + sl1 + } else { + item + } + } else { + let mut dummy = item.new_empty(); + item.drain_into(&mut dummy, (0, ilge - 1))?; + self.slot1 = None; + item + } + } + None => { + let n = item.len(); + let mut keep = item.new_empty(); + item.drain_into(&mut keep, (n.max(1) - 1, n))?; + self.slot1 = Some(keep); + item.new_empty() + } + } + } else { + match item.find_lowest_index_ge(self.range.beg) { + Some(ilge) => { + let mut dummy = item.new_empty(); + item.drain_into(&mut dummy, (0, ilge))?; + item + } + None => { + // TODO count case for stats + item.new_empty() + } + } + }; + Ok(ret) + } } impl RangeFilter2 where S: Stream> + Unpin, - ITY: WithTimestamps + PushableIndex + Appendable + Clearable + Unpin, + ITY: Mergeable, { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll::Item>> { use Poll::*; @@ -100,95 +157,18 @@ where } else { continue; } + } else if let Some(sl2) = self.slot2.take() { + Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(sl2))))) } else { match self.inp.poll_next_unpin(cx) { Ready(Some(item)) => match item { - Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))) => { - let mut contains_pre = false; - let mut contains_post = false; - let mut contains_unordered = false; - let mut ret = item.empty_like_self(); - for i1 in 0..item.len() { - let ts = item.ts(i1); - if ts < self.ts_max { - contains_unordered = true; - if false { - self.done = true; - let msg = format!( - "unordered event i1 {} / {} ts {:?} ts_max {:?}", - i1, - item.len(), - Nanos::from_ns(ts), - Nanos::from_ns(self.ts_max) - ); - error!("{}", msg); - return Ready(Some(Err(Error::with_msg(msg)))); - } - } else { - self.ts_max = ts; - if ts < self.range.beg { - contains_pre = true; - if self.expand { - let mut prerange = if let Some(prerange) = self.prerange.take() { - prerange - } else { - item.empty_like_self() - }; - prerange.clear(); - prerange.push_index(&item, i1); - self.prerange = Some(prerange); - self.have_pre = true; - } - } else if ts >= self.range.end { - contains_post = true; - self.have_range_complete = true; - if self.expand { - if self.have_pre { - let prerange = if let Some(prerange) = &mut self.prerange { - prerange - } else { - panic!() - }; - ret.push_index(prerange, 0); - prerange.clear(); - self.have_pre = false; - } - if !self.emitted_post { - self.emitted_post = true; - ret.push_index(&item, i1); - //self.data_done = true; - } - } else { - //self.data_done = true; - } - } else { - if self.expand { - if self.have_pre { - let prerange = if let Some(prerange) = &mut self.prerange { - prerange - } else { - panic!() - }; - ret.push_index(prerange, 0); - prerange.clear(); - self.have_pre = false; - } - } - 
ret.push_index(&item, i1); - } - } + Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))) => match self.handle_item(item) { + Ok(item) => Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))))), + Err(e) => { + self.data_done = true; + Ready(Some(Err(e))) } - if contains_pre { - self.items_with_pre += 1; - } - if contains_post { - self.items_with_post += 1; - } - if contains_unordered { - self.items_with_unordered += 1; - } - Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(ret))))) - } + }, Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)) => { self.have_range_complete = true; continue; @@ -197,20 +177,7 @@ where }, Ready(None) => { self.data_done = true; - if self.have_pre { - let prerange = if let Some(prerange) = &mut self.prerange { - prerange - } else { - panic!() - }; - let mut ret = prerange.empty_like_self(); - ret.push_index(&prerange, 0); - prerange.clear(); - self.have_pre = false; - Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(ret))))) - } else { - continue; - } + continue; } Pending => Pending, } @@ -222,35 +189,32 @@ where impl Stream for RangeFilter2 where S: Stream> + Unpin, - ITY: WithTimestamps + PushableIndex + Appendable + Clearable + Unpin, + ITY: Mergeable, { type Item = Sitemty; fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - let span1 = span!(Level::INFO, "RangeFilter", range = tracing::field::Empty); + let span1 = span!(Level::TRACE, "RangeFilter2", range = tracing::field::Empty); span1.record("range", &self.range_str.as_str()); - span1.in_scope(|| Self::poll_next(self, cx)) + let _spg = span1.enter(); + Self::poll_next(self, cx) } } impl fmt::Debug for RangeFilter2 where S: Stream> + Unpin, - ITY: WithTimestamps + PushableIndex + Appendable + Clearable + Unpin, + ITY: Mergeable, { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_struct(Self::selfname()) - .field("items_with_pre", &self.items_with_pre) - .field("items_with_post", &self.items_with_post) - .field("items_with_unordered", &self.items_with_unordered) - .finish() + f.debug_struct(Self::selfname()).finish() } } impl Drop for RangeFilter2 where S: Stream> + Unpin, - ITY: WithTimestamps + PushableIndex + Appendable + Clearable + Unpin, + ITY: Mergeable, { fn drop(&mut self) { debug!("drop {} {:?}", Self::selfname(), self); diff --git a/streams/src/timebinnedjson.rs b/streams/src/timebinnedjson.rs index 5317aab..bf9d27e 100644 --- a/streams/src/timebinnedjson.rs +++ b/streams/src/timebinnedjson.rs @@ -11,31 +11,29 @@ use netpod::BinnedRange; use netpod::ChConf; use netpod::Cluster; use serde_json::Value as JsonValue; -use std::time::Duration; use std::time::Instant; pub async fn timebinned_json(query: &BinnedQuery, chconf: &ChConf, cluster: &Cluster) -> Result { let binned_range = BinnedRange::covering_range(query.range().clone(), query.bin_count())?; let bins_max = 10000; let do_time_weight = query.agg_kind().do_time_weighted(); - let timeout = Duration::from_millis(7500); - let deadline = Instant::now() + timeout; - let empty = items_2::empty_events_dyn_ev(&chconf.scalar_type, &chconf.shape, query.agg_kind())?; + let deadline = Instant::now() + query.timeout_value(); + let empty = items_2::empty_events_dyn_ev(&chconf.scalar_type, &chconf.shape, &query.agg_kind())?; let empty = ChannelEvents::Events(empty); let empty = items::sitem_data(empty); - let rawquery = PlainEventsQuery::new( + let evquery = PlainEventsQuery::new( query.channel().clone(), query.range().clone(), - query.agg_kind().clone(), - timeout, + 
Some(query.agg_kind()), + query.timeout(), None, - true, ); - let inps = open_tcp_streams::<_, items_2::channelevents::ChannelEvents>(&rawquery, cluster).await?; + let inps = open_tcp_streams::<_, items_2::channelevents::ChannelEvents>(&evquery, cluster).await?; // TODO propagate also the max-buf-len for the first stage event reader: info!("timebinned_json with empty item {empty:?}"); let stream = items_2::merger::Merger::new(inps, 128); let stream = stream::iter([empty]).chain(stream); + let stream = crate::rangefilter2::RangeFilter2::new(stream, query.range().clone(), evquery.one_before_range()); let stream = Box::pin(stream); let stream = crate::timebin::TimeBinnedStream::new(stream, binned_range.edges(), do_time_weight, deadline); if false { diff --git a/taskrun/src/taskrun.rs b/taskrun/src/taskrun.rs index af7d6f3..add918f 100644 --- a/taskrun/src/taskrun.rs +++ b/taskrun/src/taskrun.rs @@ -20,6 +20,33 @@ pub fn get_runtime() -> Arc { get_runtime_opts(24, 128) } +#[allow(unused)] +fn on_thread_start() { + let old = panic::take_hook(); + panic::set_hook(Box::new(move |info| { + let payload = if let Some(k) = info.payload().downcast_ref::() { + format!("{:?}", k) + } else if let Some(k) = info.payload().downcast_ref::() { + k.into() + } else if let Some(&k) = info.payload().downcast_ref::<&str>() { + k.into() + } else { + format!("unknown payload type") + }; + error!( + "✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗ panicking\n{:?}\nLOCATION: {:?}\nPAYLOAD: {:?}\ninfo object: {:?}\nerr: {:?}", + Error::with_msg("catched panic in taskrun::run"), + info.location(), + info.payload(), + info, + payload, + ); + if false { + old(info); + } + })); +} + pub fn get_runtime_opts(nworkers: usize, nblocking: usize) -> Arc { let mut g = RUNTIME.lock().unwrap(); match g.as_ref() { @@ -28,32 +55,7 @@ pub fn get_runtime_opts(nworkers: usize, nblocking: usize) -> Arc { .worker_threads(nworkers) .max_blocking_threads(nblocking) .enable_all() - .on_thread_start(|| { - let _old = panic::take_hook(); - panic::set_hook(Box::new(move |info| { - let payload = if let Some(k) = info.payload().downcast_ref::() { - format!("{:?}", k) - } - else if let Some(k) = info.payload().downcast_ref::() { - k.into() - } - else if let Some(&k) = info.payload().downcast_ref::<&str>() { - k.into() - } - else { - format!("unknown payload type") - }; - error!( - "✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗ panicking\n{:?}\nLOCATION: {:?}\nPAYLOAD: {:?}\ninfo object: {:?}\nerr: {:?}", - Error::with_msg("catched panic in taskrun::run"), - info.location(), - info.payload(), - info, - payload, - ); - //old(info); - })); - }) + .on_thread_start(on_thread_start) .build(); let res = match res { Ok(x) => x,
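
A minimal, self-contained sketch of the index-based range pruning that the new `RangeFilter2::handle_item` performs via the added `Mergeable` helpers (`find_lowest_index_ge` / `drain_into`). The types here are simplified stand-ins, not the crate's `Events` containers, and the cross-item bookkeeping the real filter does with `slot1`/`slot2` is omitted.

```rust
#[derive(Debug, Default)]
struct EventsBuf {
    tss: Vec<u64>,
    values: Vec<f32>,
}

impl EventsBuf {
    // First index whose timestamp is >= ts (timestamps assumed sorted ascending).
    fn find_lowest_index_ge(&self, ts: u64) -> Option<usize> {
        self.tss.iter().position(|&t| t >= ts)
    }

    // Move the half-open index range [range.0, range.1) into dst.
    fn drain_into(&mut self, dst: &mut EventsBuf, range: (usize, usize)) {
        dst.tss.extend(self.tss.drain(range.0..range.1));
        dst.values.extend(self.values.drain(range.0..range.1));
    }
}

// Keep events with beg <= ts < end; with one_before, also keep the latest
// event before beg (needed for time-weighted binning).
fn prune_to_range(mut item: EventsBuf, beg: u64, end: u64, one_before: bool) -> EventsBuf {
    let mut discard = EventsBuf::default();
    // High side: drop everything at or after `end` (for sorted timestamps this is
    // equivalent to the find_highest_index_lt based cut used in the diff).
    let n = item.tss.len();
    let cut_high = item.find_lowest_index_ge(end).unwrap_or(n);
    item.drain_into(&mut discard, (cut_high, n));
    // Low side: drop everything before `beg`, optionally keeping one event.
    let ilge = item.find_lowest_index_ge(beg).unwrap_or(item.tss.len());
    let cut_low = if one_before { ilge.saturating_sub(1) } else { ilge };
    item.drain_into(&mut discard, (0, cut_low));
    item
}

fn main() {
    let item = EventsBuf {
        tss: vec![10, 20, 30, 40, 50],
        values: vec![1.0, 2.0, 3.0, 4.0, 5.0],
    };
    // Keeps ts 20 (one before the range), 30 and 40; 50 is pruned (>= end).
    let kept = prune_to_range(item, 25, 45, true);
    println!("kept: {:?}", kept);
}
```

In the diff itself the kept-before-range event is not returned immediately for every item: when all events of an item lie before the range, the last one is parked in `slot1`, and it is only emitted once an in-range item arrives (which is then held in `slot2`), so the expand-style "one before range" event is not duplicated across successive items.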