From ed723b634bf632716a3ac43aa9a9c0a982d86520 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Mon, 5 Dec 2022 17:51:23 +0100 Subject: [PATCH] Remove ChannelExecFunction --- disk/src/binned.rs | 414 ------------------ disk/src/channelexec.rs | 363 --------------- disk/src/disk.rs | 2 - httpret/Cargo.toml | 3 +- httpret/src/api1.rs | 3 +- httpret/src/bodystream.rs | 3 +- httpret/src/channelconfig.rs | 2 +- httpret/src/events.rs | 60 +-- httpret/src/evinfo.rs | 169 ------- httpret/src/httpret.rs | 120 +---- httpret/src/proxy.rs | 3 +- httpret/src/proxy/api4.rs | 2 +- netpod/src/query.rs | 4 +- .../query.rs => netpod/src/query/prebinned.rs | 15 +- 14 files changed, 39 insertions(+), 1124 deletions(-) delete mode 100644 disk/src/binned.rs delete mode 100644 disk/src/channelexec.rs rename disk/src/binned/query.rs => netpod/src/query/prebinned.rs (91%) diff --git a/disk/src/binned.rs b/disk/src/binned.rs deleted file mode 100644 index c9693e2..0000000 --- a/disk/src/binned.rs +++ /dev/null @@ -1,414 +0,0 @@ -pub mod binnedfrompbv; -pub mod dim1; -pub mod pbv; -pub mod prebinned; -pub mod query; - -use crate::agg::binnedt::TBinnerStream; -use crate::binned::binnedfrompbv::BinnedFromPreBinned; -use crate::binnedstream::BoxedStream; -use crate::channelexec::{channel_exec, ChannelExecFunction}; -use crate::decode::{Endianness, EventValueFromBytes, EventValueShape, NumFromBytes}; -use crate::merge::mergedfromremotes::MergedFromRemotes; -use bytes::Bytes; -use err::Error; -use futures_core::Stream; -use futures_util::StreamExt; -use items::numops::NumOps; -use items::streams::{collect_plain_events_json, Collectable, Collector}; -use items::{ - Clearable, EventsNodeProcessor, FilterFittingInside, Framable, FrameDecodable, FrameType, PushableIndex, - RangeCompletableItem, Sitemty, StreamItem, TimeBinnableType, WithLen, -}; -use netpod::log::*; -use netpod::query::{BinnedQuery, RawEventsQuery}; -use netpod::x_bin_count; -use netpod::{BinnedRange, NodeConfigCached, PerfOpts, PreBinnedPatchIterator, PreBinnedPatchRange, ScalarType, Shape}; -use std::fmt::Debug; -use std::pin::Pin; -use std::task::{Context, Poll}; -use std::time::Duration; - -pub struct BinnedStreamRes { - pub binned_stream: BoxedStream>, Error>>, - pub range: BinnedRange, -} - -pub struct BinnedBinaryChannelExec { - query: BinnedQuery, - node_config: NodeConfigCached, -} - -impl BinnedBinaryChannelExec { - pub fn new(query: BinnedQuery, node_config: NodeConfigCached) -> Self { - Self { query, node_config } - } -} - -impl ChannelExecFunction for BinnedBinaryChannelExec { - type Output = Pin> + Send>>; - - fn exec( - self, - _byte_order: END, - scalar_type: ScalarType, - shape: Shape, - event_value_shape: EVS, - _events_node_proc: ENP, - ) -> Result - where - NTY: NumOps + NumFromBytes + 'static, - END: Endianness + 'static, - EVS: EventValueShape + EventValueFromBytes + 'static, - ENP: EventsNodeProcessor>::Batch> + 'static, - // TODO require these things in general? 
- ::Output: Collectable + PushableIndex + Clearable, - <::Output as TimeBinnableType>::Output: Debug - + TimeBinnableType::Output as TimeBinnableType>::Output> - + Collectable - + Unpin, - Sitemty<::Output>: FrameType + Framable + 'static, - Sitemty<<::Output as TimeBinnableType>::Output>: - FrameType + Framable + FrameDecodable, - { - let _ = event_value_shape; - let range = BinnedRange::covering_range(self.query.range().clone(), self.query.bin_count())?; - let perf_opts = PerfOpts { inmem_bufcap: 512 }; - let souter = match PreBinnedPatchRange::covering_range(self.query.range().clone(), self.query.bin_count()) { - Ok(Some(pre_range)) => { - debug!("BinnedBinaryChannelExec found pre_range: {pre_range:?}"); - if range.grid_spec().bin_t_len() < pre_range.grid_spec.bin_t_len() { - let msg = format!( - "BinnedBinaryChannelExec incompatible ranges:\npre_range: {pre_range:?}\nrange: {range:?}" - ); - return Err(Error::with_msg(msg)); - } - let s = BinnedFromPreBinned::<<::Output as TimeBinnableType>::Output>::new( - PreBinnedPatchIterator::from_range(pre_range), - self.query.channel().clone(), - range.clone(), - scalar_type, - shape, - self.query.agg_kind().clone(), - self.query.cache_usage().clone(), - self.query.disk_io_buffer_size(), - &self.node_config, - self.query.disk_stats_every().clone(), - self.query.report_error(), - )? - .map(|item| match item.make_frame() { - Ok(item) => Ok(item.freeze()), - Err(e) => Err(e), - }); - Ok(Box::pin(s) as Pin> + Send>>) - } - Ok(None) => { - debug!( - "BinnedBinaryChannelExec no covering range for prebinned, merge from remotes instead {range:?}" - ); - // TODO let BinnedQuery provide the DiskIoTune and pass to RawEventsQuery: - let evq = RawEventsQuery::new( - self.query.channel().clone(), - self.query.range().clone(), - self.query.agg_kind().clone(), - ); - let x_bin_count = x_bin_count(&shape, self.query.agg_kind()); - let s = MergedFromRemotes::::new(evq, perf_opts, self.node_config.node_config.cluster.clone()); - let s = TBinnerStream::<_, ::Output>::new( - s, - range, - x_bin_count, - self.query.agg_kind().do_time_weighted(), - ); - let s = s.map(|item| match item.make_frame() { - Ok(item) => Ok(item.freeze()), - Err(e) => Err(e), - }); - Ok(Box::pin(s) as Pin> + Send>>) - } - Err(e) => Err(e), - }?; - Ok(souter) - } - - fn empty() -> Self::Output { - Box::pin(futures_util::stream::empty()) - } -} - -pub async fn binned_bytes_for_http( - query: &BinnedQuery, - scalar_type: ScalarType, - shape: Shape, - node_config: &NodeConfigCached, -) -> Result> + Send>>, Error> { - let ret = channel_exec( - BinnedBinaryChannelExec::new(query.clone(), node_config.clone()), - query.channel(), - query.range(), - scalar_type, - shape, - query.agg_kind().clone(), - node_config, - ) - .await?; - Ok(Box::pin(ret)) -} - -pub struct BinnedBytesForHttpStream { - inp: S, - errored: bool, - completed: bool, -} - -impl BinnedBytesForHttpStream { - pub fn new(inp: S) -> Self { - Self { - inp, - errored: false, - completed: false, - } - } -} - -pub trait MakeBytesFrame { - fn make_bytes_frame(&self) -> Result { - // TODO only implemented for one type, remove - err::todoval() - } -} - -impl Stream for BinnedBytesForHttpStream -where - S: Stream + Unpin, - I: MakeBytesFrame, -{ - type Item = Result; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - use Poll::*; - if self.completed { - panic!("BinnedBytesForHttpStream poll_next on completed"); - } - if self.errored { - self.completed = true; - return Ready(None); - } - match self.inp.poll_next_unpin(cx) { 
- Ready(Some(item)) => match item.make_bytes_frame() { - Ok(buf) => Ready(Some(Ok(buf))), - Err(e) => { - self.errored = true; - Ready(Some(Err(e.into()))) - } - }, - Ready(None) => { - self.completed = true; - Ready(None) - } - Pending => Pending, - } - } -} - -pub struct Bool {} - -impl Bool { - pub fn is_false(x: &bool) -> bool { - *x == false - } -} - -pub async fn collect_all( - stream: S, - bin_count_exp: u32, - timeout: Duration, - abort_after_bin_count: u32, -) -> Result -where - S: Stream> + Unpin, - T: Collectable, -{ - info!("\n\nConstruct deadline with timeout {timeout:?}\n\n"); - let deadline = tokio::time::Instant::now() + timeout; - let mut collector = ::new_collector(bin_count_exp); - let mut i1 = 0; - let mut stream = stream; - loop { - let item = if i1 == 0 { - stream.next().await - } else { - if abort_after_bin_count > 0 && collector.len() >= abort_after_bin_count as usize { - None - } else { - match tokio::time::timeout_at(deadline, stream.next()).await { - Ok(k) => k, - Err(_) => { - collector.set_timed_out(); - None - } - } - } - }; - match item { - Some(item) => { - match item { - Ok(item) => match item { - StreamItem::Log(_) => {} - StreamItem::Stats(_) => {} - StreamItem::DataItem(item) => match item { - RangeCompletableItem::RangeComplete => { - collector.set_range_complete(); - } - RangeCompletableItem::Data(item) => { - collector.ingest(&item); - i1 += 1; - } - }, - }, - Err(e) => { - // TODO Need to use some flags to get good enough error message for remote user. - Err(e)?; - } - }; - } - None => break, - } - } - let ret = serde_json::to_value(collector.result()?)?; - Ok(ret) -} - -pub struct BinnedJsonChannelExec { - query: BinnedQuery, - node_config: NodeConfigCached, - timeout: Duration, -} - -impl BinnedJsonChannelExec { - pub fn new(query: BinnedQuery, timeout: Duration, node_config: NodeConfigCached) -> Self { - Self { - query, - node_config, - timeout, - } - } -} - -impl ChannelExecFunction for BinnedJsonChannelExec { - type Output = Pin> + Send>>; - - fn exec( - self, - _byte_order: END, - scalar_type: ScalarType, - shape: Shape, - event_value_shape: EVS, - _events_node_proc: ENP, - ) -> Result - where - NTY: NumOps + NumFromBytes + 'static, - END: Endianness + 'static, - EVS: EventValueShape + EventValueFromBytes + 'static, - ENP: EventsNodeProcessor>::Batch> + 'static, - // TODO require these things in general? 
- ::Output: Collectable + PushableIndex + Clearable, - <::Output as TimeBinnableType>::Output: Debug - + TimeBinnableType::Output as TimeBinnableType>::Output> - + Collectable - + Unpin, - Sitemty<::Output>: FrameType + Framable + 'static, - Sitemty<<::Output as TimeBinnableType>::Output>: - FrameType + Framable + FrameDecodable, - { - let _ = event_value_shape; - let range = BinnedRange::covering_range(self.query.range().clone(), self.query.bin_count())?; - let t_bin_count = range.bin_count() as u32; - let perf_opts = PerfOpts { inmem_bufcap: 512 }; - let souter = match PreBinnedPatchRange::covering_range(self.query.range().clone(), self.query.bin_count()) { - Ok(Some(pre_range)) => { - info!("BinnedJsonChannelExec found pre_range: {pre_range:?}"); - if range.grid_spec().bin_t_len() < pre_range.grid_spec.bin_t_len() { - let msg = format!( - "BinnedJsonChannelExec incompatible ranges:\npre_range: {pre_range:?}\nrange: {range:?}" - ); - return Err(Error::with_msg(msg)); - } - let s = BinnedFromPreBinned::<<::Output as TimeBinnableType>::Output>::new( - PreBinnedPatchIterator::from_range(pre_range), - self.query.channel().clone(), - range.clone(), - scalar_type, - shape, - self.query.agg_kind().clone(), - self.query.cache_usage().clone(), - self.query.disk_io_buffer_size(), - &self.node_config, - self.query.disk_stats_every().clone(), - self.query.report_error(), - )?; - let f = collect_plain_events_json(s, self.timeout, t_bin_count, u64::MAX, self.query.do_log()); - let s = futures_util::stream::once(f).map(|item| match item { - Ok(item) => Ok(Bytes::from(serde_json::to_vec(&item)?)), - Err(e) => Err(e.into()), - }); - Ok(Box::pin(s) as Pin> + Send>>) - } - Ok(None) => { - info!("BinnedJsonChannelExec no covering range for prebinned, merge from remotes instead {range:?}"); - // TODO let BinnedQuery provide the DiskIoTune and pass to RawEventsQuery: - let evq = RawEventsQuery::new( - self.query.channel().clone(), - self.query.range().clone(), - self.query.agg_kind().clone(), - ); - let x_bin_count = x_bin_count(&shape, self.query.agg_kind()); - let s = MergedFromRemotes::::new(evq, perf_opts, self.node_config.node_config.cluster.clone()); - let s = TBinnerStream::<_, ::Output>::new( - s, - range, - x_bin_count, - self.query.agg_kind().do_time_weighted(), - ); - let f = collect_plain_events_json(s, self.timeout, t_bin_count, u64::MAX, self.query.do_log()); - let s = futures_util::stream::once(f).map(|item| match item { - Ok(item) => Ok(Bytes::from(serde_json::to_vec(&item)?)), - Err(e) => Err(e.into()), - }); - Ok(Box::pin(s) as Pin> + Send>>) - } - Err(e) => Err(e), - }?; - Ok(souter) - } - - fn empty() -> Self::Output { - info!("BinnedJsonChannelExec fn empty"); - Box::pin(futures_util::stream::empty()) - } -} - -pub async fn binned_json( - query: &BinnedQuery, - scalar_type: ScalarType, - shape: Shape, - node_config: &NodeConfigCached, -) -> Result> + Send>>, Error> { - let ret = channel_exec( - BinnedJsonChannelExec::new(query.clone(), query.timeout(), node_config.clone()), - query.channel(), - query.range(), - scalar_type, - shape, - query.agg_kind().clone(), - node_config, - ) - .await?; - Ok(Box::pin(ret)) -} - -pub trait EventsDecoder { - type Output; - fn ingest(&mut self, event: &[u8]); - fn result(&mut self) -> Self::Output; -} diff --git a/disk/src/channelexec.rs b/disk/src/channelexec.rs deleted file mode 100644 index 1fb57d6..0000000 --- a/disk/src/channelexec.rs +++ /dev/null @@ -1,363 +0,0 @@ -use crate::agg::enp::Identity; -use crate::decode::{BigEndian, Endianness, 
EventValueFromBytes, EventValueShape, LittleEndian, NumFromBytes}; -use crate::decode::{EventValuesDim0Case, EventValuesDim1Case}; -use crate::merge::mergedfromremotes::MergedFromRemotes; -use bytes::Bytes; -use err::Error; -use futures_core::Stream; -use futures_util::future::FutureExt; -use futures_util::StreamExt; -use items::numops::{BoolNum, NumOps, StringNum}; -use items::scalarevents::ScalarEvents; -use items::streams::{collect_plain_events_json, Collectable}; -use items::{Clearable, EventsNodeProcessor, Framable, FrameType, FrameTypeStatic}; -use items::{PushableIndex, Sitemty, TimeBinnableType}; -use netpod::query::{PlainEventsQuery, RawEventsQuery}; -use netpod::{AggKind, ByteOrder, Channel, NanoRange, NodeConfigCached, PerfOpts, ScalarType, Shape}; -use serde::de::DeserializeOwned; -use std::fmt::Debug; -use std::pin::Pin; -use std::time::Duration; - -pub trait ChannelExecFunction { - type Output; - - fn exec( - self, - byte_order: END, - scalar_type: ScalarType, - shape: Shape, - event_value_shape: EVS, - events_node_proc: ENP, - ) -> Result - where - NTY: NumOps + NumFromBytes + 'static, - END: Endianness + 'static, - EVS: EventValueShape + EventValueFromBytes + 'static, - ENP: EventsNodeProcessor>::Batch> + 'static, - // TODO require these things in general? - ::Output: Debug + Collectable + PushableIndex + Clearable, - <::Output as TimeBinnableType>::Output: Debug - + TimeBinnableType::Output as TimeBinnableType>::Output> - + Collectable - + Unpin, - Sitemty<::Output>: FrameType + Framable + 'static, - Sitemty<<::Output as TimeBinnableType>::Output>: - FrameType + Framable + DeserializeOwned; - - fn empty() -> Self::Output; -} - -fn channel_exec_nty_end_evs_enp( - f: F, - byte_order: END, - scalar_type: ScalarType, - shape: Shape, - event_value_shape: EVS, - events_node_proc: ENP, -) -> Result -where - F: ChannelExecFunction, - NTY: NumOps + NumFromBytes + 'static, - END: Endianness + 'static, - EVS: EventValueShape + EventValueFromBytes + 'static, - ENP: EventsNodeProcessor>::Batch> + 'static, - // TODO require these things in general? - ::Output: Debug + Collectable + PushableIndex + Clearable, - <::Output as TimeBinnableType>::Output: Debug - + TimeBinnableType::Output as TimeBinnableType>::Output> - + Collectable - + Unpin, - // TODO shouldn't one of FrameType or FrameTypeStatic be enough? - Sitemty<::Output>: FrameType + FrameTypeStatic + Framable + 'static, - Sitemty<<::Output as TimeBinnableType>::Output>: - FrameType + FrameTypeStatic + Framable + DeserializeOwned, -{ - Ok(f.exec(byte_order, scalar_type, shape, event_value_shape, events_node_proc)?) 
-} - -fn channel_exec_nty_end( - f: F, - byte_order: END, - scalar_type: ScalarType, - shape: Shape, - agg_kind: AggKind, -) -> Result -where - F: ChannelExecFunction, - NTY: NumOps + NumFromBytes + 'static, - END: Endianness + 'static, - ScalarEvents: Collectable, -{ - match shape { - Shape::Scalar => { - let evs = EventValuesDim0Case::new(); - match agg_kind { - AggKind::EventBlobs => panic!(), - AggKind::Plain => { - let events_node_proc = < as EventValueShape>::NumXAggPlain as EventsNodeProcessor>::create(shape.clone(), agg_kind.clone()); - channel_exec_nty_end_evs_enp(f, byte_order, scalar_type, shape, evs, events_node_proc) - } - AggKind::TimeWeightedScalar => { - let events_node_proc = < as EventValueShape>::NumXAggToSingleBin as EventsNodeProcessor>::create(shape.clone(), agg_kind.clone()); - channel_exec_nty_end_evs_enp(f, byte_order, scalar_type, shape, evs, events_node_proc) - } - AggKind::DimXBins1 => { - let events_node_proc = < as EventValueShape>::NumXAggToSingleBin as EventsNodeProcessor>::create(shape.clone(), agg_kind.clone()); - channel_exec_nty_end_evs_enp(f, byte_order, scalar_type, shape, evs, events_node_proc) - } - AggKind::DimXBinsN(_) => { - let events_node_proc = < as EventValueShape>::NumXAggToNBins as EventsNodeProcessor>::create(shape.clone(), agg_kind.clone()); - channel_exec_nty_end_evs_enp(f, byte_order, scalar_type, shape, evs, events_node_proc) - } - AggKind::Stats1 => { - let events_node_proc = < as EventValueShape>::NumXAggToStats1 as EventsNodeProcessor>::create(shape.clone(), agg_kind.clone()); - channel_exec_nty_end_evs_enp(f, byte_order, scalar_type, shape, evs, events_node_proc) - } - } - } - Shape::Wave(n) => { - let evs = EventValuesDim1Case::new(n); - match agg_kind { - AggKind::EventBlobs => panic!(), - AggKind::Plain => { - let events_node_proc = < as EventValueShape>::NumXAggPlain as EventsNodeProcessor>::create(shape.clone(), agg_kind.clone()); - channel_exec_nty_end_evs_enp(f, byte_order, scalar_type, shape, evs, events_node_proc) - } - AggKind::TimeWeightedScalar => { - let events_node_proc = < as EventValueShape>::NumXAggToSingleBin as EventsNodeProcessor>::create(shape.clone(), agg_kind.clone()); - channel_exec_nty_end_evs_enp(f, byte_order, scalar_type, shape, evs, events_node_proc) - } - AggKind::DimXBins1 => { - let events_node_proc = < as EventValueShape>::NumXAggToSingleBin as EventsNodeProcessor>::create(shape.clone(), agg_kind.clone()); - channel_exec_nty_end_evs_enp(f, byte_order, scalar_type, shape, evs, events_node_proc) - } - AggKind::DimXBinsN(_) => { - let events_node_proc = < as EventValueShape>::NumXAggToNBins as EventsNodeProcessor>::create(shape.clone(), agg_kind.clone()); - channel_exec_nty_end_evs_enp(f, byte_order, scalar_type, shape, evs, events_node_proc) - } - AggKind::Stats1 => { - let events_node_proc = < as EventValueShape>::NumXAggToStats1 as EventsNodeProcessor>::create(shape.clone(), agg_kind.clone()); - channel_exec_nty_end_evs_enp(f, byte_order, scalar_type, shape, evs, events_node_proc) - } - } - } - Shape::Image(..) => { - // TODO needed for binning or json event retrieval - err::todoval() - } - } -} - -macro_rules! 
match_end { - ($f:expr, $nty:ident, $end:expr, $scalar_type:expr, $shape:expr, $agg_kind:expr, $node_config:expr) => { - match $end { - ByteOrder::Little => { - channel_exec_nty_end::<_, $nty, _>($f, LittleEndian {}, $scalar_type, $shape, $agg_kind) - } - ByteOrder::Big => channel_exec_nty_end::<_, $nty, _>($f, BigEndian {}, $scalar_type, $shape, $agg_kind), - } - }; -} - -fn channel_exec_config( - f: F, - scalar_type: ScalarType, - byte_order: ByteOrder, - shape: Shape, - agg_kind: AggKind, - _node_config: &NodeConfigCached, -) -> Result -where - F: ChannelExecFunction, -{ - match scalar_type { - ScalarType::U8 => match_end!(f, u8, byte_order, scalar_type, shape, agg_kind, node_config), - ScalarType::U16 => match_end!(f, u16, byte_order, scalar_type, shape, agg_kind, node_config), - ScalarType::U32 => match_end!(f, u32, byte_order, scalar_type, shape, agg_kind, node_config), - ScalarType::U64 => match_end!(f, u64, byte_order, scalar_type, shape, agg_kind, node_config), - ScalarType::I8 => match_end!(f, i8, byte_order, scalar_type, shape, agg_kind, node_config), - ScalarType::I16 => match_end!(f, i16, byte_order, scalar_type, shape, agg_kind, node_config), - ScalarType::I32 => match_end!(f, i32, byte_order, scalar_type, shape, agg_kind, node_config), - ScalarType::I64 => match_end!(f, i64, byte_order, scalar_type, shape, agg_kind, node_config), - ScalarType::F32 => match_end!(f, f32, byte_order, scalar_type, shape, agg_kind, node_config), - ScalarType::F64 => match_end!(f, f64, byte_order, scalar_type, shape, agg_kind, node_config), - ScalarType::BOOL => match_end!(f, BoolNum, byte_order, scalar_type, shape, agg_kind, node_config), - ScalarType::STRING => match_end!(f, StringNum, byte_order, scalar_type, shape, agg_kind, node_config), - } -} - -pub async fn channel_exec( - f: F, - _channel: &Channel, - _range: &NanoRange, - scalar_type: ScalarType, - shape: Shape, - agg_kind: AggKind, - node_config: &NodeConfigCached, -) -> Result -where - F: ChannelExecFunction, -{ - let ret = channel_exec_config( - f, - scalar_type, - // TODO TODO TODO is the byte order ever important here? 
- ByteOrder::Little, - shape, - agg_kind, - node_config, - )?; - Ok(ret) -} - -pub struct PlainEvents { - channel: Channel, - range: NanoRange, - agg_kind: AggKind, - node_config: NodeConfigCached, -} - -impl PlainEvents { - pub fn new(channel: Channel, range: NanoRange, node_config: NodeConfigCached) -> Self { - Self { - channel, - range, - agg_kind: AggKind::Plain, - node_config, - } - } - - pub fn channel(&self) -> &Channel { - &self.channel - } - - pub fn range(&self) -> &NanoRange { - &self.range - } -} - -impl ChannelExecFunction for PlainEvents { - type Output = Pin> + Send>>; - - fn exec( - self, - byte_order: END, - _scalar_type: ScalarType, - _shape: Shape, - event_value_shape: EVS, - _events_node_proc: ENP, - ) -> Result - where - NTY: NumOps + NumFromBytes + 'static, - END: Endianness + 'static, - EVS: EventValueShape + EventValueFromBytes + 'static, - ENP: EventsNodeProcessor>::Batch> + 'static, - { - let _ = byte_order; - let _ = event_value_shape; - let perf_opts = PerfOpts { inmem_bufcap: 4096 }; - // TODO let upstream provide DiskIoTune and pass in RawEventsQuery: - let evq = RawEventsQuery::new(self.channel, self.range, self.agg_kind); - let s = MergedFromRemotes::>::new(evq, perf_opts, self.node_config.node_config.cluster); - let s = s.map(|item| Box::new(item) as Box); - Ok(Box::pin(s)) - } - - fn empty() -> Self::Output { - Box::pin(futures_util::stream::empty()) - } -} - -pub struct PlainEventsJson { - query: PlainEventsQuery, - channel: Channel, - range: NanoRange, - agg_kind: AggKind, - timeout: Duration, - node_config: NodeConfigCached, - events_max: u64, - do_log: bool, -} - -impl PlainEventsJson { - pub fn new( - query: PlainEventsQuery, - channel: Channel, - range: NanoRange, - timeout: Duration, - node_config: NodeConfigCached, - events_max: u64, - do_log: bool, - ) -> Self { - Self { - query, - channel, - range, - agg_kind: AggKind::Plain, - timeout, - node_config, - events_max, - do_log, - } - } - - pub fn channel(&self) -> &Channel { - &self.channel - } - - pub fn range(&self) -> &NanoRange { - &self.range - } -} - -impl ChannelExecFunction for PlainEventsJson { - type Output = Pin> + Send>>; - - fn exec( - self, - _byte_order: END, - _scalar_type: ScalarType, - _shape: Shape, - _event_value_shape: EVS, - _events_node_proc: ENP, - ) -> Result - where - NTY: NumOps + NumFromBytes + 'static, - END: Endianness + 'static, - EVS: EventValueShape + EventValueFromBytes + 'static, - ENP: EventsNodeProcessor>::Batch> + 'static, - // TODO require these things in general? - ::Output: Debug + Collectable + PushableIndex + Clearable, - <::Output as TimeBinnableType>::Output: Debug - + TimeBinnableType::Output as TimeBinnableType>::Output> - + Collectable - + Unpin, - Sitemty<::Output>: FrameType + Framable + 'static, - Sitemty<<::Output as TimeBinnableType>::Output>: - FrameType + Framable + DeserializeOwned, - { - let perf_opts = PerfOpts { inmem_bufcap: 4096 }; - // TODO let upstream provide DiskIoTune and set in RawEventsQuery. - let mut evq = RawEventsQuery::new(self.channel, self.range, self.agg_kind); - evq.do_test_main_error = self.query.do_test_main_error(); - evq.do_test_stream_error = self.query.do_test_stream_error(); - let s = MergedFromRemotes::::new(evq, perf_opts, self.node_config.node_config.cluster); - let f = collect_plain_events_json(s, self.timeout, 0, self.events_max, self.do_log); - let f = FutureExt::map(f, |item| match item { - Ok(item) => { - // TODO add channel entry info here? 
- //let obj = item.as_object_mut().unwrap(); - //obj.insert("channelName", JsonValue::String(en)); - Ok(Bytes::from(serde_json::to_vec(&item)?)) - } - Err(e) => Err(e.into()), - }); - let s = futures_util::stream::once(f); - Ok(Box::pin(s)) - } - - fn empty() -> Self::Output { - Box::pin(futures_util::stream::empty()) - } -} diff --git a/disk/src/disk.rs b/disk/src/disk.rs index a0b0759..3c02c17 100644 --- a/disk/src/disk.rs +++ b/disk/src/disk.rs @@ -1,10 +1,8 @@ pub mod agg; #[cfg(test)] pub mod aggtest; -pub mod binned; pub mod binnedstream; pub mod cache; -pub mod channelexec; pub mod dataopen; pub mod decode; pub mod eventblobs; diff --git a/httpret/Cargo.toml b/httpret/Cargo.toml index 1e6a3db..b1ddd26 100644 --- a/httpret/Cargo.toml +++ b/httpret/Cargo.toml @@ -12,11 +12,10 @@ serde_json = "1.0" serde = { version = "1.0", features = ["derive"] } http = "0.2" url = "2.2" -tokio = { version = "1.7.1", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] } +tokio = { version = "1.22.0", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] } hyper = { version = "0.14", features = ["http1", "http2", "client", "server", "tcp", "stream"] } hyper-tls = { version="0.5.0" } bytes = "1.0.1" -futures-core = "0.3.14" futures-util = "0.3.14" tracing = "0.1" tracing-futures = "0.2" diff --git a/httpret/src/api1.rs b/httpret/src/api1.rs index 8700d42..953369c 100644 --- a/httpret/src/api1.rs +++ b/httpret/src/api1.rs @@ -2,8 +2,7 @@ use crate::err::Error; use crate::gather::{gather_get_json_generic, SubRes}; use crate::{response, BodyStream, ReqCtx}; use bytes::{BufMut, BytesMut}; -use futures_core::Stream; -use futures_util::{FutureExt, StreamExt, TryFutureExt, TryStreamExt}; +use futures_util::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt}; use http::{Method, StatusCode}; use hyper::{Body, Client, Request, Response}; use items::eventfull::EventFull; diff --git a/httpret/src/bodystream.rs b/httpret/src/bodystream.rs index f2ed002..92d5e40 100644 --- a/httpret/src/bodystream.rs +++ b/httpret/src/bodystream.rs @@ -1,7 +1,6 @@ use crate::err::Error; use bytes::Bytes; -use futures_core::Stream; -use futures_util::StreamExt; +use futures_util::{Stream, StreamExt}; use http::HeaderMap; use http::{Response, StatusCode}; use hyper::Body; diff --git a/httpret/src/channelconfig.rs b/httpret/src/channelconfig.rs index 5c4e4ce..3f58c9d 100644 --- a/httpret/src/channelconfig.rs +++ b/httpret/src/channelconfig.rs @@ -1,11 +1,11 @@ use crate::err::Error; use crate::{response, ToPublicResponse}; use dbconn::{create_connection, create_scylla_connection}; -use disk::binned::query::PreBinnedQuery; use futures_util::StreamExt; use http::{Method, Request, Response, StatusCode}; use hyper::Body; use netpod::log::*; +use netpod::query::prebinned::PreBinnedQuery; use netpod::query::{BinnedQuery, PlainEventsQuery}; use netpod::timeunits::*; use netpod::{get_url_query_pairs, Channel, ChannelConfigQuery, Database, FromUrl, ScalarType, ScyllaConfig, Shape}; diff --git a/httpret/src/events.rs b/httpret/src/events.rs index 7426513..099eeac 100644 --- a/httpret/src/events.rs +++ b/httpret/src/events.rs @@ -1,14 +1,14 @@ use crate::channelconfig::{chconf_from_events_binary, chconf_from_events_json}; use crate::err::Error; use crate::{response, response_err, BodyStream, ReqCtx, ToPublicResponse}; -use futures_util::{Stream, StreamExt, TryStreamExt}; +use futures_util::{stream, Stream, StreamExt, TryStreamExt}; use http::{Method, Request, Response, StatusCode}; use hyper::Body; use 
items_2::channelevents::ChannelEvents; use items_2::merger_cev::ChannelEventsMerger; use items_2::{binned_collected, empty_events_dyn, empty_events_dyn_2}; +use netpod::log::*; use netpod::query::{BinnedQuery, ChannelStateEventsQuery, PlainEventsQuery, RawEventsQuery}; -use netpod::{log::*, HasBackend}; use netpod::{AggKind, BinnedRange, FromUrl, NodeConfigCached}; use netpod::{ACCEPT_ALL, APP_JSON, APP_OCTET}; use scyllaconn::create_scy_session; @@ -72,25 +72,13 @@ async fn plain_events_binary( debug!("httpret plain_events_binary req: {:?}", req); let query = PlainEventsQuery::from_url(&url).map_err(|e| e.add_public_msg(format!("Can not understand query")))?; let chconf = chconf_from_events_binary(&query, node_config).await?; - // Update the series id since we don't require some unique identifier yet. let mut query = query; query.set_series_id(chconf.series); let query = query; // --- - - let op = disk::channelexec::PlainEvents::new(query.channel().clone(), query.range().clone(), node_config.clone()); - let s = disk::channelexec::channel_exec( - op, - query.channel(), - query.range(), - chconf.scalar_type, - chconf.shape, - AggKind::Plain, - node_config, - ) - .await?; - let s = s.map(|item| item.make_frame()); + let _ = query; + let s = stream::iter([Ok::<_, Error>(String::from("TODO_PREBINNED_BINARY_STREAM"))]); let ret = response(StatusCode::OK).body(BodyStream::wrapped( s.map_err(Error::from), format!("plain_events_binary"), @@ -110,46 +98,16 @@ async fn plain_events_json( error!("chconf_from_events_json {e:?}"); e.add_public_msg(format!("Can not get channel information")) })?; - // Update the series id since we don't require some unique identifier yet. let mut query = query; query.set_series_id(chconf.series); let query = query; // --- - - if true || query.backend().starts_with("test-") { - let query = RawEventsQuery::new(query.channel().clone(), query.range().clone(), AggKind::Plain); - let item = streams::plaineventsjson::plain_events_json(query, &node_config.node_config.cluster).await?; - let buf = serde_json::to_vec(&item)?; - let ret = response(StatusCode::OK).body(Body::from(buf))?; - Ok(ret) - } else { - let op = disk::channelexec::PlainEventsJson::new( - // TODO pass only the query, not channel, range again: - query.clone(), - query.channel().clone(), - query.range().clone(), - query.timeout(), - node_config.clone(), - query.events_max().unwrap_or(u64::MAX), - query.do_log(), - ); - let s = disk::channelexec::channel_exec( - op, - query.channel(), - query.range(), - chconf.scalar_type, - chconf.shape, - AggKind::Plain, - node_config, - ) - .await?; - let ret = response(StatusCode::OK).body(BodyStream::wrapped( - s.map_err(Error::from), - format!("plain_events_json"), - ))?; - Ok(ret) - } + let query = RawEventsQuery::new(query.channel().clone(), query.range().clone(), AggKind::Plain); + let item = streams::plaineventsjson::plain_events_json(query, &node_config.node_config.cluster).await?; + let buf = serde_json::to_vec(&item)?; + let ret = response(StatusCode::OK).body(Body::from(buf))?; + Ok(ret) } pub struct EventsHandlerScylla {} diff --git a/httpret/src/evinfo.rs b/httpret/src/evinfo.rs index 39eda55..8b13789 100644 --- a/httpret/src/evinfo.rs +++ b/httpret/src/evinfo.rs @@ -1,170 +1 @@ -use crate::channelconfig::chconf_from_events_json; -use crate::err::Error; -use crate::response; -use bytes::Bytes; -use disk::channelexec::channel_exec; -use disk::channelexec::ChannelExecFunction; -use disk::decode::Endianness; -use disk::decode::EventValueFromBytes; -use 
disk::decode::EventValueShape; -use disk::decode::NumFromBytes; -use disk::merge::mergedfromremotes::MergedFromRemotes; -use futures_util::FutureExt; -use futures_util::Stream; -use futures_util::TryStreamExt; -use http::{Method, StatusCode}; -use hyper::{Body, Request, Response}; -use items::numops::NumOps; -use items::streams::collect_plain_events_json; -use items::streams::Collectable; -use items::Clearable; -use items::EventsNodeProcessor; -use items::Framable; -use items::FrameType; -use items::PushableIndex; -use items::Sitemty; -use items::TimeBinnableType; -use netpod::log::*; -use netpod::query::{PlainEventsQuery, RawEventsQuery}; -use netpod::{AggKind, Channel, FromUrl, NanoRange, NodeConfigCached, PerfOpts, ScalarType, Shape}; -use serde::de::DeserializeOwned; -use std::fmt::Debug; -use std::pin::Pin; -use std::time::Duration; -use url::Url; -pub struct EventInfoScan {} - -impl EventInfoScan { - pub fn handler(req: &Request) -> Option { - if req.uri().path().starts_with("/api/4/event/info") { - Some(Self {}) - } else { - None - } - } - - pub async fn handle(&self, req: Request, node_config: &NodeConfigCached) -> Result, Error> { - info!("EventInfoScan::handle"); - if req.method() != Method::GET { - return Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?); - } - let (head, _body) = req.into_parts(); - let url = Url::parse(&format!("dummy:{}", head.uri))?; - let query = PlainEventsQuery::from_url(&url)?; - let ret = match Self::exec(&query, node_config).await { - Ok(stream) => { - // - let stream = stream.map_ok(|_| Bytes::new()); - response(StatusCode::OK).body(Body::wrap_stream(stream))? - } - Err(e) => response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::from(format!("{:?}", e)))?, - }; - Ok(ret) - } - - pub async fn exec( - query: &PlainEventsQuery, - node_config: &NodeConfigCached, - ) -> Result> + Send>>, Error> { - let chconf = chconf_from_events_json(&query, node_config).await?; - let ret = channel_exec( - EvInfoFunc::new( - query.clone(), - query.timeout(), - query.events_max().unwrap_or(u64::MAX), - node_config.clone(), - ), - query.channel(), - query.range(), - chconf.scalar_type, - chconf.shape, - AggKind::Stats1, - node_config, - ) - .await?; - Ok(Box::pin(ret.map_err(Error::from))) - } -} - -pub struct EvInfoFunc { - query: PlainEventsQuery, - timeout: Duration, - node_config: NodeConfigCached, - events_max: u64, -} - -impl EvInfoFunc { - pub fn new(query: PlainEventsQuery, timeout: Duration, events_max: u64, node_config: NodeConfigCached) -> Self { - Self { - query, - timeout, - events_max, - node_config, - } - } - - pub fn channel(&self) -> &Channel { - &self.query.channel() - } - - pub fn range(&self) -> &NanoRange { - &self.query.range() - } -} - -impl ChannelExecFunction for EvInfoFunc { - type Output = Pin> + Send>>; - - fn exec( - self, - byte_order: END, - _scalar_type: ScalarType, - _shape: Shape, - event_value_shape: EVS, - _events_node_proc: ENP, - ) -> Result - where - NTY: NumOps + NumFromBytes + 'static, - END: Endianness + 'static, - EVS: EventValueShape + EventValueFromBytes + 'static, - ENP: EventsNodeProcessor>::Batch> + 'static, - // TODO require these things in general? 
- ::Output: Debug + Collectable + PushableIndex + Clearable, - <::Output as TimeBinnableType>::Output: Debug - + TimeBinnableType::Output as TimeBinnableType>::Output> - + Collectable - + Unpin, - Sitemty<::Output>: FrameType + Framable + 'static, - Sitemty<<::Output as TimeBinnableType>::Output>: - FrameType + Framable + DeserializeOwned, - { - let _ = byte_order; - let _ = event_value_shape; - let perf_opts = PerfOpts { inmem_bufcap: 4096 }; - // TODO let PlainEventsJsonQuery provide the tune and pass to RawEventsQuery: - let evq = RawEventsQuery::new(self.query.channel().clone(), self.query.range().clone(), AggKind::Plain); - - // TODO Use a Merged-From-Multiple-Local-Splits. - // TODO Pass the read buffer size from query parameter: GPFS needs a larger buffer.. - // TODO Must issue multiple reads to GPFS, keep futures in a ordered queue. - - let s = MergedFromRemotes::::new(evq, perf_opts, self.node_config.node_config.cluster); - let f = collect_plain_events_json(s, self.timeout, 0, self.events_max, self.query.do_log()); - let f = FutureExt::map(f, |item| match item { - Ok(item) => { - // TODO add channel entry info here? - //let obj = item.as_object_mut().unwrap(); - //obj.insert("channelName", JsonValue::String(en)); - Ok(Bytes::from(serde_json::to_vec(&item)?)) - } - Err(e) => Err(e.into()), - }); - let s = futures_util::stream::once(f); - Ok(Box::pin(s)) - } - - fn empty() -> Self::Output { - Box::pin(futures_util::stream::empty()) - } -} diff --git a/httpret/src/httpret.rs b/httpret/src/httpret.rs index 5af163a..1629340 100644 --- a/httpret/src/httpret.rs +++ b/httpret/src/httpret.rs @@ -6,7 +6,6 @@ pub mod channelconfig; pub mod download; pub mod err; pub mod events; -pub mod evinfo; pub mod gather; pub mod prometheus; pub mod proxy; @@ -19,33 +18,32 @@ use crate::bodystream::response; use crate::err::Error; use crate::gather::gather_get_json; use crate::pulsemap::UpdateTask; -use channelconfig::{chconf_from_binned, ChConf}; -use disk::binned::query::PreBinnedQuery; -use future::Future; -use futures_util::{FutureExt, StreamExt, TryStreamExt}; +use futures_util::{Future, FutureExt, StreamExt}; use http::{Method, StatusCode}; use hyper::server::conn::AddrStream; use hyper::service::{make_service_fn, service_fn}; use hyper::{server::Server, Body, Request, Response}; use net::SocketAddr; use netpod::log::*; -use netpod::query::BinnedQuery; +use netpod::query::prebinned::PreBinnedQuery; use netpod::timeunits::SEC; use netpod::ProxyConfig; -use netpod::{FromUrl, NodeConfigCached, NodeStatus, NodeStatusArchiverAppliance}; -use netpod::{ACCEPT_ALL, APP_JSON, APP_JSON_LINES, APP_OCTET}; +use netpod::{NodeConfigCached, NodeStatus, NodeStatusArchiverAppliance}; +use netpod::{APP_JSON, APP_JSON_LINES}; use nodenet::conn::events_service; use panic::{AssertUnwindSafe, UnwindSafe}; use pin::Pin; use serde::Serialize; use std::collections::BTreeMap; +use std::net; +use std::panic; +use std::pin; use std::sync::atomic::{AtomicPtr, Ordering}; use std::sync::{Once, RwLock, RwLockWriteGuard}; +use std::task; use std::time::SystemTime; -use std::{future, net, panic, pin, task}; -use task::{Context, Poll}; -use tracing::Instrument; -use url::Url; +use task::Context; +use task::Poll; pub const PSI_DAQBUFFER_SERVICE_MARK: &'static str = "PSI-Daqbuffer-Service-Mark"; pub const PSI_DAQBUFFER_SEEN_URL: &'static str = "PSI-Daqbuffer-Seen-Url"; @@ -324,12 +322,6 @@ async fn http_service_inner( h.handle(req, ctx, &node_config).await } else if let Some(h) = api4::binned::BinnedHandler::handler(&req) { 
h.handle(req, &node_config).await - } else if path == "/api/4/binned" { - if req.method() == Method::GET { - Ok(binned(req, ctx, node_config).await?) - } else { - Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) - } } else if path == "/api/4/prebinned" { if req.method() == Method::GET { Ok(prebinned(req, ctx, &node_config).await?) @@ -384,8 +376,6 @@ async fn http_service_inner( h.handle(req, &node_config).await } else if let Some(h) = api1::Api1EventsBinaryHandler::handler(&req) { h.handle(req, ctx, &node_config).await - } else if let Some(h) = evinfo::EventInfoScan::handler(&req) { - h.handle(req, &node_config).await } else if let Some(h) = pulsemap::MapPulseScyllaHandler::handler(&req) { h.handle(req, &node_config).await } else if let Some(h) = pulsemap::IndexFullHttpFunction::handler(&req) { @@ -468,74 +458,6 @@ impl StatusBoardAllHandler { } } -async fn binned(req: Request, ctx: &ReqCtx, node_config: &NodeConfigCached) -> Result, Error> { - match binned_inner(req, ctx, node_config).await { - Ok(ret) => Ok(ret), - Err(e) => { - error!("fn binned: {e:?}"); - Ok(e.to_public_response()) - } - } -} - -async fn binned_inner( - req: Request, - ctx: &ReqCtx, - node_config: &NodeConfigCached, -) -> Result, Error> { - let (head, _body) = req.into_parts(); - let url = Url::parse(&format!("dummy:{}", head.uri))?; - let query = BinnedQuery::from_url(&url).map_err(|e| { - let msg = format!("can not parse query: {}", e.msg()); - e.add_public_msg(msg) - })?; - let chconf = chconf_from_binned(&query, node_config).await?; - // Update the series id since we don't require some unique identifier yet. - let mut query = query; - query.set_series_id(chconf.series); - let query = query; - // --- - let desc = format!("binned-BEG-{}-END-{}", query.range().beg / SEC, query.range().end / SEC); - let span1 = span!(Level::INFO, "httpret::binned", desc = &desc.as_str()); - span1.in_scope(|| { - debug!("binned STARTING {:?}", query); - }); - match head.headers.get(http::header::ACCEPT) { - Some(v) if v == APP_OCTET => binned_binary(query, chconf, &ctx, node_config).await, - Some(v) if v == APP_JSON || v == ACCEPT_ALL => binned_json(query, chconf, &ctx, node_config).await, - _ => Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?), - } -} - -async fn binned_binary( - query: BinnedQuery, - chconf: ChConf, - _ctx: &ReqCtx, - node_config: &NodeConfigCached, -) -> Result, Error> { - let body_stream = - disk::binned::binned_bytes_for_http(&query, chconf.scalar_type, chconf.shape, node_config).await?; - let res = response(StatusCode::OK).body(BodyStream::wrapped( - body_stream.map_err(Error::from), - format!("binned_binary"), - ))?; - Ok(res) -} - -async fn binned_json( - query: BinnedQuery, - chconf: ChConf, - _ctx: &ReqCtx, - node_config: &NodeConfigCached, -) -> Result, Error> { - let body_stream = disk::binned::binned_json(&query, chconf.scalar_type, chconf.shape, node_config).await?; - let res = response(StatusCode::OK).body(BodyStream::wrapped( - body_stream.map_err(Error::from), - format!("binned_json"), - ))?; - Ok(res) -} - async fn prebinned(req: Request, ctx: &ReqCtx, node_config: &NodeConfigCached) -> Result, Error> { match prebinned_inner(req, ctx, node_config).await { Ok(ret) => Ok(ret), @@ -549,10 +471,11 @@ async fn prebinned(req: Request, ctx: &ReqCtx, node_config: &NodeConfigCac async fn prebinned_inner( req: Request, _ctx: &ReqCtx, - node_config: &NodeConfigCached, + _node_config: &NodeConfigCached, ) -> Result, Error> { let (head, _body) = req.into_parts(); - let query = 
PreBinnedQuery::from_request(&head)?; + let url: url::Url = format!("dummy://{}", head.uri).parse()?; + let query = PreBinnedQuery::from_url(&url)?; let desc = format!( "pre-W-{}-B-{}", query.patch().bin_t_len() / SEC, @@ -560,21 +483,10 @@ async fn prebinned_inner( ); let span1 = span!(Level::INFO, "httpret::prebinned", desc = &desc.as_str()); span1.in_scope(|| { - debug!("prebinned STARTING"); + debug!("begin"); }); - let fut = disk::binned::prebinned::pre_binned_bytes_for_http(node_config, &query).instrument(span1); - let ret = match fut.await { - Ok(s) => response(StatusCode::OK).body(BodyStream::wrapped(s.map_err(Error::from), desc))?, - Err(e) => { - if query.report_error() { - response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::from(format!("{:?}", e)))? - } else { - error!("fn prebinned: {:?}", e); - response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty())? - } - } - }; - Ok(ret) + //let fut = disk::binned::prebinned::pre_binned_bytes_for_http(node_config, &query).instrument(span1); + todo!() } async fn node_status( diff --git a/httpret/src/proxy.rs b/httpret/src/proxy.rs index d98f428..4fb8d9d 100644 --- a/httpret/src/proxy.rs +++ b/httpret/src/proxy.rs @@ -5,8 +5,7 @@ use crate::err::Error; use crate::gather::{gather_get_json_generic, SubRes}; use crate::pulsemap::MapPulseQuery; use crate::{api_1_docs, api_4_docs, response, response_err, Cont, ReqCtx, PSI_DAQBUFFER_SERVICE_MARK}; -use futures_core::Stream; -use futures_util::pin_mut; +use futures_util::{pin_mut, Stream}; use http::{Method, StatusCode}; use hyper::service::{make_service_fn, service_fn}; use hyper::{Body, Request, Response, Server}; diff --git a/httpret/src/proxy/api4.rs b/httpret/src/proxy/api4.rs index 762b377..5b4dcbf 100644 --- a/httpret/src/proxy/api4.rs +++ b/httpret/src/proxy/api4.rs @@ -1,7 +1,7 @@ use crate::err::Error; use crate::gather::{gather_get_json_generic, SubRes}; use crate::response; -use futures_core::Future; +use futures_util::Future; use http::{header, Request, Response, StatusCode}; use hyper::Body; use itertools::Itertools; diff --git a/netpod/src/query.rs b/netpod/src/query.rs index 0e74578..5f7338e 100644 --- a/netpod/src/query.rs +++ b/netpod/src/query.rs @@ -1,8 +1,10 @@ pub mod api1; pub mod datetime; +pub mod prebinned; use crate::get_url_query_pairs; -use crate::{log::*, DiskIoTune}; +use crate::log::*; +use crate::DiskIoTune; use crate::{AggKind, AppendToUrl, ByteSize, Channel, FromUrl, HasBackend, HasTimeout, NanoRange, ToNanos}; use chrono::{DateTime, TimeZone, Utc}; use err::Error; diff --git a/disk/src/binned/query.rs b/netpod/src/query/prebinned.rs similarity index 91% rename from disk/src/binned/query.rs rename to netpod/src/query/prebinned.rs index ca9fd6f..53d050e 100644 --- a/disk/src/binned/query.rs +++ b/netpod/src/query/prebinned.rs @@ -1,8 +1,9 @@ +use super::agg_kind_from_binning_scheme; +use super::binning_scheme_append_to_url; +use super::CacheUsage; +use crate::timeunits::SEC; +use crate::{AggKind, AppendToUrl, ByteSize, Channel, FromUrl, PreBinnedPatchCoord, ScalarType, Shape}; use err::Error; -use http::request::Parts; -use netpod::query::{agg_kind_from_binning_scheme, binning_scheme_append_to_url, CacheUsage}; -use netpod::timeunits::SEC; -use netpod::{AggKind, AppendToUrl, ByteSize, Channel, FromUrl, PreBinnedPatchCoord, ScalarType, Shape}; use std::collections::BTreeMap; use url::Url; @@ -98,12 +99,6 @@ impl PreBinnedQuery { Ok(ret) } - pub fn from_request(head: &Parts) -> Result { - let s1 = format!("dummy:{}", head.uri); - let url = Url::parse(&s1)?; 
- Self::from_url(&url) - } - pub fn patch(&self) -> &PreBinnedPatchCoord { &self.patch }
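
The removed PreBinnedQuery::from_request is replaced by the FromUrl path, as the new prebinned_inner() above shows. A minimal sketch of the pattern at a call site follows; the parse_prebinned wrapper name is illustrative only and is not part of this patch, and it assumes err::Error: From<url::ParseError>, as the removed helper already relied on:

    use err::Error;
    use http::request::Parts;
    use netpod::query::prebinned::PreBinnedQuery;
    use netpod::FromUrl;
    use url::Url;

    // Hypothetical helper, not part of this patch: build a dummy-schemed
    // URL from the request URI, then parse through the FromUrl impl.
    fn parse_prebinned(head: &Parts) -> Result<PreBinnedQuery, Error> {
        let url: Url = format!("dummy://{}", head.uri).parse()?;
        PreBinnedQuery::from_url(&url)
    }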
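
For readers tracking what the removal drops: ChannelExecFunction existed so that channel_exec_config() could monomorphize one generic operation over every ScalarType and ByteOrder combination, with the match_end! macro generating the per-endianness arms. Reduced to a self-contained sketch with stand-in types (not the crate's real traits, which carry many more bounds):

    // Stand-ins for illustration; the real dispatch covers u8..f64 plus
    // BoolNum and StringNum, and threads Shape/AggKind through as well.
    enum ScalarType { U8, F64 }
    enum ByteOrder { Little, Big }

    trait ChannelExec {
        type Output;
        fn exec<NTY: Default + 'static>(self, little_endian: bool) -> Self::Output;
    }

    fn dispatch<F: ChannelExec>(f: F, st: ScalarType, bo: ByteOrder) -> F::Output {
        let le = matches!(bo, ByteOrder::Little);
        match st {
            ScalarType::U8 => f.exec::<u8>(le),
            ScalarType::F64 => f.exec::<f64>(le),
        }
    }

The rewritten handlers above no longer need this compile-time fan-out; they go through RawEventsQuery plus the dyn-typed helpers in items_2 and streams instead.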