diff --git a/daqbuffer/src/test/events.rs b/daqbuffer/src/test/events.rs
index 8309417..abcd1f2 100644
--- a/daqbuffer/src/test/events.rs
+++ b/daqbuffer/src/test/events.rs
@@ -3,7 +3,7 @@ use chrono::{DateTime, Utc};
 use disk::agg::streams::{StatsItem, StreamItem};
 use disk::binned::{NumOps, RangeCompletableItem, WithLen};
 use disk::decode::EventValues;
-use disk::events::{PlainEventsJsonQuery, PlainEventsQuery};
+use disk::events::{PlainEventsBinaryQuery, PlainEventsJsonQuery};
 use disk::frame::inmem::InMemoryFrameAsyncReadStream;
 use disk::frame::makeframe::FrameType;
 use disk::streamlog::Streamlog;
@@ -63,7 +63,7 @@ where
         name: channel_name.into(),
     };
     let range = NanoRange::from_date_time(beg_date, end_date);
-    let query = PlainEventsQuery::new(channel, range);
+    let query = PlainEventsBinaryQuery::new(channel, range);
     let hp = HostPort::from_node(node0);
     let url = query.url(&hp);
     info!("get_plain_events get {}", url);
diff --git a/disk/src/binned.rs b/disk/src/binned.rs
index 014c8df..3823aec 100644
--- a/disk/src/binned.rs
+++ b/disk/src/binned.rs
@@ -14,7 +14,7 @@ use crate::decode::{
 };
 use crate::frame::makeframe::{Framable, FrameType, SubFrId};
 use crate::merge::mergedfromremotes::MergedFromRemotes;
-use crate::raw::EventsQuery;
+use crate::raw::RawEventsQuery;
 use crate::Sitemty;
 use bytes::Bytes;
 use chrono::{TimeZone, Utc};
@@ -121,7 +121,7 @@ where
         range
     );
     let bin_count = range.count as u32;
-    let evq = EventsQuery {
+    let evq = RawEventsQuery {
         channel: query.channel().clone(),
         range: query.range().clone(),
         agg_kind: query.agg_kind().clone(),
@@ -786,7 +786,7 @@ impl ChannelExecFunction for BinnedJsonChannelExec {
            "binned_bytes_for_http no covering range for prebinned, merge from remotes instead {:?}",
            range
        );
-        let evq = EventsQuery {
+        let evq = RawEventsQuery {
             channel: self.query.channel().clone(),
             range: self.query.range().clone(),
             agg_kind: self.query.agg_kind().clone(),
diff --git a/disk/src/binned/pbv.rs b/disk/src/binned/pbv.rs
index c67b2a0..5b981c5 100644
--- a/disk/src/binned/pbv.rs
+++ b/disk/src/binned/pbv.rs
@@ -7,7 +7,7 @@ use crate::cache::{write_pb_cache_min_max_avg_scalar, CacheFileDesc, WrittenPbCa
 use crate::decode::{Endianness, EventValueFromBytes, EventValueShape, NumFromBytes};
 use crate::frame::makeframe::FrameType;
 use crate::merge::mergedfromremotes::MergedFromRemotes;
-use crate::raw::EventsQuery;
+use crate::raw::RawEventsQuery;
 use crate::streamlog::Streamlog;
 use crate::Sitemty;
 use err::Error;
@@ -110,7 +110,7 @@ where
        Pin<Box<dyn Stream<Item = Sitemty<<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output>> + Send>>,
        Error,
    > {
-        let evq = EventsQuery {
+        let evq = RawEventsQuery {
             channel: self.query.channel().clone(),
             range: self.query.patch().patch_range(),
             agg_kind: self.query.agg_kind().clone(),
diff --git a/disk/src/binned/query.rs b/disk/src/binned/query.rs
index b80ddf9..adcfa23 100644
--- a/disk/src/binned/query.rs
+++ b/disk/src/binned/query.rs
@@ -1,9 +1,11 @@
-use crate::query::channel_from_params;
 use chrono::{DateTime, TimeZone, Utc};
 use err::Error;
 use http::request::Parts;
 use netpod::log::*;
-use netpod::{AggKind, ByteSize, Channel, HostPort, NanoRange, PreBinnedPatchCoord, ToNanos};
+use netpod::{
+    channel_from_pairs, get_url_query_pairs, AggKind, AppendToUrl, ByteSize, Channel, FromUrl, HasBackend, HasTimeout,
+    HostPort, NanoRange, PreBinnedPatchCoord, ToNanos,
+};
 use std::collections::BTreeMap;
 use std::time::Duration;
 use url::Url;
@@ -63,9 +65,9 @@ impl PreBinnedQuery {
             .map_err(|e| Error::with_msg(format!("can not parse diskStatsEveryKb {:?}", e)))?;
         let ret = Self {
             patch: PreBinnedPatchCoord::new(bin_t_len, patch_t_len, patch_ix),
-            channel: channel_from_params(&pairs)?,
+            channel: channel_from_pairs(&pairs)?,
             agg_kind: agg_kind_from_binning_scheme(&pairs).unwrap_or(AggKind::DimXBins1),
-            cache_usage: CacheUsage::from_params(&pairs)?,
+            cache_usage: CacheUsage::from_pairs(&pairs)?,
             disk_stats_every: ByteSize::kb(disk_stats_every),
             report_error: pairs
                 .get("reportError")
@@ -137,7 +139,7 @@ impl CacheUsage {
             .into()
     }

-    pub fn from_params(params: &BTreeMap<String, String>) -> Result<CacheUsage, Error> {
+    pub fn from_pairs(params: &BTreeMap<String, String>) -> Result<CacheUsage, Error> {
         let ret = params.get("cacheUsage").map_or(Ok::<_, Error>(CacheUsage::Use), |k| {
             if k == "use" {
                 Ok(CacheUsage::Use)
@@ -200,49 +202,6 @@ impl BinnedQuery {
         }
     }

-    pub fn from_request(req: &http::request::Parts) -> Result<Self, Error> {
-        let params = netpod::query_params(req.uri.query());
-        let beg_date = params.get("begDate").ok_or(Error::with_msg("missing begDate"))?;
-        let end_date = params.get("endDate").ok_or(Error::with_msg("missing endDate"))?;
-        let disk_stats_every = params.get("diskStatsEveryKb").map_or("2000", |k| k);
-        let disk_stats_every = disk_stats_every
-            .parse()
-            .map_err(|e| Error::with_msg(format!("can not parse diskStatsEveryKb {:?}", e)))?;
-        let ret = Self {
-            channel: channel_from_params(&params)?,
-            range: NanoRange {
-                beg: beg_date.parse::<DateTime<Utc>>()?.to_nanos(),
-                end: end_date.parse::<DateTime<Utc>>()?.to_nanos(),
-            },
-            bin_count: params
-                .get("binCount")
-                .ok_or(Error::with_msg("missing binCount"))?
-                .parse()
-                .map_err(|e| Error::with_msg(format!("can not parse binCount {:?}", e)))?,
-            agg_kind: agg_kind_from_binning_scheme(&params).unwrap_or(AggKind::DimXBins1),
-            cache_usage: CacheUsage::from_params(&params)?,
-            disk_stats_every: ByteSize::kb(disk_stats_every),
-            report_error: params
-                .get("reportError")
-                .map_or("false", |k| k)
-                .parse()
-                .map_err(|e| Error::with_msg(format!("can not parse reportError {:?}", e)))?,
-            timeout: params
-                .get("timeout")
-                .map_or("2000", |k| k)
-                .parse::<u64>()
-                .map(|k| Duration::from_millis(k))
-                .map_err(|e| Error::with_msg(format!("can not parse timeout {:?}", e)))?,
-            abort_after_bin_count: params
-                .get("abortAfterBinCount")
-                .map_or("0", |k| k)
-                .parse()
-                .map_err(|e| Error::with_msg(format!("can not parse abortAfterBinCount {:?}", e)))?,
-        };
-        info!("BinnedQuery::from_request {:?}", ret);
-        Ok(ret)
-    }
-
     pub fn range(&self) -> &NanoRange {
         &self.range
     }
@@ -291,8 +250,7 @@ impl BinnedQuery {
         self.timeout = k;
     }

-    // TODO the BinnedQuery itself should maybe already carry the full HostPort?
-    // On the other hand, want to keep the flexibility for the fail over possibility..
+    // TODO remove in favor of AppendToUrl
     pub fn url(&self, host: &HostPort) -> String {
         let date_fmt = "%Y-%m-%dT%H:%M:%S.%3fZ";
         format!(
@@ -313,6 +271,86 @@ impl BinnedQuery {
     }
 }

+impl HasBackend for BinnedQuery {
+    fn backend(&self) -> &str {
+        &self.channel.backend
+    }
+}
+
+impl HasTimeout for BinnedQuery {
+    fn timeout(&self) -> Duration {
+        self.timeout.clone()
+    }
+}
+
+impl FromUrl for BinnedQuery {
+    fn from_url(url: &Url) -> Result<Self, Error> {
+        let pairs = get_url_query_pairs(url);
+        let beg_date = pairs.get("begDate").ok_or(Error::with_msg("missing begDate"))?;
+        let end_date = pairs.get("endDate").ok_or(Error::with_msg("missing endDate"))?;
+        let disk_stats_every = pairs.get("diskStatsEveryKb").map_or("2000", |k| k);
+        let disk_stats_every = disk_stats_every
+            .parse()
+            .map_err(|e| Error::with_msg(format!("can not parse diskStatsEveryKb {:?}", e)))?;
+        let ret = Self {
+            channel: channel_from_pairs(&pairs)?,
+            range: NanoRange {
+                beg: beg_date.parse::<DateTime<Utc>>()?.to_nanos(),
+                end: end_date.parse::<DateTime<Utc>>()?.to_nanos(),
+            },
+            bin_count: pairs
+                .get("binCount")
+                .ok_or(Error::with_msg("missing binCount"))?
+                .parse()
+                .map_err(|e| Error::with_msg(format!("can not parse binCount {:?}", e)))?,
+            agg_kind: agg_kind_from_binning_scheme(&pairs).unwrap_or(AggKind::DimXBins1),
+            cache_usage: CacheUsage::from_pairs(&pairs)?,
+            disk_stats_every: ByteSize::kb(disk_stats_every),
+            report_error: pairs
+                .get("reportError")
+                .map_or("false", |k| k)
+                .parse()
+                .map_err(|e| Error::with_msg(format!("can not parse reportError {:?}", e)))?,
+            timeout: pairs
+                .get("timeout")
+                .map_or("2000", |k| k)
+                .parse::<u64>()
+                .map(|k| Duration::from_millis(k))
+                .map_err(|e| Error::with_msg(format!("can not parse timeout {:?}", e)))?,
+            abort_after_bin_count: pairs
+                .get("abortAfterBinCount")
+                .map_or("0", |k| k)
+                .parse()
+                .map_err(|e| Error::with_msg(format!("can not parse abortAfterBinCount {:?}", e)))?,
+        };
+        info!("BinnedQuery::from_url {:?}", ret);
+        Ok(ret)
+    }
+}
+
+impl AppendToUrl for BinnedQuery {
+    fn append_to_url(&self, url: &mut Url) {
+        let date_fmt = "%Y-%m-%dT%H:%M:%S.%3fZ";
+        let mut g = url.query_pairs_mut();
+        g.append_pair("cacheUsage", &self.cache_usage.to_string());
+        g.append_pair("channelBackend", &self.channel.backend);
+        g.append_pair("channelName", &self.channel.name);
+        g.append_pair("binCount", &format!("{}", self.bin_count));
+        g.append_pair(
+            "begDate",
+            &Utc.timestamp_nanos(self.range.beg as i64).format(date_fmt).to_string(),
+        );
+        g.append_pair(
+            "endDate",
+            &Utc.timestamp_nanos(self.range.end as i64).format(date_fmt).to_string(),
+        );
+        g.append_pair("binningScheme", &binning_scheme_query_string(&self.agg_kind));
+        g.append_pair("diskStatsEveryKb", &format!("{}", self.disk_stats_every.bytes() / 1024));
+        g.append_pair("timeout", &format!("{}", self.timeout.as_millis()));
+        g.append_pair("abortAfterBinCount", &format!("{}", self.abort_after_bin_count));
+    }
+}
+
 fn binning_scheme_query_string(agg_kind: &AggKind) -> String {
     match agg_kind {
         AggKind::Plain => "fullValue".into(),
@@ -321,9 +359,9 @@ fn binning_scheme_query_string(agg_kind: &AggKind) -> String {
     }
 }

-fn agg_kind_from_binning_scheme(params: &BTreeMap<String, String>) -> Result<AggKind, Error> {
+fn agg_kind_from_binning_scheme(pairs: &BTreeMap<String, String>) -> Result<AggKind, Error> {
     let key = "binningScheme";
-    let s = params
+    let s = pairs
         .get(key)
         .map_or(Err(Error::with_msg(format!("can not find {}", key))), |k| Ok(k))?;
     let ret = if s == "fullValue" {
@@ -331,7 +369,7 @@ fn agg_kind_from_binning_scheme(params: &BTreeMap<String, String>) -> Result<Ag
diff --git a/disk/src/channelexec.rs b/disk/src/channelexec.rs
--- a/disk/src/channelexec.rs
+++ b/disk/src/channelexec.rs
@@ ... @@ where
     S: Stream<Item = Sitemty<T>> + Unpin,
     T: Collectable + Debug,
 {
-    info!("\n\nConstruct deadline with timeout {:?}\n\n", timeout);
     let deadline = tokio::time::Instant::now() + timeout;
     // TODO in general a Collector does not need to know about the expected number of bins.
     // It would make more sense for some specific Collector kind to know.
@@ -382,7 +380,7 @@ impl ChannelExecFunction for PlainEventsJson {
         let _ = byte_order;
         let _ = event_value_shape;
         let perf_opts = PerfOpts { inmem_bufcap: 4096 };
-        let evq = EventsQuery {
+        let evq = RawEventsQuery {
             channel: self.channel,
             range: self.range,
             agg_kind: self.agg_kind,
diff --git a/disk/src/events.rs b/disk/src/events.rs
index ba5d01c..3200771 100644
--- a/disk/src/events.rs
+++ b/disk/src/events.rs
@@ -1,19 +1,23 @@
-use crate::query::channel_from_params;
 use chrono::{DateTime, TimeZone, Utc};
 use err::Error;
-use netpod::{Channel, HostPort, NanoRange, ToNanos};
+use netpod::log::*;
+use netpod::{
+    channel_from_pairs, get_url_query_pairs, AppendToUrl, Channel, FromUrl, HasBackend, HasTimeout, HostPort,
+    NanoRange, ToNanos,
+};
 use std::time::Duration;
+use url::Url;

 // TODO move this query type out of this `binned` mod
 #[derive(Clone, Debug)]
-pub struct PlainEventsQuery {
+pub struct PlainEventsBinaryQuery {
     channel: Channel,
     range: NanoRange,
     report_error: bool,
     timeout: Duration,
 }

-impl PlainEventsQuery {
+impl PlainEventsBinaryQuery {
     pub fn new(channel: Channel, range: NanoRange) -> Self {
         Self {
             channel,
@@ -24,7 +28,10 @@
     }

     pub fn from_request(req: &http::request::Parts) -> Result<Self, Error> {
+        // TODO use Url
         let params = netpod::query_params(req.uri.query());
+        info!("PlainEventsBinaryQuery from_request uri: {:?}", req.uri);
+        info!("PlainEventsBinaryQuery from_request params: {:?}", params);
         let beg_date = params.get("begDate").ok_or(Error::with_msg("missing begDate"))?;
         let end_date = params.get("endDate").ok_or(Error::with_msg("missing endDate"))?;
         let ret = Self {
@@ -32,7 +39,7 @@
                 beg: beg_date.parse::<DateTime<Utc>>()?.to_nanos(),
                 end: end_date.parse::<DateTime<Utc>>()?.to_nanos(),
             },
-            channel: channel_from_params(&params)?,
+            channel: channel_from_pairs(&params)?,
             report_error: params
                 .get("reportError")
                 .map_or("false", |k| k)
@@ -102,22 +109,22 @@ impl PlainEventsJsonQuery {
         }
     }

-    pub fn from_request(req: &http::request::Parts) -> Result<Self, Error> {
-        let params = netpod::query_params(req.uri.query());
-        let beg_date = params.get("begDate").ok_or(Error::with_msg("missing begDate"))?;
-        let end_date = params.get("endDate").ok_or(Error::with_msg("missing endDate"))?;
+    pub fn from_url(url: &Url) -> Result<Self, Error> {
+        let pairs = get_url_query_pairs(url);
+        let beg_date = pairs.get("begDate").ok_or(Error::with_msg("missing begDate"))?;
+        let end_date = pairs.get("endDate").ok_or(Error::with_msg("missing endDate"))?;
         let ret = Self {
             range: NanoRange {
                 beg: beg_date.parse::<DateTime<Utc>>()?.to_nanos(),
                 end: end_date.parse::<DateTime<Utc>>()?.to_nanos(),
             },
-            channel: channel_from_params(&params)?,
-            report_error: params
+            channel: channel_from_pairs(&pairs)?,
+            report_error: pairs
                 .get("reportError")
                 .map_or("false", |k| k)
                 .parse()
                 .map_err(|e| Error::with_msg(format!("can not parse reportError {:?}", e)))?,
-            timeout: params
+            timeout: pairs
                 .get("timeout")
                 .map_or("10000", |k| k)
                 .parse::<u64>()
@@ -127,6 +134,12 @@ impl PlainEventsJsonQuery {
         Ok(ret)
     }

+    pub fn from_request_head(head: &http::request::Parts) -> Result<Self, Error> {
+        let s1 = format!("dummy:{}", head.uri);
+        let url = Url::parse(&s1)?;
+        Self::from_url(&url)
+    }
+
     pub fn range(&self) -> &NanoRange {
         &self.range
     }
@@ -147,6 +160,7 @@ impl PlainEventsJsonQuery {
         self.timeout = k;
     }

+    // TODO remove in favor of Self::append_to_url
     pub fn url(&self, host: &HostPort) -> String {
         let date_fmt = "%Y-%m-%dT%H:%M:%S.%3fZ";
         format!(
@@ -160,4 +174,44 @@ impl PlainEventsJsonQuery {
             self.timeout.as_millis(),
         )
     }
+
+    pub fn append_to_url(&self, url: &mut Url) {
+        let date_fmt = "%Y-%m-%dT%H:%M:%S.%3fZ";
+        let mut g = url.query_pairs_mut();
+        g.append_pair("channelBackend", &self.channel.backend);
+        g.append_pair("channelName", &self.channel.name);
+        g.append_pair(
+            "begDate",
+            &Utc.timestamp_nanos(self.range.beg as i64).format(date_fmt).to_string(),
+        );
+        g.append_pair(
+            "endDate",
+            &Utc.timestamp_nanos(self.range.end as i64).format(date_fmt).to_string(),
+        );
+        g.append_pair("timeout", &format!("{}", self.timeout.as_millis()));
+    }
+}
+
+impl HasBackend for PlainEventsJsonQuery {
+    fn backend(&self) -> &str {
+        &self.channel.backend
+    }
+}
+
+impl HasTimeout for PlainEventsJsonQuery {
+    fn timeout(&self) -> Duration {
+        self.timeout.clone()
+    }
+}
+
+impl FromUrl for PlainEventsJsonQuery {
+    fn from_url(url: &Url) -> Result<Self, Error> {
+        Self::from_url(url)
+    }
+}
+
+impl AppendToUrl for PlainEventsJsonQuery {
+    fn append_to_url(&self, url: &mut Url) {
+        self.append_to_url(url)
+    }
 }
diff --git a/disk/src/lib.rs b/disk/src/lib.rs
index 1836c6b..54c0a9d 100644
--- a/disk/src/lib.rs
+++ b/disk/src/lib.rs
@@ -34,7 +34,6 @@ pub mod gen;
 pub mod index;
 pub mod merge;
 pub mod paths;
-pub mod query;
 pub mod raw;
 pub mod streamlog;
diff --git a/disk/src/merge/mergedfromremotes.rs b/disk/src/merge/mergedfromremotes.rs
index ef2e8d1..4019910 100644
--- a/disk/src/merge/mergedfromremotes.rs
+++ b/disk/src/merge/mergedfromremotes.rs
@@ -2,7 +2,7 @@ use crate::agg::streams::Appendable;
 use crate::binned::{EventsNodeProcessor, PushableIndex};
 use crate::frame::makeframe::FrameType;
 use crate::merge::MergedStream;
-use crate::raw::{x_processed_stream_from_node, EventsQuery};
+use crate::raw::{x_processed_stream_from_node, RawEventsQuery};
 use crate::Sitemty;
 use err::Error;
 use futures_core::Stream;
@@ -34,7 +34,7 @@ where
     <ENP as EventsNodeProcessor>::Output: Unpin,
     Sitemty<<ENP as EventsNodeProcessor>::Output>: FrameType,
 {
-    pub fn new(evq: EventsQuery, perf_opts: PerfOpts, cluster: Cluster) -> Self {
+    pub fn new(evq: RawEventsQuery, perf_opts: PerfOpts, cluster: Cluster) -> Self {
         info!("MergedFromRemotes evq {:?}", evq);
         let mut tcp_establish_futs = vec![];
         for node in &cluster.nodes {
diff --git a/disk/src/query.rs b/disk/src/query.rs
deleted file mode 100644
index 45a4eec..0000000
--- a/disk/src/query.rs
+++ /dev/null
@@ -1,17 +0,0 @@
-use err::Error;
-use netpod::Channel;
-use std::collections::BTreeMap;
-
-pub fn channel_from_params(params: &BTreeMap<String, String>) -> Result<Channel, Error> {
-    let ret = Channel {
-        backend: params
-            .get("channelBackend")
-            .ok_or(Error::with_msg("missing channelBackend"))?
-            .into(),
-        name: params
-            .get("channelName")
-            .ok_or(Error::with_msg("missing channelName"))?
-            .into(),
-    };
-    Ok(ret)
-}
diff --git a/disk/src/raw.rs b/disk/src/raw.rs
index 3ea1bcf..eea6fff 100644
--- a/disk/src/raw.rs
+++ b/disk/src/raw.rs
@@ -13,7 +13,6 @@ use crate::raw::eventsfromframes::EventsFromFrames;
 use crate::Sitemty;
 use err::Error;
 use futures_core::Stream;
-use netpod::log::*;
 use netpod::{AggKind, Channel, NanoRange, Node, PerfOpts};
 use serde::{Deserialize, Serialize};
 use std::pin::Pin;
@@ -27,7 +26,7 @@ pub mod eventsfromframes;
 Query parameters to request (optionally) X-processed, but not T-processed events.
 */
 #[derive(Clone, Debug, Serialize, Deserialize)]
-pub struct EventsQuery {
+pub struct RawEventsQuery {
     pub channel: Channel,
     pub range: NanoRange,
     pub agg_kind: AggKind,
@@ -37,7 +36,7 @@ pub struct EventQueryJsonStringFrame(String);

 pub async fn x_processed_stream_from_node<ENP>(
-    query: EventsQuery,
+    query: RawEventsQuery,
     perf_opts: PerfOpts,
     node: Node,
 ) -> Result<Pin<Box<dyn Stream<Item = Sitemty<<ENP as EventsNodeProcessor>::Output>> + Send>>, Error>
 where
@@ -48,7 +47,6 @@
 {
     let net = TcpStream::connect(format!("{}:{}", node.host, node.port_raw)).await?;
     let qjs = serde_json::to_string(&query)?;
-    info!("x_processed_stream_from_node qjs {:?}", qjs);
     let (netin, mut netout) = net.into_split();
     let buf = make_frame(&EventQueryJsonStringFrame(qjs))?;
     netout.write_all(&buf).await?;
diff --git a/disk/src/raw/conn.rs b/disk/src/raw/conn.rs
index e88f48c..2f5309d 100644
--- a/disk/src/raw/conn.rs
+++ b/disk/src/raw/conn.rs
@@ -9,7 +9,7 @@ use crate::eventblobs::EventBlobsComplete;
 use crate::eventchunker::EventChunkerConf;
 use crate::frame::inmem::InMemoryFrameAsyncReadStream;
 use crate::frame::makeframe::{decode_frame, make_frame, make_term_frame, Framable};
-use crate::raw::{EventQueryJsonStringFrame, EventsQuery};
+use crate::raw::{EventQueryJsonStringFrame, RawEventsQuery};
 use crate::Sitemty;
 use err::Error;
 use futures_core::Stream;
@@ -238,7 +238,7 @@ async fn events_conn_handler_inner_try(
         Ok(k) => k,
         Err(e) => return Err((e, netout).into()),
     };
-    let res: Result<EventsQuery, _> = serde_json::from_str(&qitem.0);
+    let res: Result<RawEventsQuery, _> = serde_json::from_str(&qitem.0);
     let evq = match res {
         Ok(k) => k,
         Err(e) => {
diff --git a/httpret/Cargo.toml b/httpret/Cargo.toml
index 59bd744..8aa5ffb 100644
--- a/httpret/Cargo.toml
+++ b/httpret/Cargo.toml
@@ -11,11 +11,13 @@ http = "0.2"
 url = "2.2"
 tokio = { version = "1.5.0", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] }
 hyper = { version = "0.14", features = ["http1", "http2", "client", "server", "tcp", "stream"] }
+hyper-tls = { version = "0.5.0" }
 bytes = "1.0.1"
 futures-core = "0.3.14"
 futures-util = "0.3.14"
 tracing = "0.1.25"
 async-channel = "1.6"
+itertools = "0.10.1"
 err = { path = "../err" }
 netpod = { path = "../netpod" }
 dbconn = { path = "../dbconn" }
diff --git a/httpret/src/gather.rs b/httpret/src/gather.rs
index a9f99b7..2a739e7 100644
--- a/httpret/src/gather.rs
+++ b/httpret/src/gather.rs
@@ -3,6 +3,7 @@ use err::Error;
 use futures_util::{select, FutureExt};
 use http::{Method, StatusCode};
 use hyper::{Body, Client, Request, Response};
+use hyper_tls::HttpsConnector;
 use netpod::{Node, NodeConfigCached};
 use serde::{Deserialize, Serialize};
 use serde_json::Value as JsonValue;
@@ -176,7 +177,9 @@ where
     let spawned: Vec<_> = urls
         .into_iter()
         .map(move |url| {
-            let req = Request::builder().method(method.clone()).uri(url.as_str());
+            let url_str = url.as_str();
+            let is_tls = url_str.starts_with("https://");
+            let req = Request::builder().method(method.clone()).uri(url_str);
             //let req = req.header("x-log-from-node-name", format!("{}", node_config.node_config.name));
             let req = req.header(http::header::ACCEPT, "application/json");
             let req = req.body(Body::empty());
@@ -185,7 +188,17 @@ where
                 _ = sleep(timeout).fuse() => {
                     Err(Error::with_msg("timeout"))
                 }
-                res = Client::new().request(req?).fuse() => Ok(nt(res?).await?)
+                res = {
+                    if is_tls {
+                        let https = HttpsConnector::new();
+                        let client = Client::builder().build::<_, hyper::Body>(https);
+                        client.request(req?).fuse()
+                    }
+                    else {
+                        let client = Client::new();
+                        client.request(req?).fuse()
+                    }
+                } => Ok(nt(res?).await?)
             }
         });
         (url, task)
diff --git a/httpret/src/lib.rs b/httpret/src/lib.rs
index 3838b36..b26b51a 100644
--- a/httpret/src/lib.rs
+++ b/httpret/src/lib.rs
@@ -2,7 +2,7 @@ use crate::gather::gather_get_json;
 use bytes::Bytes;
 use disk::binned::prebinned::pre_binned_bytes_for_http;
 use disk::binned::query::{BinnedQuery, PreBinnedQuery};
-use disk::events::{PlainEventsJsonQuery, PlainEventsQuery};
+use disk::events::{PlainEventsBinaryQuery, PlainEventsJsonQuery};
 use disk::raw::conn::events_service;
 use err::Error;
 use future::Future;
@@ -13,7 +13,7 @@ use hyper::service::{make_service_fn, service_fn};
 use hyper::{server::Server, Body, Request, Response};
 use net::SocketAddr;
 use netpod::log::*;
-use netpod::{AggKind, Channel, NodeConfigCached};
+use netpod::{AggKind, Channel, FromUrl, NodeConfigCached};
 use panic::{AssertUnwindSafe, UnwindSafe};
 use pin::Pin;
 use serde::{Deserialize, Serialize};
@@ -21,6 +21,7 @@ use std::{future, net, panic, pin, task};
 use task::{Context, Poll};
 use tracing::field::Empty;
 use tracing::Instrument;
+use url::Url;

 pub mod api1;
 pub mod gather;
@@ -150,15 +151,9 @@ async fn http_service_try(req: Request<Body>, node_config: &NodeConfigCached) ->
         } else {
             Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
         }
-    } else if path == "/api/4/table_sizes" {
+    } else if path == "/api/4/events" {
         if req.method() == Method::GET {
-            Ok(table_sizes(req, &node_config).await?)
-        } else {
-            Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
-        }
-    } else if path == "/api/4/random_channel" {
-        if req.method() == Method::GET {
-            Ok(random_channel(req, &node_config).await?)
+            Ok(plain_events(req, &node_config).await?)
         } else {
             Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
         }
@@ -174,9 +169,15 @@ async fn http_service_try(req: Request<Body>, node_config: &NodeConfigCached) ->
         } else {
             Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
         }
-    } else if path == "/api/4/events" {
+    } else if path == "/api/4/table_sizes" {
         if req.method() == Method::GET {
-            Ok(plain_events(req, &node_config).await?)
+            Ok(table_sizes(req, &node_config).await?)
+        } else {
+            Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
+        }
+    } else if path == "/api/4/random_channel" {
+        if req.method() == Method::GET {
+            Ok(random_channel(req, &node_config).await?)
         } else {
             Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
         }
@@ -331,7 +332,8 @@ where

 async fn binned(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
     let (head, _body) = req.into_parts();
-    let query = BinnedQuery::from_request(&head)?;
+    let url = Url::parse(&format!("dummy:{}", head.uri))?;
+    let query = BinnedQuery::from_url(&url)?;
     match head.headers.get("accept") {
         Some(v) if v == "application/octet-stream" => binned_binary(query, node_config).await,
         Some(v) if v == "application/json" => binned_json(query, node_config).await,
@@ -398,6 +400,7 @@
 }

 async fn plain_events(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
+    info!("httpret plain_events headers: {:?}", req.headers());
     let accept_def = "";
     let accept = req
         .headers()
@@ -414,7 +417,7 @@ async fn plain_events(req: Request<Body>, node_config: &NodeConfigCached) -> Res

 async fn plain_events_binary(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
     let (head, _body) = req.into_parts();
-    let query = PlainEventsQuery::from_request(&head)?;
+    let query = PlainEventsBinaryQuery::from_request(&head)?;
     let op = disk::channelexec::PlainEvents::new(query.channel().clone(), query.range().clone(), node_config.clone());
     let s = disk::channelexec::channel_exec(op, query.channel(), query.range(), AggKind::Plain, node_config).await?;
     let s = s.map(|item| item.make_frame());
@@ -424,7 +427,7 @@

 async fn plain_events_json(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
     let (head, _body) = req.into_parts();
-    let query = PlainEventsJsonQuery::from_request(&head)?;
+    let query = PlainEventsJsonQuery::from_request_head(&head)?;
     let op = disk::channelexec::PlainEventsJson::new(
         query.channel().clone(),
         query.range().clone(),
diff --git a/httpret/src/proxy.rs b/httpret/src/proxy.rs
index dbb200e..094b64e 100644
--- a/httpret/src/proxy.rs
+++ b/httpret/src/proxy.rs
@@ -1,12 +1,20 @@
 use crate::api1::{channels_config_v1, channels_list_v1, gather_json_2_v1, gather_json_v1, proxy_distribute_v1};
 use crate::gather::gather_get_json_generic;
 use crate::{proxy_mark, response, Cont};
+use disk::binned::query::BinnedQuery;
+use disk::events::PlainEventsJsonQuery;
 use err::Error;
 use http::{HeaderValue, StatusCode};
 use hyper::service::{make_service_fn, service_fn};
 use hyper::{Body, Request, Response, Server};
+use itertools::Itertools;
 use netpod::log::*;
-use netpod::{ChannelSearchQuery, ChannelSearchResult, ProxyConfig};
+use netpod::{
+    AppendToUrl, ChannelConfigQuery, ChannelSearchQuery, ChannelSearchResult, FromUrl, HasBackend, HasTimeout,
+    ProxyConfig,
+};
+use serde::{Deserialize, Serialize};
+use serde_json::Value as JsonValue;
 use std::future::Future;
 use std::net::SocketAddr;
 use std::pin::Pin;
@@ -52,12 +60,20 @@ async fn proxy_http_service_try(req: Request<Body>, proxy_config: &ProxyConfig)
         Ok(channels_config_v1(req, proxy_config).await?)
     } else if path == "/api/1/stats/version" {
         Ok(gather_json_v1(req, "/stats/version").await?)
-    } else if path.starts_with("/api/1/stats/") {
+    } else if path == "/api/1/stats/" {
         Ok(gather_json_v1(req, path).await?)
     } else if path.starts_with("/api/1/gather/") {
         Ok(gather_json_2_v1(req, "/api/1/gather/", proxy_config).await?)
-    } else if path.starts_with("/api/4/search/channel") {
+    } else if path == "/api/4/backends" {
+        Ok(backends(req, proxy_config).await?)
+    } else if path == "/api/4/search/channel" {
         Ok(channel_search(req, proxy_config).await?)
+    } else if path == "/api/4/events" {
+        Ok(proxy_single_backend_query::<PlainEventsJsonQuery>(req, proxy_config).await?)
+    } else if path == "/api/4/binned" {
+        Ok(proxy_single_backend_query::<BinnedQuery>(req, proxy_config).await?)
+    } else if path == "/api/4/channel/config" {
+        Ok(proxy_single_backend_query::<ChannelConfigQuery>(req, proxy_config).await?)
     } else if path.starts_with("/distribute") {
         proxy_distribute_v1(req).await
     } else {
@@ -70,13 +86,25 @@ async fn proxy_http_service_try(req: Request<Body>, proxy_config: &ProxyConfig)
     }
 }

+#[derive(Serialize, Deserialize)]
+pub struct BackendsResponse {
+    backends: Vec<String>,
+}
+
+pub async fn backends(_req: Request<Body>, proxy_config: &ProxyConfig) -> Result<Response<Body>, Error> {
+    let backends: Vec<_> = proxy_config.backends.iter().map(|k| k.name.to_string()).collect();
+    let res = BackendsResponse { backends };
+    let ret = response(StatusCode::OK).body(Body::from(serde_json::to_vec(&res)?))?;
+    Ok(ret)
+}
+
 pub async fn channel_search(req: Request<Body>, proxy_config: &ProxyConfig) -> Result<Response<Body>, Error> {
     let (head, _body) = req.into_parts();
     match head.headers.get("accept") {
         Some(v) => {
             if v == "application/json" {
                 let query = ChannelSearchQuery::from_query_string(head.uri.query())?;
-                let urls: Vec<Result<Url, Error>> = proxy_config
+                let urls = proxy_config
                     .search_hosts
                     .iter()
                     .map(|sh| match Url::parse(&format!("{}/api/4/search/channel", sh)) {
@@ -86,18 +114,10 @@ pub async fn channel_search(req: Request<Body>, proxy_config: &ProxyConfig) -> R
                         Ok(mut url) => {
                             query.append_to_url(&mut url);
                             Ok(url)
                         }
                         Err(_e) => Err(Error::with_msg(format!("parse error for: {:?}", sh))),
                     })
-                    .collect();
-                for u in &urls {
-                    match u {
-                        Ok(url) => {
-                            info!("URL: {}", url.as_str());
-                        }
-                        Err(_) => {
-                            return Err(Error::with_msg("url parse error"));
-                        }
-                    }
-                }
-                let urls: Vec<_> = urls.into_iter().map(Result::unwrap).collect();
+                    .fold_ok(vec![], |mut a, x| {
+                        a.push(x);
+                        a
+                    })?;
                 let nt = |res| {
                     let fut = async {
                         let body = hyper::body::to_bytes(res).await?;
@@ -136,62 +156,54 @@ pub async fn channel_search(req: Request<Body>, proxy_config: &ProxyConfig) -> R
     }
 }

-pub async fn events(req: Request<Body>, proxy_config: &ProxyConfig) -> Result<Response<Body>, Error> {
+pub async fn proxy_single_backend_query<QT>(
+    req: Request<Body>,
+    proxy_config: &ProxyConfig,
+) -> Result<Response<Body>, Error>
+where
+    QT: FromUrl + HasBackend + AppendToUrl + HasTimeout,
+{
     let (head, _body) = req.into_parts();
     match head.headers.get("accept") {
         Some(v) => {
             if v == "application/json" {
-                Url::parse(&format!("{}", head.uri))?;
-                let query = ChannelSearchQuery::from_query_string(head.uri.query())?;
-                let urls: Vec<Result<Url, Error>> = proxy_config
-                    .search_hosts
+                let url = Url::parse(&format!("dummy:{}", head.uri))?;
+                let query = QT::from_url(&url)?;
+                let sh = get_query_host_for_backend(&query.backend(), proxy_config)?;
+                let urls = [sh]
                     .iter()
-                    .map(|sh| match Url::parse(&format!("{}/api/4/search/channel", sh)) {
+                    .map(|sh| match Url::parse(&format!("{}{}", sh, head.uri.path())) {
                         Ok(mut url) => {
                             query.append_to_url(&mut url);
                             Ok(url)
                         }
-                        Err(_e) => Err(Error::with_msg(format!("parse error for: {:?}", sh))),
+                        Err(e) => Err(Error::with_msg(format!("parse error for: {:?} {:?}", sh, e))),
                     })
-                    .collect();
-                for u in &urls {
-                    match u {
-                        Ok(url) => {
-                            info!("URL: {}", url.as_str());
-                        }
-                        Err(_) => {
-                            return Err(Error::with_msg("url parse error"));
-                        }
-                    }
-                }
-                let urls: Vec<_> = urls.into_iter().map(Result::unwrap).collect();
+                    .fold_ok(vec![], |mut a, x| {
+                        a.push(x);
+                        a
+                    })?;
                 let nt = |res| {
                     let fut = async {
                         let body = hyper::body::to_bytes(res).await?;
-                        info!("got a result {:?}", body);
-                        let res: ChannelSearchResult = match serde_json::from_slice(&body) {
-                            Ok(k) => k,
-                            Err(_) => ChannelSearchResult { channels: vec![] },
-                        };
-                        Ok(res)
+                        match serde_json::from_slice::<JsonValue>(&body) {
+                            Ok(k) => Ok(k),
+                            Err(e) => Err(e.into()),
+                        }
                     };
                     Box::pin(fut) as Pin<Box<dyn Future<Output = Result<JsonValue, Error>> + Send>>
                 };
-                let ft = |all: Vec<ChannelSearchResult>| {
-                    let mut res = vec![];
-                    for j in all {
-                        for k in j.channels {
-                            res.push(k);
-                        }
-                    }
-                    let res = ChannelSearchResult { channels: res };
+                let ft = |all: Vec<JsonValue>| {
+                    let res = match all.first() {
+                        Some(item) => Ok(item),
+                        None => Err(Error::with_msg("no response from upstream")),
+                    }?;
                     let res = response(StatusCode::OK)
                         .header(http::header::CONTENT_TYPE, "application/json")
-                        .body(Body::from(serde_json::to_string(&res)?))?;
+                        .body(Body::from(serde_json::to_string(res)?))?;
                     Ok(res)
                 };
-                let mut ret =
-                    gather_get_json_generic(http::Method::GET, urls, nt, ft, Duration::from_millis(3000)).await?;
+                let mut ret = gather_get_json_generic(http::Method::GET, urls, nt, ft, query.timeout()).await?;
                 ret.headers_mut()
                     .append("x-proxy-log-mark", HeaderValue::from_str(proxy_mark())?);
                 Ok(ret)
@@ -202,3 +214,12 @@
         None => Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?),
     }
 }
+
+fn get_query_host_for_backend(backend: &str, proxy_config: &ProxyConfig) -> Result<String, Error> {
+    for back in &proxy_config.backends {
+        if back.name == backend {
+            return Ok(back.url.clone());
+        }
+    }
+    return Err(Error::with_msg(format!("host not found for backend {:?}", backend)));
+}
diff --git a/netpod/src/lib.rs b/netpod/src/lib.rs
index 868c964..38a759b 100644
--- a/netpod/src/lib.rs
+++ b/netpod/src/lib.rs
@@ -5,10 +5,12 @@ use futures_util::StreamExt;
 use serde::{Deserialize, Serialize};
 use std::collections::BTreeMap;
 use std::fmt::{Debug, Display, Formatter};
+use std::iter::FromIterator;
 use std::path::PathBuf;
 use std::pin::Pin;
 use std::str::FromStr;
 use std::task::{Context, Poll};
+use std::time::Duration;
 use timeunits::*;
 #[allow(unused_imports)]
 use tracing::{debug, error, info, trace, warn};
@@ -847,6 +849,20 @@ impl ByteSize {
     }
 }

+pub fn channel_from_pairs(pairs: &BTreeMap<String, String>) -> Result<Channel, Error> {
+    let ret = Channel {
+        backend: pairs
+            .get("channelBackend")
+            .ok_or(Error::with_msg("missing channelBackend"))?
+            .into(),
+        name: pairs
+            .get("channelName")
+            .ok_or(Error::with_msg("missing channelName"))?
+            .into(),
+    };
+    Ok(ret)
+}
+
 #[derive(Serialize, Deserialize)]
 pub struct ChannelSearchQuery {
     pub name_regex: String,
@@ -922,8 +938,10 @@ pub struct ChannelSearchResult {

 #[derive(Clone, Debug, Serialize, Deserialize)]
 pub struct ProxyBackend {
     pub name: String,
+    // TODO remove host+port from struct and config json, keep only url:
     pub host: String,
     pub port: u16,
+    pub url: String,
 }

 #[derive(Clone, Debug, Serialize, Deserialize)]
@@ -933,3 +951,58 @@ pub struct ProxyConfig {
     pub search_hosts: Vec<String>,
     pub backends: Vec<ProxyBackend>,
 }
+
+pub trait HasBackend {
+    fn backend(&self) -> &str;
+}
+
+pub trait HasTimeout {
+    fn timeout(&self) -> Duration;
+}
+
+pub trait FromUrl: Sized {
+    fn from_url(url: &Url) -> Result<Self, Error>;
+}
+
+pub trait AppendToUrl {
+    fn append_to_url(&self, url: &mut Url);
+}
+
+pub fn get_url_query_pairs(url: &Url) -> BTreeMap<String, String> {
+    BTreeMap::from_iter(url.query_pairs().map(|(j, k)| (j.to_string(), k.to_string())))
+}
+
+#[derive(Serialize, Deserialize)]
+pub struct ChannelConfigQuery {
+    pub channel: Channel,
+}
+
+impl HasBackend for ChannelConfigQuery {
+    fn backend(&self) -> &str {
+        &self.channel.backend
+    }
+}
+
+impl HasTimeout for ChannelConfigQuery {
+    fn timeout(&self) -> Duration {
+        Duration::from_millis(2000)
+    }
+}
+
+impl FromUrl for ChannelConfigQuery {
+    fn from_url(url: &Url) -> Result<Self, Error> {
+        let pairs = get_url_query_pairs(url);
+        let ret = Self {
+            channel: channel_from_pairs(&pairs)?,
+        };
+        Ok(ret)
+    }
+}
+
+impl AppendToUrl for ChannelConfigQuery {
+    fn append_to_url(&self, url: &mut Url) {
+        let mut g = url.query_pairs_mut();
+        g.append_pair("channelBackend", &self.channel.backend);
+        g.append_pair("channelName", &self.channel.name);
+    }
+}
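
For reference, the round trip that proxy_single_backend_query relies on looks like this: the proxy decodes a query via FromUrl, picks the backend host via HasBackend, and re-encodes the same query pairs via AppendToUrl onto the forwarded URL. A minimal sketch, not part of the patch; the port, backend, and channel names are made up (Channel's fields are public, as the deleted disk/src/query.rs already constructed it from outside netpod):

    use netpod::{AppendToUrl, Channel, ChannelConfigQuery, FromUrl};
    use url::Url;

    fn main() {
        // Build a query as a client would.
        let q = ChannelConfigQuery {
            channel: Channel {
                backend: "testbackend".into(),
                name: "wave1".into(),
            },
        };
        // Proxy side: encode the query into URL pairs before forwarding
        // to the host selected via HasBackend::backend().
        let mut url = Url::parse("http://localhost:8371/api/4/channel/config").unwrap();
        q.append_to_url(&mut url);
        // Backend side: decode the same pairs again; channelBackend and
        // channelName survive the query string unchanged.
        let q2 = ChannelConfigQuery::from_url(&url).unwrap();
        assert_eq!(q2.channel.backend, "testbackend");
        assert_eq!(q2.channel.name, "wave1");
    }

The same pattern holds for BinnedQuery and PlainEventsJsonQuery, whose FromUrl/AppendToUrl impls additionally carry the date range, timeout, and binning parameters.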