diff --git a/daqbuffer/src/test.rs b/daqbuffer/src/test.rs
index e661948..bd973ff 100644
--- a/daqbuffer/src/test.rs
+++ b/daqbuffer/src/test.rs
@@ -20,8 +20,8 @@ use std::fmt::Debug;
 use std::future::ready;
 use tokio::io::AsyncRead;
 
+pub mod binnedjson;
 pub mod events;
-pub mod json;
 
 #[test]
 fn get_binned_binary() {
@@ -114,7 +114,7 @@ where
     let req = hyper::Request::builder()
         .method(http::Method::GET)
         .uri(url)
-        .header("accept", "application/octet-stream")
+        .header("Accept", "application/octet-stream")
         .body(Body::empty())?;
     let client = hyper::Client::new();
     let res = client.request(req).await?;
diff --git a/daqbuffer/src/test/json.rs b/daqbuffer/src/test/binnedjson.rs
similarity index 100%
rename from daqbuffer/src/test/json.rs
rename to daqbuffer/src/test/binnedjson.rs
diff --git a/daqbuffer/src/test/events.rs b/daqbuffer/src/test/events.rs
index 4c8bd8c..b2e09d5 100644
--- a/daqbuffer/src/test/events.rs
+++ b/daqbuffer/src/test/events.rs
@@ -1,9 +1,9 @@
 use crate::nodes::require_test_hosts_running;
 use chrono::{DateTime, Utc};
 use disk::agg::streams::{StatsItem, StreamItem};
-use disk::binned::query::PlainEventsQuery;
 use disk::binned::{NumOps, RangeCompletableItem, WithLen};
 use disk::decode::EventValues;
+use disk::events::{PlainEventsJsonQuery, PlainEventsQuery};
 use disk::frame::inmem::InMemoryFrameAsyncReadStream;
 use disk::frame::makeframe::FrameType;
 use disk::streamlog::Streamlog;
@@ -70,7 +70,7 @@ where
     let req = hyper::Request::builder()
         .method(http::Method::GET)
         .uri(url)
-        .header("accept", "application/octet-stream")
+        .header("Accept", "application/octet-stream")
         .body(Body::empty())?;
     let client = hyper::Client::new();
     let res = client.request(req).await?;
@@ -230,6 +230,26 @@ async fn get_plain_events_json_0_inner() -> Result<(), Error> {
     Ok(())
 }
 
+#[test]
+fn get_plain_events_json_1() {
+    taskrun::run(get_plain_events_json_1_inner()).unwrap();
+}
+
+async fn get_plain_events_json_1_inner() -> Result<(), Error> {
+    let rh = require_test_hosts_running()?;
+    let cluster = &rh.cluster;
+    get_plain_events_json(
+        "wave-f64-be-n21",
+        "1970-01-01T00:20:10.000Z",
+        "1970-01-01T00:20:12.000Z",
+        cluster,
+        true,
+        4,
+    )
+    .await?;
+    Ok(())
+}
+
 async fn get_plain_events_json(
     channel_name: &str,
     beg_date: &str,
@@ -248,14 +268,14 @@ async fn get_plain_events_json(
         name: channel_name.into(),
     };
     let range = NanoRange::from_date_time(beg_date, end_date);
-    let query = PlainEventsQuery::new(channel, range);
+    let query = PlainEventsJsonQuery::new(channel, range);
     let hp = HostPort::from_node(node0);
     let url = query.url(&hp);
     info!("get_plain_events get {}", url);
     let req = hyper::Request::builder()
         .method(http::Method::GET)
         .uri(url)
-        .header("accept", "application/octet-stream")
+        .header("Accept", "application/octet-stream")
         .body(Body::empty())?;
     let client = hyper::Client::new();
     let res = client.request(req).await?;
diff --git a/disk/src/agg/enp.rs b/disk/src/agg/enp.rs
index fdb1757..8cd5139 100644
--- a/disk/src/agg/enp.rs
+++ b/disk/src/agg/enp.rs
@@ -1,6 +1,7 @@
 use crate::agg::binnedt::{TimeBinnableType, TimeBinnableTypeAggregator};
 use crate::agg::streams::Appendable;
 use crate::agg::{Fits, FitsInside};
+use crate::binned::dim1::MinMaxAvgDim1Bins;
 use crate::binned::{
     EventsNodeProcessor, FilterFittingInside, MinMaxAvgBins, NumOps, PushableIndex, RangeOverlapInfo, ReadPbv,
     ReadableFromFile, WithLen, WithTimestamps,
@@ -268,6 +269,251 @@ where
     }
 }
 
+#[derive(Serialize, Deserialize)]
+pub struct WaveEvents<NTY> {
+    pub tss: Vec<u64>,
+    pub vals: Vec<Vec<NTY>>,
+}
+
+impl<NTY> WaveEvents<NTY> {
+    pub fn empty() -> Self {
+        Self {
+            tss: vec![],
+            vals: vec![],
+        }
+    }
+}
+
+impl<NTY> WithLen for WaveEvents<NTY> {
+    fn len(&self) -> usize {
+        self.tss.len()
+    }
+}
+
+impl<NTY> WithTimestamps for WaveEvents<NTY> {
+    fn ts(&self, ix: usize) -> u64 {
+        self.tss[ix]
+    }
+}
+
+impl<NTY> RangeOverlapInfo for WaveEvents<NTY> {
+    fn ends_before(&self, range: NanoRange) -> bool {
+        match self.tss.last() {
+            Some(&ts) => ts < range.beg,
+            None => true,
+        }
+    }
+
+    fn ends_after(&self, range: NanoRange) -> bool {
+        match self.tss.last() {
+            Some(&ts) => ts >= range.end,
+            None => panic!(),
+        }
+    }
+
+    fn starts_after(&self, range: NanoRange) -> bool {
+        match self.tss.first() {
+            Some(&ts) => ts >= range.end,
+            None => panic!(),
+        }
+    }
+}
+
+impl<NTY> FitsInside for WaveEvents<NTY> {
+    fn fits_inside(&self, range: NanoRange) -> Fits {
+        if self.tss.is_empty() {
+            Fits::Empty
+        } else {
+            let t1 = *self.tss.first().unwrap();
+            let t2 = *self.tss.last().unwrap();
+            if t2 < range.beg {
+                Fits::Lower
+            } else if t1 > range.end {
+                Fits::Greater
+            } else if t1 < range.beg && t2 > range.end {
+                Fits::PartlyLowerAndGreater
+            } else if t1 < range.beg {
+                Fits::PartlyLower
+            } else if t2 > range.end {
+                Fits::PartlyGreater
+            } else {
+                Fits::Inside
+            }
+        }
+    }
+}
+
+impl<NTY> FilterFittingInside for WaveEvents<NTY> {
+    fn filter_fitting_inside(self, fit_range: NanoRange) -> Option<Self> {
+        match self.fits_inside(fit_range) {
+            Fits::Inside | Fits::PartlyGreater | Fits::PartlyLower | Fits::PartlyLowerAndGreater => Some(self),
+            _ => None,
+        }
+    }
+}
+
+impl<NTY> PushableIndex for WaveEvents<NTY>
+where
+    NTY: NumOps,
+{
+    fn push_index(&mut self, src: &Self, ix: usize) {
+        self.tss.push(src.tss[ix]);
+        // TODO trait should allow to move from source.
+        self.vals.push(src.vals[ix].clone());
+    }
+}
+
+impl<NTY> Appendable for WaveEvents<NTY>
+where
+    NTY: NumOps,
+{
+    fn empty() -> Self {
+        Self::empty()
+    }
+
+    fn append(&mut self, src: &Self) {
+        self.tss.extend_from_slice(&src.tss);
+        self.vals.extend_from_slice(&src.vals);
+    }
+}
+
+impl<NTY> ReadableFromFile for WaveEvents<NTY>
+where
+    NTY: NumOps,
+{
+    fn read_from_file(_file: File) -> Result<ReadPbv<Self>, Error> {
+        // TODO refactor types such that this impl is not needed.
+        panic!()
+    }
+
+    fn from_buf(_buf: &[u8]) -> Result<Self, Error> {
+        panic!()
+    }
+}
+
+impl<NTY> TimeBinnableType for WaveEvents<NTY>
+where
+    NTY: NumOps,
+{
+    type Output = MinMaxAvgDim1Bins<NTY>;
+    type Aggregator = WaveEventsAggregator<NTY>;
+
+    fn aggregator(range: NanoRange) -> Self::Aggregator {
+        Self::Aggregator::new(range)
+    }
+}
+
+pub struct WaveEventsAggregator<NTY>
+where
+    NTY: NumOps,
+{
+    range: NanoRange,
+    count: u64,
+    min: Option<Vec<NTY>>,
+    max: Option<Vec<NTY>>,
+    sumc: u64,
+    sum: Option<Vec<f32>>,
+}
+
+impl<NTY> WaveEventsAggregator<NTY>
+where
+    NTY: NumOps,
+{
+    pub fn new(range: NanoRange) -> Self {
+        Self {
+            range,
+            count: 0,
+            min: None,
+            max: None,
+            sumc: 0,
+            sum: None,
+        }
+    }
+}
+
+impl<NTY> TimeBinnableTypeAggregator for WaveEventsAggregator<NTY>
+where
+    NTY: NumOps,
+{
+    type Input = WaveEvents<NTY>;
+    type Output = MinMaxAvgDim1Bins<NTY>;
+
+    fn range(&self) -> &NanoRange {
+        &self.range
+    }
+
+    fn ingest(&mut self, item: &Self::Input) {
+        for i1 in 0..item.tss.len() {
+            let ts = item.tss[i1];
+            if ts < self.range.beg {
+                continue;
+            } else if ts >= self.range.end {
+                continue;
+            } else {
+                match &mut self.min {
+                    None => self.min = Some(item.vals[i1].clone()),
+                    Some(min) => {
+                        for (a, b) in min.iter_mut().zip(item.vals[i1].iter()) {
+                            if b < a {
+                                *a = *b;
+                            }
+                        }
+                    }
+                };
+                match &mut self.max {
+                    None => self.max = Some(item.vals[i1].clone()),
+                    Some(max) => {
+                        for (a, b) in max.iter_mut().zip(item.vals[i1].iter()) {
+                            if b > a {
+                                *a = *b;
+                            }
+                        }
+                    }
+                };
+                match self.sum.as_mut() {
+                    None => {
+                        self.sum = Some(item.vals[i1].iter().map(|k| k.as_()).collect());
+                    }
+                    Some(sum) => {
+                        for (a, b) in sum.iter_mut().zip(item.vals[i1].iter()) {
+                            let vf = b.as_();
+                            if vf.is_nan() {
+                            } else {
+                                *a += vf;
+                            }
+                        }
+                    }
+                }
+                self.sumc += 1;
+                self.count += 1;
+            }
+        }
+    }
+
+    fn result(self) -> Self::Output {
+        let avg = if self.sumc == 0 {
+            None
+        } else {
+            let avg = self
+                .sum
+                .as_ref()
+                .unwrap()
+                .iter()
+                .map(|item| item / self.sumc as f32)
+                .collect();
+            Some(avg)
+        };
+        Self::Output {
+            ts1s: vec![self.range.beg],
+            ts2s: vec![self.range.end],
+            counts: vec![self.count],
+            mins: vec![self.min],
+            maxs: vec![self.max],
+            avgs: vec![avg],
+        }
+    }
+}
+
 pub struct WaveXBinner<NTY> {
     _m1: PhantomData<NTY>,
 }
@@ -336,3 +582,48 @@ where
         ret
     }
 }
+
+pub struct WaveNBinner<NTY> {
+    _m1: PhantomData<NTY>,
+}
+
+impl<NTY> EventsNodeProcessor for WaveNBinner<NTY>
+where
+    NTY: NumOps,
+{
+    type Input = Vec<NTY>;
+    // TODO need new container type for this case:
+    type Output = XBinnedScalarEvents<NTY>;
+
+    fn process(_inp: EventValues<Self::Input>) -> Self::Output {
+        err::todoval()
+    }
+}
+
+pub struct WavePlainProc<NTY> {
+    _m1: PhantomData<NTY>,
+}
+
+impl<NTY> EventsNodeProcessor for WavePlainProc<NTY>
+where
+    NTY: NumOps,
+{
+    type Input = Vec<NTY>;
+    type Output = WaveEvents<NTY>;
+
+    fn process(inp: EventValues<Self::Input>) -> Self::Output {
+        if false {
+            let n = if inp.values.len() > 0 { inp.values[0].len() } else { 0 };
+            let n = if n > 5 { 5 } else { n };
+            WaveEvents {
+                tss: inp.tss,
+                vals: inp.values.iter().map(|k| k[..n].to_vec()).collect(),
+            }
+        } else {
+            WaveEvents {
+                tss: inp.tss,
+                vals: inp.values,
+            }
+        }
+    }
+}
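The aggregator added above reduces all waveform events that fall into one time bin into element-wise min, max, and average arrays. As a standalone illustration of that reduction (plain Rust with std only; names, concrete types, and values here are stand-ins, not the crate's generic API over NTY: NumOps):

    fn main() {
        // Two waveform events that fall into the same time bin.
        let waves: Vec<Vec<f64>> = vec![vec![1.0, 4.0, 2.0], vec![3.0, 0.5, 2.5]];
        let mut min: Option<Vec<f64>> = None;
        let mut max: Option<Vec<f64>> = None;
        let mut sum: Option<Vec<f32>> = None;
        let mut sumc = 0u64;
        for w in &waves {
            match &mut min {
                // The first event initializes the element-wise minimum.
                None => min = Some(w.clone()),
                Some(m) => {
                    for (a, b) in m.iter_mut().zip(w.iter()) {
                        if b < a {
                            *a = *b;
                        }
                    }
                }
            }
            match &mut max {
                None => max = Some(w.clone()),
                Some(m) => {
                    for (a, b) in m.iter_mut().zip(w.iter()) {
                        // Comparison is flipped relative to the min branch.
                        if b > a {
                            *a = *b;
                        }
                    }
                }
            }
            match &mut sum {
                None => sum = Some(w.iter().map(|k| *k as f32).collect()),
                Some(s) => {
                    for (a, b) in s.iter_mut().zip(w.iter()) {
                        *a += *b as f32;
                    }
                }
            }
            sumc += 1;
        }
        let avg: Vec<f32> = sum.unwrap().iter().map(|k| k / sumc as f32).collect();
        // min [1.0, 0.5, 2.0]  max [3.0, 4.0, 2.5]  avg [2.0, 2.25, 2.25]
        println!("min {:?}  max {:?}  avg {:?}", min.unwrap(), max.unwrap(), avg);
    }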
diff --git a/disk/src/binned.rs b/disk/src/binned.rs
index 365b6a4..25fe6de 100644
--- a/disk/src/binned.rs
+++ b/disk/src/binned.rs
@@ -40,6 +40,7 @@ use tokio::fs::File;
 use tokio::io::{AsyncRead, ReadBuf};
 
 pub mod binnedfrompbv;
+pub mod dim1;
 pub mod pbv;
 pub mod prebinned;
 pub mod query;
@@ -272,7 +273,18 @@ where
        ));
        return Err(err);
    }
-    let channel_config = read_local_config(&query.channel(), &node_config.node).await?;
+    let channel_config = match read_local_config(&query.channel(), &node_config.node).await {
+        Ok(k) => k,
+        Err(e) => {
+            if e.msg().contains("ErrorKind::NotFound") {
+                let s = futures_util::stream::empty();
+                let ret = BinnedResponseDyn { stream: Box::pin(s) };
+                return Ok(ret);
+            } else {
+                return Err(e);
+            }
+        }
+    };
     match extract_matching_config_entry(query.range(), &channel_config)? {
         MatchingConfigEntry::Multiple => Err(Error::with_msg("multiple config entries found"))?,
         MatchingConfigEntry::None => {
diff --git a/disk/src/binned/dim1.rs b/disk/src/binned/dim1.rs
new file mode 100644
index 0000000..fc70776
--- /dev/null
+++ b/disk/src/binned/dim1.rs
@@ -0,0 +1,497 @@
+use crate::agg::binnedt::{TimeBinnableType, TimeBinnableTypeAggregator};
+use crate::agg::enp::WaveEvents;
+use crate::agg::streams::{Appendable, Collectable, Collector, ToJsonBytes, ToJsonResult};
+use crate::agg::{Fits, FitsInside};
+use crate::binned::{
+    Bool, FilterFittingInside, IsoDateTime, NumOps, RangeOverlapInfo, ReadPbv, ReadableFromFile, TimeBins, WithLen,
+};
+use crate::Sitemty;
+use chrono::{TimeZone, Utc};
+use err::Error;
+use netpod::timeunits::SEC;
+use netpod::NanoRange;
+use num_traits::Zero;
+use serde::{Deserialize, Serialize};
+use std::fmt;
+use std::marker::PhantomData;
+use tokio::fs::File;
+
+#[derive(Clone, Serialize, Deserialize)]
+pub struct MinMaxAvgDim1Bins<NTY> {
+    pub ts1s: Vec<u64>,
+    pub ts2s: Vec<u64>,
+    pub counts: Vec<u64>,
+    pub mins: Vec<Option<Vec<NTY>>>,
+    pub maxs: Vec<Option<Vec<NTY>>>,
+    pub avgs: Vec<Option<Vec<f32>>>,
+}
+
+impl<NTY> fmt::Debug for MinMaxAvgDim1Bins<NTY>
+where
+    NTY: fmt::Debug,
+{
+    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+        write!(
+            fmt,
+            "MinMaxAvgDim1Bins count {} ts1s {:?} ts2s {:?} counts {:?} mins {:?} maxs {:?} avgs {:?}",
+            self.ts1s.len(),
+            self.ts1s.iter().map(|k| k / SEC).collect::<Vec<_>>(),
+            self.ts2s.iter().map(|k| k / SEC).collect::<Vec<_>>(),
+            self.counts,
+            self.mins.first(),
+            self.maxs.first(),
+            self.avgs.first(),
+        )
+    }
+}
+
+impl<NTY> MinMaxAvgDim1Bins<NTY> {
+    pub fn empty() -> Self {
+        Self {
+            ts1s: vec![],
+            ts2s: vec![],
+            counts: vec![],
+            mins: vec![],
+            maxs: vec![],
+            avgs: vec![],
+        }
+    }
+}
+
+impl<NTY> FitsInside for MinMaxAvgDim1Bins<NTY> {
+    fn fits_inside(&self, range: NanoRange) -> Fits {
+        if self.ts1s.is_empty() {
+            Fits::Empty
+        } else {
+            let t1 = *self.ts1s.first().unwrap();
+            let t2 = *self.ts2s.last().unwrap();
+            if t2 <= range.beg {
+                Fits::Lower
+            } else if t1 >= range.end {
+                Fits::Greater
+            } else if t1 < range.beg && t2 > range.end {
+                Fits::PartlyLowerAndGreater
+            } else if t1 < range.beg {
+                Fits::PartlyLower
+            } else if t2 > range.end {
+                Fits::PartlyGreater
+            } else {
+                Fits::Inside
+            }
+        }
+    }
+}
+
+impl<NTY> FilterFittingInside for MinMaxAvgDim1Bins<NTY> {
+    fn filter_fitting_inside(self, fit_range: NanoRange) -> Option<Self> {
+        match self.fits_inside(fit_range) {
+            Fits::Inside | Fits::PartlyGreater | Fits::PartlyLower | Fits::PartlyLowerAndGreater => Some(self),
+            _ => None,
+        }
+    }
+}
+
+impl<NTY> RangeOverlapInfo for MinMaxAvgDim1Bins<NTY> {
+    fn ends_before(&self, range: NanoRange) -> bool {
+        match self.ts2s.last() {
+            Some(&ts) => ts <= range.beg,
+            None => true,
+        }
+    }
+
+    fn ends_after(&self, range: NanoRange) -> bool {
+        match self.ts2s.last() {
+            Some(&ts) => ts > range.end,
+            None => panic!(),
+        }
+    }
+
+    fn starts_after(&self, range: NanoRange) -> bool {
+        match self.ts1s.first() {
+            Some(&ts) => ts >= range.end,
+            None => panic!(),
+        }
+    }
+}
+
+impl<NTY> TimeBins for MinMaxAvgDim1Bins<NTY>
+where
+    NTY: NumOps,
+{
+    fn ts1s(&self) -> &Vec<u64> {
+        &self.ts1s
+    }
+
+    fn ts2s(&self) -> &Vec<u64> {
+        &self.ts2s
+    }
+}
+
+impl<NTY> WithLen for MinMaxAvgDim1Bins<NTY> {
+    fn len(&self) -> usize {
+        self.ts1s.len()
+    }
+}
+
+impl<NTY> Appendable for MinMaxAvgDim1Bins<NTY>
+where
+    NTY: NumOps,
+{
+    fn empty() -> Self {
+        Self::empty()
+    }
+
+    fn append(&mut self, src: &Self) {
+        self.ts1s.extend_from_slice(&src.ts1s);
+        self.ts2s.extend_from_slice(&src.ts2s);
+        self.counts.extend_from_slice(&src.counts);
+        self.mins.extend_from_slice(&src.mins);
+        self.maxs.extend_from_slice(&src.maxs);
+        self.avgs.extend_from_slice(&src.avgs);
+    }
+}
+
+impl<NTY> ReadableFromFile for MinMaxAvgDim1Bins<NTY>
+where
+    NTY: NumOps,
+{
+    // TODO this function is not needed in the trait:
+    fn read_from_file(file: File) -> Result<ReadPbv<Self>, Error> {
+        Ok(ReadPbv::new(file))
+    }
+
+    fn from_buf(buf: &[u8]) -> Result<Self, Error> {
+        let dec = serde_cbor::from_slice(&buf)?;
+        Ok(dec)
+    }
+}
+
+impl<NTY> TimeBinnableType for MinMaxAvgDim1Bins<NTY>
+where
+    NTY: NumOps,
+{
+    type Output = MinMaxAvgDim1Bins<NTY>;
+    type Aggregator = MinMaxAvgDim1BinsAggregator<NTY>;
+
+    fn aggregator(range: NanoRange) -> Self::Aggregator {
+        Self::Aggregator::new(range)
+    }
+}
+
+impl<NTY> ToJsonResult for Sitemty<MinMaxAvgDim1Bins<NTY>>
+where
+    NTY: NumOps,
+{
+    fn to_json_result(&self) -> Result<Box<dyn ToJsonBytes>, Error> {
+        Ok(Box::new(serde_json::Value::String(format!(
+            "MinMaxAvgDim1Bins/non-json-item"
+        ))))
+    }
+}
+
+pub struct MinMaxAvgDim1BinsCollected<NTY> {
+    _m1: PhantomData<NTY>,
+}
+
+impl<NTY> MinMaxAvgDim1BinsCollected<NTY> {
+    pub fn new() -> Self {
+        Self { _m1: PhantomData }
+    }
+}
+
+#[derive(Serialize)]
+pub struct MinMaxAvgDim1BinsCollectedResult<NTY> {
+    ts_bin_edges: Vec<IsoDateTime>,
+    counts: Vec<u64>,
+    mins: Vec<Option<Vec<NTY>>>,
+    maxs: Vec<Option<Vec<NTY>>>,
+    avgs: Vec<Option<Vec<f32>>>,
+    #[serde(skip_serializing_if = "Bool::is_false", rename = "finalisedRange")]
+    finalised_range: bool,
+    #[serde(skip_serializing_if = "Zero::is_zero", rename = "missingBins")]
+    missing_bins: u32,
+    #[serde(skip_serializing_if = "Option::is_none", rename = "continueAt")]
+    continue_at: Option<IsoDateTime>,
+}
+
+pub struct MinMaxAvgDim1BinsCollector<NTY> {
+    bin_count_exp: u32,
+    timed_out: bool,
+    range_complete: bool,
+    vals: MinMaxAvgDim1Bins<NTY>,
+    _m1: PhantomData<NTY>,
+}
+
+impl<NTY> MinMaxAvgDim1BinsCollector<NTY> {
+    pub fn new(bin_count_exp: u32) -> Self {
+        Self {
+            bin_count_exp,
+            timed_out: false,
+            range_complete: false,
+            vals: MinMaxAvgDim1Bins::<NTY>::empty(),
+            _m1: PhantomData,
+        }
+    }
+}
+
+impl<NTY> WithLen for MinMaxAvgDim1BinsCollector<NTY>
+where
+    NTY: NumOps + Serialize,
+{
+    fn len(&self) -> usize {
+        self.vals.ts1s.len()
+    }
+}
+
+impl<NTY> Collector for MinMaxAvgDim1BinsCollector<NTY>
+where
+    NTY: NumOps + Serialize,
+{
+    type Input = MinMaxAvgDim1Bins<NTY>;
+    type Output = MinMaxAvgDim1BinsCollectedResult<NTY>;
+
+    fn ingest(&mut self, src: &Self::Input) {
+        Appendable::append(&mut self.vals, src);
+    }
+
+    fn set_range_complete(&mut self) {
+        self.range_complete = true;
+    }
+
+    fn set_timed_out(&mut self) {
+        self.timed_out = true;
+    }
+
+    fn result(self) -> Result<Self::Output, Error> {
+        let bin_count = self.vals.ts1s.len() as u32;
+        let mut tsa: Vec<_> = self
+            .vals
+            .ts1s
+            .iter()
+            .map(|&k| IsoDateTime(Utc.timestamp_nanos(k as i64)))
+            .collect();
+        if let Some(&z) = self.vals.ts2s.last() {
+            tsa.push(IsoDateTime(Utc.timestamp_nanos(z as i64)));
+        }
+        let tsa = tsa;
+        let continue_at = if self.vals.ts1s.len() < self.bin_count_exp as usize {
+            match tsa.last() {
+                Some(k) => Some(k.clone()),
+                None => Err(Error::with_msg("partial_content but no bin in result"))?,
+            }
+        } else {
+            None
+        };
+        let ret = MinMaxAvgDim1BinsCollectedResult::<NTY> {
+            ts_bin_edges: tsa,
+            counts: self.vals.counts,
+            mins: self.vals.mins,
+            maxs: self.vals.maxs,
+            avgs: self.vals.avgs,
+            finalised_range: self.range_complete,
+            missing_bins: self.bin_count_exp - bin_count,
+            continue_at,
+        };
+        Ok(ret)
+    }
+}
+
+impl<NTY> Collectable for MinMaxAvgDim1Bins<NTY>
+where
+    NTY: NumOps + Serialize,
+{
+    type Collector = MinMaxAvgDim1BinsCollector<NTY>;
+
+    fn new_collector(bin_count_exp: u32) -> Self::Collector {
+        Self::Collector::new(bin_count_exp)
+    }
+}
+
+pub struct MinMaxAvgDim1BinsAggregator<NTY> {
+    range: NanoRange,
+    count: u64,
+    min: Option<Vec<NTY>>,
+    max: Option<Vec<NTY>>,
+    sumc: u64,
+    sum: Option<Vec<f32>>,
+}
+
+impl<NTY> MinMaxAvgDim1BinsAggregator<NTY> {
+    pub fn new(range: NanoRange) -> Self {
+        Self {
+            range,
+            count: 0,
+            min: None,
+            max: None,
+            sumc: 0,
+            sum: None,
+        }
+    }
+}
+
+impl<NTY> TimeBinnableTypeAggregator for MinMaxAvgDim1BinsAggregator<NTY>
+where
+    NTY: NumOps,
+{
+    type Input = MinMaxAvgDim1Bins<NTY>;
+    type Output = MinMaxAvgDim1Bins<NTY>;
+
+    fn range(&self) -> &NanoRange {
+        &self.range
+    }
+
+    fn ingest(&mut self, item: &Self::Input) {
+        for i1 in 0..item.ts1s.len() {
+            if item.ts2s[i1] <= self.range.beg {
+                continue;
+            } else if item.ts1s[i1] >= self.range.end {
+                continue;
+            } else {
+                match self.min.as_mut() {
+                    None => self.min = item.mins[i1].clone(),
+                    Some(min) => match item.mins[i1].as_ref() {
+                        None => {}
+                        Some(v) => {
+                            for (a, b) in min.iter_mut().zip(v.iter()) {
+                                if *b < *a {
+                                    *a = *b;
+                                }
+                            }
+                        }
+                    },
+                };
+                match self.max.as_mut() {
+                    None => self.max = item.maxs[i1].clone(),
+                    Some(max) => match item.maxs[i1].as_ref() {
+                        None => {}
+                        Some(v) => {
+                            for (a, b) in max.iter_mut().zip(v.iter()) {
+                                if *b > *a {
+                                    *a = *b;
+                                }
+                            }
+                        }
+                    },
+                };
+                match self.sum.as_mut() {
+                    None => {
+                        if let Some(v) = item.avgs[i1].as_ref() {
+                            self.sum = Some(v.clone());
+                            self.sumc += 1;
+                        }
+                    }
+                    Some(sum) => match item.avgs[i1].as_ref() {
+                        None => {}
+                        Some(v) => {
+                            for (a, b) in sum.iter_mut().zip(v.iter()) {
+                                if (*b).is_nan() {
+                                } else {
+                                    *a += *b;
+                                }
+                            }
+                            self.sumc += 1;
+                        }
+                    },
+                }
+                self.count += item.counts[i1];
+            }
+        }
+    }
+
+    fn result(self) -> Self::Output {
+        let avg = if self.sumc == 0 {
+            None
+        } else {
+            let avg = self
+                .sum
+                .as_ref()
+                .unwrap()
+                .iter()
+                .map(|k| k / self.sumc as f32)
+                .collect();
+            Some(avg)
+        };
+        Self::Output {
+            ts1s: vec![self.range.beg],
+            ts2s: vec![self.range.end],
+            counts: vec![self.count],
+            mins: vec![self.min],
+            maxs: vec![self.max],
+            avgs: vec![avg],
+        }
+    }
+}
+
+#[derive(Serialize)]
+pub struct WaveEventsCollectedResult<NTY> {
+    ts0: u64,
+    tsoff: Vec<u64>,
+    values: Vec<Vec<NTY>>,
+    #[serde(skip_serializing_if = "Bool::is_false", rename = "finalisedRange")]
+    range_complete: bool,
+    #[serde(skip_serializing_if = "Bool::is_false", rename = "timedOut")]
+    timed_out: bool,
+}
+
+pub struct WaveEventsCollector<NTY> {
+    vals: WaveEvents<NTY>,
+    range_complete: bool,
+    timed_out: bool,
+}
+
+impl<NTY> WaveEventsCollector<NTY> {
+    pub fn new(_bin_count_exp: u32) -> Self {
+        Self {
+            vals: WaveEvents::empty(),
+            range_complete: false,
+            timed_out: false,
+        }
+    }
+}
+
+impl<NTY> WithLen for WaveEventsCollector<NTY> {
+    fn len(&self) -> usize {
+        self.vals.tss.len()
+    }
+}
+
+impl<NTY> Collector for WaveEventsCollector<NTY>
+where
+    NTY: NumOps,
+{
+    type Input = WaveEvents<NTY>;
+    type Output = WaveEventsCollectedResult<NTY>;
+
+    fn ingest(&mut self, src: &Self::Input) {
+        self.vals.append(src);
+    }
+
+    fn set_range_complete(&mut self) {
+        self.range_complete = true;
+    }
+
+    fn set_timed_out(&mut self) {
+        self.timed_out = true;
+    }
+
+    fn result(self) -> Result<Self::Output, Error> {
+        let ts0 = self.vals.tss.first().map_or(0, |k| *k / SEC);
+        let tsoff = self.vals.tss.into_iter().map(|k| k - ts0 * SEC).collect();
+        let ret = Self::Output {
+            ts0,
+            tsoff,
+            values: self.vals.vals,
+            range_complete: self.range_complete,
+            timed_out: self.timed_out,
+        };
+        Ok(ret)
+    }
+}
+
+impl<NTY> Collectable for WaveEvents<NTY>
+where
+    NTY: NumOps,
+{
+    type Collector = WaveEventsCollector<NTY>;
+
+    fn new_collector(bin_count_exp: u32) -> Self::Collector {
+        Self::Collector::new(bin_count_exp)
+    }
+}
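The serde attributes on the collected-result structs above decide which keys appear in the JSON response. A minimal sketch of that behavior (assuming serde with the derive feature and serde_json as dependencies; the struct and its values are made-up stand-ins, and is_false stands in for the crate's Bool::is_false):

    use serde::Serialize;

    fn is_false(x: &bool) -> bool {
        !*x
    }

    #[derive(Serialize)]
    struct BinsResult {
        counts: Vec<u64>,
        #[serde(skip_serializing_if = "is_false", rename = "finalisedRange")]
        finalised_range: bool,
        #[serde(skip_serializing_if = "Option::is_none", rename = "continueAt")]
        continue_at: Option<String>,
    }

    fn main() {
        let r = BinsResult {
            counts: vec![4, 0],
            finalised_range: false,
            continue_at: None,
        };
        // Prints {"counts":[4,0]} — the renamed fields are emitted only when
        // the range is finalised or a continuation point exists.
        println!("{}", serde_json::to_string(&r).unwrap());
    }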
diff --git a/disk/src/binned/prebinned.rs b/disk/src/binned/prebinned.rs
index 19c619b..02975b1 100644
--- a/disk/src/binned/prebinned.rs
+++ b/disk/src/binned/prebinned.rs
@@ -115,7 +115,18 @@ pub async fn pre_binned_bytes_for_http(
        ));
        return Err(err);
    }
-    let channel_config = read_local_config(&query.channel(), &node_config.node).await?;
+    let channel_config = match read_local_config(&query.channel(), &node_config.node).await {
+        Ok(k) => k,
+        Err(e) => {
+            if e.msg().contains("ErrorKind::NotFound") {
+                let s = futures_util::stream::empty();
+                let ret = Box::pin(s);
+                return Ok(ret);
+            } else {
+                return Err(e);
+            }
+        }
+    };
     let entry_res = extract_matching_config_entry(&query.patch().patch_range(), &channel_config)?;
     let entry = match entry_res {
         MatchingConfigEntry::None => return Err(Error::with_msg("no config entry found")),
diff --git a/disk/src/binned/query.rs b/disk/src/binned/query.rs
index 42a80f6..dcf6704 100644
--- a/disk/src/binned/query.rs
+++ b/disk/src/binned/query.rs
@@ -1,3 +1,4 @@
+use crate::query::channel_from_params;
 use chrono::{DateTime, TimeZone, Utc};
 use err::Error;
 use netpod::log::*;
@@ -211,7 +212,7 @@ impl BinnedQuery {
                .map_err(|e| Error::with_msg(format!("can not parse binCount {:?}", e)))?,
            agg_kind: params
                .get("aggKind")
-                .map_or("DimXBins1", |k| k)
+                .map_or(&format!("{}", AggKind::DimXBins1), |k| k)
                .parse()
                .map_err(|e| Error::with_msg(format!("can not parse aggKind {:?}", e)))?,
            channel: channel_from_params(&params)?,
@@ -306,96 +307,3 @@ impl BinnedQuery {
        )
    }
 }
-
-fn channel_from_params(params: &BTreeMap<String, String>) -> Result<Channel, Error> {
-    let ret = Channel {
-        backend: params
-            .get("channelBackend")
-            .ok_or(Error::with_msg("missing channelBackend"))?
-            .into(),
-        name: params
-            .get("channelName")
-            .ok_or(Error::with_msg("missing channelName"))?
-            .into(),
-    };
-    Ok(ret)
-}
-
-// TODO move this query type out of this `binned` mod
-#[derive(Clone, Debug)]
-pub struct PlainEventsQuery {
-    channel: Channel,
-    range: NanoRange,
-    report_error: bool,
-    timeout: Duration,
-}
-
-impl PlainEventsQuery {
-    pub fn new(channel: Channel, range: NanoRange) -> Self {
-        Self {
-            channel,
-            range,
-            report_error: false,
-            timeout: Duration::from_millis(2000),
-        }
-    }
-
-    pub fn from_request(req: &http::request::Parts) -> Result<Self, Error> {
-        let params = netpod::query_params(req.uri.query());
-        let beg_date = params.get("begDate").ok_or(Error::with_msg("missing begDate"))?;
-        let end_date = params.get("endDate").ok_or(Error::with_msg("missing endDate"))?;
-        let ret = Self {
-            range: NanoRange {
-                beg: beg_date.parse::<DateTime<Utc>>()?.to_nanos(),
-                end: end_date.parse::<DateTime<Utc>>()?.to_nanos(),
-            },
-            channel: channel_from_params(&params)?,
-            report_error: params
-                .get("reportError")
-                .map_or("false", |k| k)
-                .parse()
-                .map_err(|e| Error::with_msg(format!("can not parse reportError {:?}", e)))?,
-            timeout: params
-                .get("timeout")
-                .map_or("2000", |k| k)
-                .parse::<u64>()
-                .map(|k| Duration::from_millis(k))
-                .map_err(|e| Error::with_msg(format!("can not parse timeout {:?}", e)))?,
-        };
-        Ok(ret)
-    }
-
-    pub fn range(&self) -> &NanoRange {
-        &self.range
-    }
-
-    pub fn channel(&self) -> &Channel {
-        &self.channel
-    }
-
-    pub fn report_error(&self) -> bool {
-        self.report_error
-    }
-
-    pub fn timeout(&self) -> Duration {
-        self.timeout
-    }
-
-    pub fn set_timeout(&mut self, k: Duration) {
-        self.timeout = k;
-    }
-
-    pub fn url(&self, host: &HostPort) -> String {
-        let date_fmt = "%Y-%m-%dT%H:%M:%S.%3fZ";
-        format!(
-            "http://{}:{}/api/4/plain_events_json?channelBackend={}&channelName={}&begDate={}&endDate={}&timeout={}",
-            host.host,
-            host.port,
-            self.channel.backend,
-            self.channel.name,
-            Utc.timestamp_nanos(self.range.beg as i64).format(date_fmt),
-            Utc.timestamp_nanos(self.range.end as i64).format(date_fmt),
-            self.timeout.as_millis(),
-        )
-    }
-}
diff --git a/disk/src/channelconfig.rs b/disk/src/channelconfig.rs
deleted file mode 100644
index 8b13789..0000000
--- a/disk/src/channelconfig.rs
+++ /dev/null
@@ -1 +0,0 @@
-
diff --git a/disk/src/channelexec.rs b/disk/src/channelexec.rs
index cf47369..4a770c8 100644
--- a/disk/src/channelexec.rs
+++ b/disk/src/channelexec.rs
@@ -1,11 +1,11 @@
-use crate::agg::enp::Identity;
+use crate::agg::enp::{Identity, WavePlainProc};
 use crate::agg::streams::{Collectable, Collector, StreamItem};
-use crate::binned::{NumOps, RangeCompletableItem};
+use crate::binned::{EventsNodeProcessor, NumOps, PushableIndex, RangeCompletableItem};
 use crate::decode::{
     BigEndian, Endianness, EventValueFromBytes, EventValueShape, EventValues, EventValuesDim0Case,
     EventValuesDim1Case, LittleEndian, NumFromBytes,
 };
-use crate::frame::makeframe::Framable;
+use crate::frame::makeframe::{Framable, FrameType};
 use crate::merge::mergedfromremotes::MergedFromRemotes;
 use crate::raw::EventsQuery;
 use crate::Sitemty;
@@ -19,6 +19,7 @@ use parse::channelconfig::{extract_matching_config_entry, read_local_config, Mat
 use serde_json::Value as JsonValue;
 use std::pin::Pin;
 use std::time::Duration;
+use tokio::time::timeout_at;
 
 pub trait ChannelExecFunction {
     type Output;
@@ -25,10 +26,14 @@ pub trait ChannelExecFunction {
 
     fn exec<NTY, END, EVS>(self, byte_order: END, event_value_shape: EVS) -> Result<Self::Output, Error>
     where
         NTY: NumOps + NumFromBytes<NTY, END> + 'static,
         END: Endianness + 'static,
-        EVS: EventValueShape<NTY, END> + EventValueFromBytes<NTY, END> + 'static,
-        EventValues<<EVS as EventValueFromBytes<NTY, END>>::Output>: Collectable;
+        EVS: EventValueShape<NTY, END> + EventValueFromBytes<NTY, END> + PlainEventsAggMethod<NTY, END> + 'static,
+        EventValues<<EVS as EventValueFromBytes<NTY, END>>::Output>: Collectable,
+        Sitemty<<<EVS as PlainEventsAggMethod<NTY, END>>::Method as EventsNodeProcessor>::Output>: FrameType,
+        <<EVS as PlainEventsAggMethod<NTY, END>>::Method as EventsNodeProcessor>::Output: Collectable + PushableIndex;
+
+    fn empty() -> Self::Output;
 }
 
 fn channel_exec_nty_end_evs_enp<F, NTY, END, EVS>(
@@ -40,8 +45,10 @@ where
     F: ChannelExecFunction,
     NTY: NumOps + NumFromBytes<NTY, END> + 'static,
     END: Endianness + 'static,
-    EVS: EventValueShape<NTY, END> + EventValueFromBytes<NTY, END> + 'static,
+    EVS: EventValueShape<NTY, END> + EventValueFromBytes<NTY, END> + PlainEventsAggMethod<NTY, END> + 'static,
     EventValues<<EVS as EventValueFromBytes<NTY, END>>::Output>: Collectable,
+    Sitemty<<<EVS as PlainEventsAggMethod<NTY, END>>::Method as EventsNodeProcessor>::Output>: FrameType,
+    <<EVS as PlainEventsAggMethod<NTY, END>>::Method as EventsNodeProcessor>::Output: Collectable + PushableIndex,
 {
     Ok(f.exec::<NTY, END, EVS>(byte_order, event_value_shape)?)
 }
@@ -101,7 +108,16 @@ pub async fn channel_exec(
 where
     F: ChannelExecFunction,
 {
-    let channel_config = read_local_config(channel, &node_config.node).await?;
+    let channel_config = match read_local_config(channel, &node_config.node).await {
+        Ok(k) => k,
+        Err(e) => {
+            if e.msg().contains("ErrorKind::NotFound") {
+                return Ok(F::empty());
+            } else {
+                return Err(e);
+            }
+        }
+    };
     match extract_matching_config_entry(range, &channel_config)? {
         MatchingConfigEntry::Multiple => Err(Error::with_msg("multiple config entries found"))?,
         MatchingConfigEntry::None => {
@@ -133,7 +149,7 @@ impl PlainEvents {
         Self {
             channel,
             range,
-            agg_kind: AggKind::DimXBins1,
+            agg_kind: AggKind::Plain,
             node_config,
         }
     }
@@ -169,21 +185,27 @@ impl ChannelExecFunction for PlainEvents {
         let s = s.map(|item| Box::new(item) as Box<dyn Framable>);
         Ok(Box::pin(s))
     }
+
+    fn empty() -> Self::Output {
+        Box::pin(futures_util::stream::empty())
+    }
 }
 
 pub struct PlainEventsJson {
     channel: Channel,
     range: NanoRange,
     agg_kind: AggKind,
+    timeout: Duration,
     node_config: NodeConfigCached,
 }
 
 impl PlainEventsJson {
-    pub fn new(channel: Channel, range: NanoRange, node_config: NodeConfigCached) -> Self {
+    pub fn new(channel: Channel, range: NanoRange, timeout: Duration, node_config: NodeConfigCached) -> Self {
         Self {
             channel,
             range,
-            agg_kind: AggKind::DimXBins1,
+            agg_kind: AggKind::Plain,
+            timeout,
             node_config,
         }
     }
@@ -216,7 +238,7 @@ where
             if false {
                 None
             } else {
-                match tokio::time::timeout_at(deadline, stream.next()).await {
+                match timeout_at(deadline, stream.next()).await {
                     Ok(k) => k,
                     Err(_) => {
                         collector.set_timed_out();
@@ -254,6 +276,24 @@ where
     Ok(ret)
 }
 
+pub trait PlainEventsAggMethod<NTY, END> {
+    type Method: EventsNodeProcessor;
+}
+
+impl<NTY, END> PlainEventsAggMethod<NTY, END> for EventValuesDim0Case<NTY>
+where
+    NTY: NumOps,
+{
+    type Method = Identity<NTY>;
+}
+
+impl<NTY, END> PlainEventsAggMethod<NTY, END> for EventValuesDim1Case<NTY>
+where
+    NTY: NumOps,
+{
+    type Method = WavePlainProc<NTY>;
+}
+
 impl ChannelExecFunction for PlainEventsJson {
     type Output = Pin<Box<dyn Stream<Item = Result<Bytes, Error>> + Send>>;
 
@@ -261,9 +301,10 @@ impl ChannelExecFunction for PlainEventsJson {
     where
         NTY: NumOps + NumFromBytes<NTY, END> + 'static,
         END: Endianness + 'static,
-        EVS: EventValueShape<NTY, END> + EventValueFromBytes<NTY, END> + 'static,
-        EventValues<<EVS as EventValueFromBytes<NTY, END>>::Output>: Collectable,
+        EVS: EventValueShape<NTY, END> + EventValueFromBytes<NTY, END> + PlainEventsAggMethod<NTY, END> + 'static,
         EventValues<<EVS as EventValueFromBytes<NTY, END>>::Output>: Collectable,
+        Sitemty<<<EVS as PlainEventsAggMethod<NTY, END>>::Method as EventsNodeProcessor>::Output>: FrameType,
+        <<EVS as PlainEventsAggMethod<NTY, END>>::Method as EventsNodeProcessor>::Output: Collectable + PushableIndex,
     {
         let _ = byte_order;
         let _ = event_value_shape;
@@ -273,10 +314,12 @@ impl ChannelExecFunction for PlainEventsJson {
             range: self.range,
             agg_kind: self.agg_kind,
         };
-        let s = MergedFromRemotes::<Identity<NTY>>::new(evq, perf_opts, self.node_config.node_config.cluster);
-        // TODO take time out from query parameter.
-        let f = collect_plain_events_json(s, Duration::from_millis(2000));
-        //let s = s.map(|item| Box::new(item) as Box<dyn Framable>);
+        let s = MergedFromRemotes::<<EVS as PlainEventsAggMethod<NTY, END>>::Method>::new(
+            evq,
+            perf_opts,
+            self.node_config.node_config.cluster,
+        );
+        let f = collect_plain_events_json(s, self.timeout);
         let f = FutureExt::map(f, |item| match item {
             Ok(item) => Ok(Bytes::from(serde_json::to_vec(&item)?)),
             Err(e) => Err(e.into()),
@@ -284,4 +327,8 @@ impl ChannelExecFunction for PlainEventsJson {
         let s = futures_util::stream::once(f);
         Ok(Box::pin(s))
     }
+
+    fn empty() -> Self::Output {
+        Box::pin(futures_util::stream::empty())
+    }
 }
diff --git a/disk/src/decode.rs b/disk/src/decode.rs
index b9662e4..16bcd69 100644
--- a/disk/src/decode.rs
+++ b/disk/src/decode.rs
@@ -1,5 +1,5 @@
 use crate::agg::binnedt::TimeBinnableType;
-use crate::agg::enp::{Identity, WaveXBinner};
+use crate::agg::enp::{Identity, WaveNBinner, WavePlainProc, WaveXBinner};
 use crate::agg::streams::{Appendable, Collectable, Collector, StreamItem};
 use crate::agg::{Fits, FitsInside};
 use crate::binned::{
@@ -11,6 +11,7 @@ use crate::eventchunker::EventFull;
 use err::Error;
 use futures_core::Stream;
 use futures_util::StreamExt;
+use netpod::timeunits::SEC;
 use netpod::NanoRange;
 use serde::{Deserialize, Serialize};
 use std::marker::PhantomData;
@@ -104,6 +105,7 @@ where
 {
     type NumXAggToSingleBin: EventsNodeProcessor<Input = <Self as EventValueFromBytes<NTY, END>>::Output>;
     type NumXAggToNBins: EventsNodeProcessor<Input = <Self as EventValueFromBytes<NTY, END>>::Output>;
+    type NumXAggPlain: EventsNodeProcessor<Input = <Self as EventValueFromBytes<NTY, END>>::Output>;
 }
 
 pub struct EventValuesDim0Case<NTY> {
@@ -123,6 +125,7 @@ where
     type NumXAggToSingleBin = Identity<NTY>;
     // TODO is this sufficient?
     type NumXAggToNBins = Identity<NTY>;
+    type NumXAggPlain = Identity<NTY>;
 }
 
 pub struct EventValuesDim1Case<NTY> {
@@ -141,8 +144,8 @@ where
     NTY: NumOps + NumFromBytes<NTY, END>,
 {
     type NumXAggToSingleBin = WaveXBinner<NTY>;
-    // TODO implement this method:
-    type NumXAggToNBins = WaveXBinner<NTY>;
+    type NumXAggToNBins = WaveNBinner<NTY>;
+    type NumXAggPlain = WavePlainProc<NTY>;
 }
 
 // TODO add pulse.
@@ -350,8 +353,8 @@ where
     }
 
     fn result(self) -> Result<Self::Output, Error> {
-        let ts0 = self.vals.tss.first().map_or(0, |k| *k);
-        let tsoff = self.vals.tss.into_iter().map(|k| k - ts0).collect();
+        let ts0 = self.vals.tss.first().map_or(0, |k| *k / SEC);
+        let tsoff = self.vals.tss.into_iter().map(|k| k - ts0 * SEC).collect();
         let ret = Self::Output {
             ts0,
             tsoff,
diff --git a/disk/src/events.rs b/disk/src/events.rs
new file mode 100644
index 0000000..99fbbdc
--- /dev/null
+++ b/disk/src/events.rs
@@ -0,0 +1,163 @@
+use crate::query::channel_from_params;
+use chrono::{DateTime, TimeZone, Utc};
+use err::Error;
+use netpod::{Channel, HostPort, NanoRange, ToNanos};
+use std::time::Duration;
+
+// TODO move this query type out of this `binned` mod
+#[derive(Clone, Debug)]
+pub struct PlainEventsQuery {
+    channel: Channel,
+    range: NanoRange,
+    report_error: bool,
+    timeout: Duration,
+}
+
+impl PlainEventsQuery {
+    pub fn new(channel: Channel, range: NanoRange) -> Self {
+        Self {
+            channel,
+            range,
+            report_error: false,
+            timeout: Duration::from_millis(2000),
+        }
+    }
+
+    pub fn from_request(req: &http::request::Parts) -> Result<Self, Error> {
+        let params = netpod::query_params(req.uri.query());
+        let beg_date = params.get("begDate").ok_or(Error::with_msg("missing begDate"))?;
+        let end_date = params.get("endDate").ok_or(Error::with_msg("missing endDate"))?;
+        let ret = Self {
+            range: NanoRange {
+                beg: beg_date.parse::<DateTime<Utc>>()?.to_nanos(),
+                end: end_date.parse::<DateTime<Utc>>()?.to_nanos(),
+            },
+            channel: channel_from_params(&params)?,
+            report_error: params
+                .get("reportError")
+                .map_or("false", |k| k)
+                .parse()
+                .map_err(|e| Error::with_msg(format!("can not parse reportError {:?}", e)))?,
+            timeout: params
+                .get("timeout")
+                .map_or("2000", |k| k)
+                .parse::<u64>()
+                .map(|k| Duration::from_millis(k))
+                .map_err(|e| Error::with_msg(format!("can not parse timeout {:?}", e)))?,
+        };
+        Ok(ret)
+    }
+
+    pub fn range(&self) -> &NanoRange {
+        &self.range
+    }
+
+    pub fn channel(&self) -> &Channel {
+        &self.channel
+    }
+
+    pub fn report_error(&self) -> bool {
+        self.report_error
+    }
+
+    pub fn timeout(&self) -> Duration {
+        self.timeout
+    }
+
+    pub fn set_timeout(&mut self, k: Duration) {
+        self.timeout = k;
+    }
+
+    pub fn url(&self, host: &HostPort) -> String {
+        let date_fmt = "%Y-%m-%dT%H:%M:%S.%3fZ";
+        format!(
+            "http://{}:{}/api/4/plain_events?channelBackend={}&channelName={}&begDate={}&endDate={}&timeout={}",
+            host.host,
+            host.port,
+            self.channel.backend,
+            self.channel.name,
+            Utc.timestamp_nanos(self.range.beg as i64).format(date_fmt),
+            Utc.timestamp_nanos(self.range.end as i64).format(date_fmt),
+            self.timeout.as_millis(),
+        )
+    }
+}
+
+// TODO move this query type out of this `binned` mod
+#[derive(Clone, Debug)]
+pub struct PlainEventsJsonQuery {
+    channel: Channel,
+    range: NanoRange,
+    report_error: bool,
+    timeout: Duration,
+}
+
+impl PlainEventsJsonQuery {
+    pub fn new(channel: Channel, range: NanoRange) -> Self {
+        Self {
+            channel,
+            range,
+            report_error: false,
+            timeout: Duration::from_millis(2000),
+        }
+    }
+
+    pub fn from_request(req: &http::request::Parts) -> Result<Self, Error> {
+        let params = netpod::query_params(req.uri.query());
+        let beg_date = params.get("begDate").ok_or(Error::with_msg("missing begDate"))?;
+        let end_date = params.get("endDate").ok_or(Error::with_msg("missing endDate"))?;
+        let ret = Self {
+            range: NanoRange {
+                beg: beg_date.parse::<DateTime<Utc>>()?.to_nanos(),
+                end: end_date.parse::<DateTime<Utc>>()?.to_nanos(),
+            },
+            channel: channel_from_params(&params)?,
+            report_error: params
+                .get("reportError")
+                .map_or("false", |k| k)
+                .parse()
+                .map_err(|e| Error::with_msg(format!("can not parse reportError {:?}", e)))?,
+            timeout: params
+                .get("timeout")
+                .map_or("2000", |k| k)
+                .parse::<u64>()
+                .map(|k| Duration::from_millis(k))
+                .map_err(|e| Error::with_msg(format!("can not parse timeout {:?}", e)))?,
+        };
+        Ok(ret)
+    }
+
+    pub fn range(&self) -> &NanoRange {
+        &self.range
+    }
+
+    pub fn channel(&self) -> &Channel {
+        &self.channel
+    }
+
+    pub fn report_error(&self) -> bool {
+        self.report_error
+    }
+
+    pub fn timeout(&self) -> Duration {
+        self.timeout
+    }
+
+    pub fn set_timeout(&mut self, k: Duration) {
+        self.timeout = k;
+    }
+
+    pub fn url(&self, host: &HostPort) -> String {
+        let date_fmt = "%Y-%m-%dT%H:%M:%S.%3fZ";
+        format!(
+            "http://{}:{}/api/4/alpha_plain_events_json?channelBackend={}&channelName={}&begDate={}&endDate={}&timeout={}",
+            host.host,
+            host.port,
+            self.channel.backend,
+            self.channel.name,
+            Utc.timestamp_nanos(self.range.beg as i64).format(date_fmt),
+            Utc.timestamp_nanos(self.range.end as i64).format(date_fmt),
+            self.timeout.as_millis(),
+        )
+    }
+}
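A rough sketch of the URL that PlainEventsJsonQuery::url assembles for the test's time range (assuming chrono as a dependency; host, port, and backend name are made-up placeholders, not values from the patch):

    use chrono::{TimeZone, Utc};

    fn main() {
        let date_fmt = "%Y-%m-%dT%H:%M:%S.%3fZ";
        // 1970-01-01T00:20:10Z and 00:20:12Z as nanoseconds since the epoch.
        let beg = Utc.timestamp_nanos(1_210_000_000_000);
        let end = Utc.timestamp_nanos(1_212_000_000_000);
        let url = format!(
            "http://{}:{}/api/4/alpha_plain_events_json?channelBackend={}&channelName={}&begDate={}&endDate={}&timeout={}",
            "localhost",
            8080,
            "testbackend",
            "wave-f64-be-n21",
            beg.format(date_fmt),
            end.format(date_fmt),
            2000,
        );
        println!("{}", url);
    }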
diff --git a/disk/src/frame/makeframe.rs b/disk/src/frame/makeframe.rs
index 83a98fc..edac70f 100644
--- a/disk/src/frame/makeframe.rs
+++ b/disk/src/frame/makeframe.rs
@@ -1,4 +1,4 @@
-use crate::agg::enp::XBinnedScalarEvents;
+use crate::agg::enp::{WaveEvents, XBinnedScalarEvents};
 use crate::agg::eventbatch::MinMaxAvgScalarEventBatch;
 use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch;
 use crate::agg::streams::StreamItem;
@@ -97,6 +97,13 @@ where
     const FRAME_TYPE_ID: u32 = 0x700 + NTY::SUB;
 }
 
+impl<NTY> FrameType for Sitemty<WaveEvents<NTY>>
+where
+    NTY: SubFrId,
+{
+    const FRAME_TYPE_ID: u32 = 0x800 + NTY::SUB;
+}
+
 pub trait ProvidesFrameType {
     fn frame_type_id(&self) -> u32;
 }
@@ -144,6 +151,15 @@ where
     }
 }
 
+impl<NTY> Framable for Sitemty<WaveEvents<NTY>>
+where
+    NTY: NumOps + Serialize,
+{
+    fn make_frame(&self) -> Result<BytesMut, Error> {
+        make_frame(self)
+    }
+}
+
 pub fn make_frame<FT>(item: &FT) -> Result<BytesMut, Error>
 where
     FT: FrameType + Serialize,
diff --git a/disk/src/lib.rs b/disk/src/lib.rs
index b577827..1836c6b 100644
--- a/disk/src/lib.rs
+++ b/disk/src/lib.rs
@@ -23,17 +23,18 @@ pub mod aggtest;
 pub mod binned;
 pub mod binnedstream;
 pub mod cache;
-pub mod channelconfig;
 pub mod channelexec;
 pub mod dataopen;
 pub mod decode;
 pub mod eventblobs;
 pub mod eventchunker;
+pub mod events;
 pub mod frame;
 pub mod gen;
 pub mod index;
 pub mod merge;
 pub mod paths;
+pub mod query;
 pub mod raw;
 pub mod streamlog;
 
diff --git a/disk/src/merge.rs b/disk/src/merge.rs
index 16c4f95..7bbd369 100644
--- a/disk/src/merge.rs
+++ b/disk/src/merge.rs
@@ -19,8 +19,7 @@ enum MergedCurVal<T> {
     Val(T),
 }
 
-// TODO rename after refactor
-pub struct MergedStream2<S, ENP>
+pub struct MergedStream<S, ENP>
 where
     S: Stream<Item = Sitemty<<ENP as EventsNodeProcessor>::Output>>,
     ENP: EventsNodeProcessor,
@@ -41,7 +40,7 @@ where
     event_data_read_stats_items: VecDeque<EventDataReadStats>,
 }
 
-impl<S, ENP> MergedStream2<S, ENP>
+impl<S, ENP> MergedStream<S, ENP>
 where
     S: Stream<Item = Sitemty<<ENP as EventsNodeProcessor>::Output>> + Unpin,
     ENP: EventsNodeProcessor,
@@ -133,7 +132,7 @@ where
     }
 }
 
-impl<S, ENP> Stream for MergedStream2<S, ENP>
+impl<S, ENP> Stream for MergedStream<S, ENP>
 where
     S: Stream<Item = Sitemty<<ENP as EventsNodeProcessor>::Output>> + Unpin,
     ENP: EventsNodeProcessor,
diff --git a/disk/src/merge/mergedfromremotes.rs b/disk/src/merge/mergedfromremotes.rs
index cf3672b..b7b8832 100644
--- a/disk/src/merge/mergedfromremotes.rs
+++ b/disk/src/merge/mergedfromremotes.rs
@@ -1,8 +1,8 @@
 use crate::agg::streams::Appendable;
 use crate::binned::{EventsNodeProcessor, PushableIndex};
 use crate::frame::makeframe::FrameType;
-use crate::merge::MergedStream2;
-use crate::raw::{x_processed_stream_from_node2, EventsQuery};
+use crate::merge::MergedStream;
+use crate::raw::{x_processed_stream_from_node, EventsQuery};
 use crate::Sitemty;
 use err::Error;
 use futures_core::Stream;
@@ -36,7 +36,7 @@ where
     pub fn new(evq: EventsQuery, perf_opts: PerfOpts, cluster: Cluster) -> Self {
         let mut tcp_establish_futs = vec![];
         for node in &cluster.nodes {
-            let f = x_processed_stream_from_node2::<ENP>(evq.clone(), perf_opts.clone(), node.clone());
+            let f = x_processed_stream_from_node::<ENP>(evq.clone(), perf_opts.clone(), node.clone());
             let f: T002<<ENP as EventsNodeProcessor>::Output> = Box::pin(f);
             tcp_establish_futs.push(f);
         }
@@ -107,7 +107,7 @@ where
             } else {
                 if c1 == self.tcp_establish_futs.len() {
                     let inps: Vec<_> = self.nodein.iter_mut().map(|k| k.take().unwrap()).collect();
-                    let s1 = MergedStream2::<_, ENP>::new(inps);
+                    let s1 = MergedStream::<_, ENP>::new(inps);
                     self.merged = Some(Box::pin(s1));
                 }
                 continue 'outer;
diff --git a/disk/src/query.rs b/disk/src/query.rs
new file mode 100644
index 0000000..45a4eec
--- /dev/null
+++ b/disk/src/query.rs
@@ -0,0 +1,17 @@
+use err::Error;
+use netpod::Channel;
+use std::collections::BTreeMap;
+
+pub fn channel_from_params(params: &BTreeMap<String, String>) -> Result<Channel, Error> {
+    let ret = Channel {
+        backend: params
+            .get("channelBackend")
+            .ok_or(Error::with_msg("missing channelBackend"))?
+            .into(),
+        name: params
+            .get("channelName")
+            .ok_or(Error::with_msg("missing channelName"))?
+            .into(),
+    };
+    Ok(ret)
+}
diff --git a/disk/src/raw.rs b/disk/src/raw.rs
index a961577..7ad613b 100644
--- a/disk/src/raw.rs
+++ b/disk/src/raw.rs
@@ -18,8 +18,6 @@ use serde::{Deserialize, Serialize};
 use std::pin::Pin;
 use tokio::io::AsyncWriteExt;
 use tokio::net::TcpStream;
-#[allow(unused_imports)]
-use tracing::{debug, error, info, span, trace, warn, Level};
 
 pub mod conn;
 pub mod eventsfromframes;
@@ -37,7 +35,7 @@ pub struct EventsQuery {
 #[derive(Serialize, Deserialize)]
 pub struct EventQueryJsonStringFrame(String);
 
-pub async fn x_processed_stream_from_node2<ENP>(
+pub async fn x_processed_stream_from_node<ENP>(
     query: EventsQuery,
     perf_opts: PerfOpts,
     node: Node,
diff --git a/disk/src/raw/conn.rs b/disk/src/raw/conn.rs
index ae5d9f4..66d7a5e 100644
--- a/disk/src/raw/conn.rs
+++ b/disk/src/raw/conn.rs
@@ -61,18 +61,21 @@ async fn events_conn_handler_inner(
     match events_conn_handler_inner_try(stream, addr, node_config).await {
         Ok(_) => (),
         Err(mut ce) => {
-            // TODO is it guaranteed to be compatible to serialize this way?
-            let buf =
-                make_frame::<Result<StreamItem<RangeCompletableItem<MinMaxAvgScalarEventBatch>>, Error>>(&Err(ce.err))?;
-            match ce.netout.write_all(&buf).await {
-                Ok(_) => (),
-                Err(e) => match e.kind() {
-                    io::ErrorKind::BrokenPipe => {}
-                    _ => {
-                        error!("events_conn_handler_inner sees: {:?}", e);
-                        return Err(e)?;
-                    }
-                },
+            error!("events_conn_handler_inner: {:?}", ce.err);
+            if false {
+                let buf = make_frame::<Result<StreamItem<RangeCompletableItem<MinMaxAvgScalarEventBatch>>, Error>>(
+                    &Err(ce.err),
+                )?;
+                match ce.netout.write_all(&buf).await {
+                    Ok(_) => (),
+                    Err(e) => match e.kind() {
+                        io::ErrorKind::BrokenPipe => {}
+                        _ => {
+                            error!("events_conn_handler_inner sees: {:?}", e);
+                            return Err(e)?;
+                        }
+                    },
+                }
             }
         }
     }
@@ -143,7 +146,15 @@ macro_rules! pipe4 {
             $nty,
             $end,
             $evs<$nty>,
-            <$evs<$nty> as EventValueShape<$nty, $end>>::NumXAggToSingleBin,
+            // TODO must pass on the requested number of bins:
+            <$evs<$nty> as EventValueShape<$nty, $end>>::NumXAggToNBins,
+            //WaveXBinner<$nty>,
+        >($evsv, $event_blobs),
+        AggKind::Plain => make_num_pipeline_stream_evs::<
+            $nty,
+            $end,
+            $evs<$nty>,
+            <$evs<$nty> as EventValueShape<$nty, $end>>::NumXAggPlain,
             //WaveXBinner<$nty>,
         >($evsv, $event_blobs),
     }
@@ -254,7 +265,13 @@ async fn events_conn_handler_inner_try(
     let range = &evq.range;
     let channel_config = match read_local_config(&evq.channel, &node_config.node).await {
         Ok(k) => k,
-        Err(e) => return Err((e, netout))?,
+        Err(e) => {
+            if e.msg().contains("ErrorKind::NotFound") {
+                return Ok(());
+            } else {
+                return Err((e, netout))?;
+            }
+        }
     };
     let entry_res = match extract_matching_config_entry(range, &channel_config) {
         Ok(k) => k,
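The pipe4 macro above picks a different events-node processor type per AggKind variant for the same decoded input. A standalone sketch of that dispatch shape (std only; the trait, processors, and strings are stand-ins, not the crate's types):

    trait EventsProc {
        fn process(inp: Vec<Vec<f64>>) -> String;
    }

    struct SingleBin;
    struct NBins;
    struct Plain;

    impl EventsProc for SingleBin {
        fn process(_inp: Vec<Vec<f64>>) -> String {
            "one x-bin per event".into()
        }
    }

    impl EventsProc for NBins {
        fn process(_inp: Vec<Vec<f64>>) -> String {
            "n x-bins per event".into()
        }
    }

    impl EventsProc for Plain {
        fn process(_inp: Vec<Vec<f64>>) -> String {
            "events passed through unbinned".into()
        }
    }

    #[allow(dead_code)]
    enum AggKind {
        DimXBins1,
        DimXBinsN(u32),
        Plain,
    }

    // The runtime enum value selects which generic instantiation runs.
    fn run<P: EventsProc>(inp: Vec<Vec<f64>>) -> String {
        P::process(inp)
    }

    fn main() {
        let inp = vec![vec![1.0, 2.0]];
        let out = match AggKind::Plain {
            AggKind::DimXBins1 => run::<SingleBin>(inp),
            AggKind::DimXBinsN(_) => run::<NBins>(inp),
            AggKind::Plain => run::<Plain>(inp),
        };
        println!("{}", out);
    }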
diff --git a/err/src/lib.rs b/err/src/lib.rs
index 3406916..d04f069 100644
--- a/err/src/lib.rs
+++ b/err/src/lib.rs
@@ -31,6 +31,10 @@ impl Error {
             trace_str: Some(fmt_backtrace(&backtrace::Backtrace::new())),
         }
     }
+
+    pub fn msg(&self) -> &str {
+        &self.msg
+    }
 }
 
 fn fmt_backtrace(trace: &backtrace::Backtrace) -> String {
diff --git a/httpret/src/lib.rs b/httpret/src/lib.rs
index d026c4a..4d3eb4f 100644
--- a/httpret/src/lib.rs
+++ b/httpret/src/lib.rs
@@ -1,7 +1,8 @@
 use crate::gather::gather_get_json;
 use bytes::Bytes;
 use disk::binned::prebinned::pre_binned_bytes_for_http;
-use disk::binned::query::{BinnedQuery, PlainEventsQuery, PreBinnedQuery};
+use disk::binned::query::{BinnedQuery, PreBinnedQuery};
+use disk::events::PlainEventsQuery;
 use disk::raw::conn::events_service;
 use err::Error;
 use future::Future;
@@ -175,7 +176,7 @@ async fn http_service_try(req: Request<Body>, node_config: &NodeConfigCached) ->
         } else {
             Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
         }
-    } else if path == "/api/4/plain_events_json" {
+    } else if path == "/api/4/alpha_plain_events_json" {
         if req.method() == Method::GET {
             Ok(plain_events_json(req, &node_config).await?)
         } else {
@@ -411,8 +412,12 @@ async fn plain_events(req: Request<Body>, node_config: &NodeConfigCached) -> Res
 async fn plain_events_json(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
     let (head, _body) = req.into_parts();
     let query = PlainEventsQuery::from_request(&head)?;
-    let op =
-        disk::channelexec::PlainEventsJson::new(query.channel().clone(), query.range().clone(), node_config.clone());
+    let op = disk::channelexec::PlainEventsJson::new(
+        query.channel().clone(),
+        query.range().clone(),
+        query.timeout(),
+        node_config.clone(),
+    );
     let s = disk::channelexec::channel_exec(op, query.channel(), query.range(), node_config).await?;
     let ret = response(StatusCode::OK).body(BodyStream::wrapped(s, format!("plain_events")))?;
     Ok(ret)
diff --git a/netpod/src/lib.rs b/netpod/src/lib.rs
index 4811fe9..82234a7 100644
--- a/netpod/src/lib.rs
+++ b/netpod/src/lib.rs
@@ -533,18 +533,12 @@ impl PreBinnedPatchCoord {
 
 pub struct PreBinnedPatchIterator {
     range: PreBinnedPatchRange,
-    #[allow(dead_code)]
-    agg_kind: AggKind,
     ix: u64,
 }
 
 impl PreBinnedPatchIterator {
     pub fn from_range(range: PreBinnedPatchRange) -> Self {
-        Self {
-            range,
-            agg_kind: AggKind::DimXBins1,
-            ix: 0,
-        }
+        Self { range, ix: 0 }
     }
 }
 
@@ -664,6 +658,7 @@ impl BinnedRange {
 
 pub enum AggKind {
     DimXBins1,
     DimXBinsN(u32),
+    Plain,
 }
 
impl Display for AggKind {
@@ -675,6 +670,9 @@ impl Display for AggKind {
             Self::DimXBinsN(n) => {
                 write!(fmt, "DimXBinsN{}", n)
             }
+            Self::Plain => {
+                write!(fmt, "Plain")
+            }
         }
     }
 }
diff --git a/parse/src/channelconfig.rs b/parse/src/channelconfig.rs
index 9001f32..2127981 100644
--- a/parse/src/channelconfig.rs
+++ b/parse/src/channelconfig.rs
@@ -10,6 +10,7 @@ use nom::Needed;
 use num_derive::{FromPrimitive, ToPrimitive};
 use num_traits::ToPrimitive;
 use serde::{Deserialize, Serialize};
+use tokio::io::ErrorKind;
 
 type NRes<'a, O> = nom::IResult<&'a [u8], O, err::Error>;
 
@@ -259,7 +260,13 @@ pub async fn read_local_config(channel: &Channel, node: &Node) -> Result<Config,
-    let buf = tokio::fs::read(&path).await?;
+    let buf = match tokio::fs::read(&path).await {
+        Ok(k) => k,
+        Err(e) => match e.kind() {
+            ErrorKind::NotFound => return Err(Error::with_msg("ErrorKind::NotFound")),
+            _ => return Err(e.into()),
+        },
+    };
     let config = parse_config(&buf)?;
     Ok(config.1)
 }
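The final hunk is the source of the pattern this patch repeats at several call sites: read_local_config maps an io NotFound to an Error whose message contains "ErrorKind::NotFound", and each caller turns exactly that case into an empty stream or response instead of a failure. A standalone sketch of the round trip (std only; the Error type and functions are stand-ins for the crate's err::Error and read_local_config):

    #[derive(Debug)]
    struct Error {
        msg: String,
    }

    impl Error {
        fn with_msg(msg: &str) -> Self {
            Self { msg: msg.into() }
        }
        fn msg(&self) -> &str {
            &self.msg
        }
    }

    fn read_local_config(found: bool) -> Result<&'static str, Error> {
        if found {
            Ok("config")
        } else {
            // Mirrors the marker message introduced in parse/src/channelconfig.rs.
            Err(Error::with_msg("ErrorKind::NotFound"))
        }
    }

    fn channel_events(found: bool) -> Result<Vec<&'static str>, Error> {
        let _config = match read_local_config(found) {
            Ok(k) => k,
            Err(e) => {
                if e.msg().contains("ErrorKind::NotFound") {
                    // Unknown channel: respond with an empty result, not an error.
                    return Ok(Vec::new());
                } else {
                    return Err(e);
                }
            }
        };
        Ok(vec!["event"])
    }

    fn main() {
        assert_eq!(channel_events(true).unwrap().len(), 1);
        assert_eq!(channel_events(false).unwrap().len(), 0);
        println!("missing channel config becomes an empty result");
    }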