From c1fc53c22ea5c21bc247e5ce8406f0a27d249e37 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Wed, 28 Apr 2021 14:59:18 +0200 Subject: [PATCH] Remove Arc from config structs to make them Serialize --- dbconn/src/lib.rs | 3 +-- disk/src/agg.rs | 2 +- disk/src/aggtest.rs | 4 ---- disk/src/binnedstream.rs | 8 +++---- disk/src/cache.rs | 19 ++++++++------- disk/src/cache/pbv.rs | 18 +++++++------- disk/src/cache/pbvfs.rs | 3 +-- disk/src/channelconfig.rs | 8 +++---- disk/src/dataopen.rs | 5 ++-- disk/src/eventblobs.rs | 9 ++++--- disk/src/gen.rs | 2 +- disk/src/lib.rs | 11 ++++----- disk/src/raw.rs | 7 +++--- disk/src/raw/conn.rs | 44 ++++++++++++++++++---------------- httpret/src/lib.rs | 44 ++++++++++++++++++---------------- netpod/src/lib.rs | 28 +++++++++++++++------- retrieval/src/bin/retrieval.rs | 27 +++++++++++++-------- retrieval/src/cli.rs | 5 +++- retrieval/src/lib.rs | 14 +++++++---- retrieval/src/test.rs | 24 ++++++++----------- 20 files changed, 148 insertions(+), 137 deletions(-) diff --git a/dbconn/src/lib.rs b/dbconn/src/lib.rs index 1df4537..799ccfb 100644 --- a/dbconn/src/lib.rs +++ b/dbconn/src/lib.rs @@ -1,10 +1,9 @@ use err::Error; use netpod::log::*; use netpod::{Channel, NodeConfig}; -use std::sync::Arc; use tokio_postgres::NoTls; -pub async fn channel_exists(channel: &Channel, node_config: Arc) -> Result { +pub async fn channel_exists(channel: &Channel, node_config: &NodeConfig) -> Result { let d = &node_config.cluster.database; let uri = format!("postgresql://{}:{}@{}:{}/{}", d.user, d.pass, d.host, 5432, d.name); let (cl, conn) = tokio_postgres::connect(&uri, NoTls).await?; diff --git a/disk/src/agg.rs b/disk/src/agg.rs index 8742c02..f8a3326 100644 --- a/disk/src/agg.rs +++ b/disk/src/agg.rs @@ -442,7 +442,7 @@ where pub fn make_test_node(id: u32) -> Node { Node { - id, + id: format!("{:02}", id), host: "localhost".into(), listen: "0.0.0.0".into(), port: 8800 + id as u16, diff --git a/disk/src/aggtest.rs b/disk/src/aggtest.rs index 93ee932..641730c 100644 --- a/disk/src/aggtest.rs +++ b/disk/src/aggtest.rs @@ -7,7 +7,6 @@ use futures_util::StreamExt; use netpod::timeunits::*; use netpod::{BinSpecDimT, Channel, ChannelConfig, NanoRange, ScalarType, Shape}; use std::future::ready; -use std::sync::Arc; #[allow(unused_imports)] use tracing::{debug, error, info, trace, warn}; @@ -22,7 +21,6 @@ fn agg_x_dim_0() { async fn agg_x_dim_0_inner() { let node = make_test_node(0); - let node = Arc::new(node); let query = netpod::AggQuerySingleChannel { channel_config: ChannelConfig { channel: Channel { @@ -93,7 +91,6 @@ async fn agg_x_dim_1_inner() { // /data/sf-databuffer/daq_swissfel/daq_swissfel_3/byTime/S10BC01-DBAM070\:BAM_CH1_NORM/* // S10BC01-DBAM070:BAM_CH1_NORM let node = make_test_node(0); - let node = Arc::new(node); let query = netpod::AggQuerySingleChannel { channel_config: ChannelConfig { channel: Channel { @@ -179,7 +176,6 @@ async fn merge_0_inner() { .into_iter() .map(|k| make_test_node(k)) .map(|node| { - let node = Arc::new(node); super::eventblobs::EventBlobsComplete::new( range.clone(), query.channel_config.clone(), diff --git a/disk/src/binnedstream.rs b/disk/src/binnedstream.rs index 39d0124..99325c9 100644 --- a/disk/src/binnedstream.rs +++ b/disk/src/binnedstream.rs @@ -9,7 +9,6 @@ use netpod::{AggKind, Channel, NodeConfig, PreBinnedPatchIterator}; use netpod::{NanoRange, RetStreamExt}; use std::future::ready; use std::pin::Pin; -use std::sync::Arc; use std::task::{Context, Poll}; pub struct BinnedStream { @@ -22,7 +21,7 @@ impl BinnedStream { channel: Channel, range: NanoRange, agg_kind: AggKind, - node_config: Arc, + node_config: &NodeConfig, ) -> Self { let patches: Vec<_> = patch_it.collect(); warn!("BinnedStream will open a PreBinnedValueStream"); @@ -30,8 +29,9 @@ impl BinnedStream { info!("BinnedStream -> patch {:?}", p); } let inp = futures_util::stream::iter(patches.into_iter()) - .map(move |coord| { - PreBinnedValueFetchedStream::new(coord, channel.clone(), agg_kind.clone(), node_config.clone()) + .map({ + let node_config = node_config.clone(); + move |coord| PreBinnedValueFetchedStream::new(coord, channel.clone(), agg_kind.clone(), &node_config) }) .flatten() .only_first_error() diff --git a/disk/src/cache.rs b/disk/src/cache.rs index 15bf5a0..1cff45a 100644 --- a/disk/src/cache.rs +++ b/disk/src/cache.rs @@ -12,13 +12,12 @@ use futures_core::Stream; use futures_util::{pin_mut, StreamExt}; use hyper::Response; use netpod::{ - AggKind, Channel, Cluster, NanoRange, NodeConfig, PreBinnedPatchCoord, PreBinnedPatchIterator, PreBinnedPatchRange, - ToNanos, + AggKind, Channel, Cluster, NanoRange, Node, NodeConfig, PreBinnedPatchCoord, PreBinnedPatchIterator, + PreBinnedPatchRange, ToNanos, }; use serde::{Deserialize, Serialize}; use std::future::Future; use std::pin::Pin; -use std::sync::Arc; use std::task::{Context, Poll}; use tiny_keccak::Hasher; use tokio::io::{AsyncRead, ReadBuf}; @@ -66,11 +65,12 @@ impl Query { } pub async fn binned_bytes_for_http( - node_config: Arc, + node_config: &NodeConfig, + node: &Node, query: &Query, ) -> Result { let range = &query.range; - let channel_config = read_local_config(&query.channel, node_config.clone()).await?; + let channel_config = read_local_config(&query.channel, node).await?; let entry = extract_matching_config_entry(range, &channel_config); info!("found config entry {:?}", entry); let grid = PreBinnedPatchRange::covering_range(query.range.clone(), query.count); @@ -83,7 +83,7 @@ pub async fn binned_bytes_for_http( query.channel.clone(), query.range.clone(), query.agg_kind.clone(), - node_config.clone(), + node_config, ); let ret = BinnedBytesForHttpStream::new(s1); Ok(ret) @@ -170,10 +170,11 @@ impl PreBinnedQuery { // A user must first make sure that the grid spec is valid, and that this node is responsible for it. // Otherwise it is an error. pub fn pre_binned_bytes_for_http( - node_config: Arc, + node_config: &NodeConfig, + node: &Node, query: &PreBinnedQuery, ) -> Result { - info!("pre_binned_bytes_for_http {:?} {:?}", query, node_config.node); + info!("pre_binned_bytes_for_http {:?} {:?}", query, node); let ret = PreBinnedValueByteStream::new( query.patch.clone(), query.channel.clone(), @@ -254,7 +255,7 @@ pub struct MergedFromRemotes { } impl MergedFromRemotes { - pub fn new(evq: Arc, cluster: Arc) -> Self { + pub fn new(evq: EventsQuery, cluster: Cluster) -> Self { let mut tcp_establish_futs = vec![]; for node in &cluster.nodes { let f = super::raw::x_processed_stream_from_node(evq.clone(), node.clone()); diff --git a/disk/src/cache/pbv.rs b/disk/src/cache/pbv.rs index f8bcd71..c67cda0 100644 --- a/disk/src/cache/pbv.rs +++ b/disk/src/cache/pbv.rs @@ -15,7 +15,6 @@ use netpod::{ }; use std::future::{ready, Future}; use std::pin::Pin; -use std::sync::Arc; use std::task::{Context, Poll}; pub struct PreBinnedValueByteStream { @@ -25,7 +24,7 @@ pub struct PreBinnedValueByteStream { } impl PreBinnedValueByteStream { - pub fn new(patch: PreBinnedPatchCoord, channel: Channel, agg_kind: AggKind, node_config: Arc) -> Self { + pub fn new(patch: PreBinnedPatchCoord, channel: Channel, agg_kind: AggKind, node_config: &NodeConfig) -> Self { Self { inp: PreBinnedValueStream::new(patch, channel, agg_kind, node_config), errored: false, @@ -64,7 +63,7 @@ pub struct PreBinnedValueStream { patch_coord: PreBinnedPatchCoord, channel: Channel, agg_kind: AggKind, - node_config: Arc, + node_config: NodeConfig, open_check_local_file: Option> + Send>>>, fut2: Option> + Send>>>, } @@ -74,15 +73,15 @@ impl PreBinnedValueStream { patch_coord: PreBinnedPatchCoord, channel: Channel, agg_kind: AggKind, - node_config: Arc, + node_config: &NodeConfig, ) -> Self { - let node_ix = node_ix_for_patch(&patch_coord, &channel, &node_config.cluster); - assert!(node_ix == node_config.node.id); + // TODO check that we are the correct node. + let _node_ix = node_ix_for_patch(&patch_coord, &channel, &node_config.cluster); Self { patch_coord, channel, agg_kind, - node_config, + node_config: node_config.clone(), open_check_local_file: None, fut2: None, } @@ -107,7 +106,7 @@ impl PreBinnedValueStream { range ); assert!(g / h > 1); - assert!(g / h < 20); + assert!(g / h < 200); assert!(g % h == 0); let bin_size = range.grid_spec.bin_t_len(); let channel = self.channel.clone(); @@ -116,7 +115,7 @@ impl PreBinnedValueStream { let patch_it = PreBinnedPatchIterator::from_range(range); let s = futures_util::stream::iter(patch_it) .map(move |coord| { - PreBinnedValueFetchedStream::new(coord, channel.clone(), agg_kind.clone(), node_config.clone()) + PreBinnedValueFetchedStream::new(coord, channel.clone(), agg_kind.clone(), &node_config) }) .flatten() .map(move |k| { @@ -147,7 +146,6 @@ impl PreBinnedValueStream { ts2: self.patch_coord.patch_end(), count, }; - let evq = Arc::new(evq); let s1 = MergedFromRemotes::new(evq, self.node_config.cluster.clone()); let s2 = s1 .map(|k| { diff --git a/disk/src/cache/pbvfs.rs b/disk/src/cache/pbvfs.rs index f2fcc4e..db88108 100644 --- a/disk/src/cache/pbvfs.rs +++ b/disk/src/cache/pbvfs.rs @@ -9,7 +9,6 @@ use futures_util::{pin_mut, FutureExt}; use netpod::log::*; use netpod::{AggKind, Channel, NodeConfig, PreBinnedPatchCoord}; use std::pin::Pin; -use std::sync::Arc; use std::task::{Context, Poll}; pub struct PreBinnedValueFetchedStream { @@ -23,7 +22,7 @@ impl PreBinnedValueFetchedStream { patch_coord: PreBinnedPatchCoord, channel: Channel, agg_kind: AggKind, - node_config: Arc, + node_config: &NodeConfig, ) -> Self { let nodeix = node_ix_for_patch(&patch_coord, &channel, &node_config.cluster); let node = &node_config.cluster.nodes[nodeix as usize]; diff --git a/disk/src/channelconfig.rs b/disk/src/channelconfig.rs index b144e37..43c65e5 100644 --- a/disk/src/channelconfig.rs +++ b/disk/src/channelconfig.rs @@ -1,5 +1,5 @@ use err::Error; -use netpod::{Channel, NanoRange, NodeConfig}; +use netpod::{Channel, NanoRange, Node}; use nom::number::complete::{be_i16, be_i32, be_i64, be_i8, be_u8}; use nom::Needed; #[allow(unused_imports)] @@ -11,7 +11,6 @@ use nom::{ use num_derive::{FromPrimitive, ToPrimitive}; use num_traits::ToPrimitive; use serde::{Deserialize, Serialize}; -use std::sync::Arc; #[allow(unused_imports)] use tracing::{debug, error, info, trace, warn}; @@ -264,9 +263,8 @@ pub fn parse_config(inp: &[u8]) -> NRes { Ok((inp, ret)) } -pub async fn read_local_config(channel: &Channel, node_config: Arc) -> Result { - let path = node_config - .node +pub async fn read_local_config(channel: &Channel, node: &Node) -> Result { + let path = node .data_base_path .join("config") .join(&channel.name) diff --git a/disk/src/dataopen.rs b/disk/src/dataopen.rs index 79831a5..56597f2 100644 --- a/disk/src/dataopen.rs +++ b/disk/src/dataopen.rs @@ -5,14 +5,13 @@ use futures_util::StreamExt; use netpod::log::*; use netpod::{ChannelConfig, NanoRange, Nanos, Node}; use std::mem::size_of; -use std::sync::Arc; use tokio::fs::{File, OpenOptions}; use tokio::io::{AsyncReadExt, AsyncSeekExt, ErrorKind, SeekFrom}; pub fn open_files( range: &NanoRange, channel_config: &ChannelConfig, - node: Arc, + node: Node, ) -> async_channel::Receiver> { let (chtx, chrx) = async_channel::bounded(2); let range = range.clone(); @@ -35,7 +34,7 @@ async fn open_files_inner( chtx: &async_channel::Sender>, range: &NanoRange, channel_config: &ChannelConfig, - node: Arc, + node: Node, ) -> Result<(), Error> { let channel_config = channel_config.clone(); // TODO reduce usage of `query` and see what we actually need. diff --git a/disk/src/eventblobs.rs b/disk/src/eventblobs.rs index 2f92a03..19ddb52 100644 --- a/disk/src/eventblobs.rs +++ b/disk/src/eventblobs.rs @@ -6,7 +6,6 @@ use futures_core::Stream; use futures_util::StreamExt; use netpod::{ChannelConfig, NanoRange, Node}; use std::pin::Pin; -use std::sync::Arc; use std::task::{Context, Poll}; use tokio::fs::File; @@ -19,9 +18,9 @@ pub struct EventBlobsComplete { } impl EventBlobsComplete { - pub fn new(range: NanoRange, channel_config: ChannelConfig, node: Arc, buffer_size: usize) -> Self { + pub fn new(range: NanoRange, channel_config: ChannelConfig, node: Node, buffer_size: usize) -> Self { Self { - file_chan: open_files(&range, &channel_config, node.clone()), + file_chan: open_files(&range, &channel_config, node), evs: None, buffer_size, channel_config, @@ -67,12 +66,12 @@ impl Stream for EventBlobsComplete { pub fn event_blobs_complete( query: &netpod::AggQuerySingleChannel, - node: Arc, + node: Node, ) -> impl Stream> + Send { let query = query.clone(); let node = node.clone(); async_stream::stream! { - let filerx = open_files(err::todoval(), err::todoval(), node.clone()); + let filerx = open_files(err::todoval(), err::todoval(), node); while let Ok(fileres) = filerx.recv().await { match fileres { Ok(file) => { diff --git a/disk/src/gen.rs b/disk/src/gen.rs index 781de5a..eef12d0 100644 --- a/disk/src/gen.rs +++ b/disk/src/gen.rs @@ -38,7 +38,7 @@ pub async fn gen_test_data() -> Result<(), Error> { } for i1 in 0..3 { let node = Node { - id: i1, + id: format!("{:02}", i1), host: "localhost".into(), listen: "0.0.0.0".into(), port: 7780 + i1 as u16, diff --git a/disk/src/lib.rs b/disk/src/lib.rs index 9b482eb..a2d6a72 100644 --- a/disk/src/lib.rs +++ b/disk/src/lib.rs @@ -270,14 +270,14 @@ pub fn raw_concat_channel_read_stream_try_open_in_background( pub fn raw_concat_channel_read_stream_file_pipe( range: &NanoRange, channel_config: &ChannelConfig, - node: Arc, + node: Node, buffer_size: usize, ) -> impl Stream> + Send { let range = range.clone(); let channel_config = channel_config.clone(); let node = node.clone(); async_stream::stream! { - let chrx = open_files(&range, &channel_config, node.clone()); + let chrx = open_files(&range, &channel_config, node); while let Ok(file) = chrx.recv().await { let mut file = match file { Ok(k) => k, @@ -319,14 +319,11 @@ pub fn file_content_stream( } } -pub fn parsed1( - query: &netpod::AggQuerySingleChannel, - node: Arc, -) -> impl Stream> + Send { +pub fn parsed1(query: &netpod::AggQuerySingleChannel, node: &Node) -> impl Stream> + Send { let query = query.clone(); let node = node.clone(); async_stream::stream! { - let filerx = open_files(err::todoval(), err::todoval(), node.clone()); + let filerx = open_files(err::todoval(), err::todoval(), node); while let Ok(fileres) = filerx.recv().await { match fileres { Ok(file) => { diff --git a/disk/src/raw.rs b/disk/src/raw.rs index 163dbfb..da8fd8c 100644 --- a/disk/src/raw.rs +++ b/disk/src/raw.rs @@ -14,7 +14,6 @@ use futures_core::Stream; use netpod::{AggKind, Channel, NanoRange, Node}; use serde::{Deserialize, Serialize}; use std::pin::Pin; -use std::sync::Arc; use tokio::io::AsyncWriteExt; use tokio::net::TcpStream; #[allow(unused_imports)] @@ -37,11 +36,11 @@ pub struct EventsQuery { pub struct EventQueryJsonStringFrame(String); pub async fn x_processed_stream_from_node( - query: Arc, - node: Arc, + query: EventsQuery, + node: Node, ) -> Result> + Send>>, Error> { let net = TcpStream::connect(format!("{}:{}", node.host, node.port_raw)).await?; - let qjs = serde_json::to_string(query.as_ref())?; + let qjs = serde_json::to_string(&query)?; let (netin, mut netout) = net.into_split(); let buf = make_frame(&EventQueryJsonStringFrame(qjs))?; netout.write_all(&buf).await?; diff --git a/disk/src/raw/conn.rs b/disk/src/raw/conn.rs index a636a17..e080471 100644 --- a/disk/src/raw/conn.rs +++ b/disk/src/raw/conn.rs @@ -11,31 +11,36 @@ use futures_util::StreamExt; #[allow(unused_imports)] use netpod::log::*; use netpod::timeunits::SEC; -use netpod::{NodeConfig, ScalarType, Shape}; +use netpod::{Node, NodeConfig, ScalarType, Shape}; use std::net::SocketAddr; -use std::sync::Arc; use tokio::io::AsyncWriteExt; use tokio::net::tcp::OwnedWriteHalf; use tokio::net::TcpStream; use tracing::Instrument; -pub async fn raw_service(node_config: Arc) -> Result<(), Error> { - let addr = format!("{}:{}", node_config.node.listen, node_config.node.port_raw); +pub async fn raw_service(node_config: NodeConfig, node: Node) -> Result<(), Error> { + let addr = format!("{}:{}", node.listen, node.port_raw); let lis = tokio::net::TcpListener::bind(addr).await?; loop { match lis.accept().await { Ok((stream, addr)) => { - taskrun::spawn(raw_conn_handler(stream, addr, node_config.clone())); + let node = node.clone(); + taskrun::spawn(raw_conn_handler(stream, addr, node_config.clone(), node)); } Err(e) => Err(e)?, } } } -async fn raw_conn_handler(stream: TcpStream, addr: SocketAddr, node_config: Arc) -> Result<(), Error> { +async fn raw_conn_handler( + stream: TcpStream, + addr: SocketAddr, + node_config: NodeConfig, + node: Node, +) -> Result<(), Error> { //use tracing_futures::Instrument; let span1 = span!(Level::INFO, "raw::raw_conn_handler"); - let r = raw_conn_handler_inner(stream, addr, node_config) + let r = raw_conn_handler_inner(stream, addr, &node_config, &node) .instrument(span1) .await; match r { @@ -52,9 +57,10 @@ pub type RawConnOut = Result; async fn raw_conn_handler_inner( stream: TcpStream, addr: SocketAddr, - node_config: Arc, + node_config: &NodeConfig, + node: &Node, ) -> Result<(), Error> { - match raw_conn_handler_inner_try(stream, addr, node_config).await { + match raw_conn_handler_inner_try(stream, addr, node_config, node).await { Ok(_) => (), Err(mut ce) => { /*error!( @@ -88,7 +94,8 @@ impl> From<(E, OwnedWriteHalf)> for ConnErr { async fn raw_conn_handler_inner_try( stream: TcpStream, addr: SocketAddr, - node_config: Arc, + node_config: &NodeConfig, + node: &Node, ) -> Result<(), ConnErr> { info!("raw_conn_handler SPAWNED for {:?}", addr); let (netin, mut netout) = stream.into_split(); @@ -126,13 +133,13 @@ async fn raw_conn_handler_inner_try( return Err((Error::with_msg("can not parse request json"), netout))?; } }; - match dbconn::channel_exists(&evq.channel, node_config.clone()).await { + match dbconn::channel_exists(&evq.channel, &node_config).await { Ok(_) => (), Err(e) => return Err((e, netout))?, } debug!("REQUEST {:?}", evq); let range = &evq.range; - let channel_config = match read_local_config(&evq.channel, node_config.clone()).await { + let channel_config = match read_local_config(&evq.channel, node).await { Ok(k) => k, Err(e) => return Err((e, netout))?, }; @@ -173,21 +180,16 @@ async fn raw_conn_handler_inner_try( buffer_size: 1024 * 4, }; let buffer_size = 1024 * 4; - let mut s1 = EventBlobsComplete::new( - range.clone(), - query.channel_config.clone(), - node_config.node.clone(), - buffer_size, - ) - .into_dim_1_f32_stream() - .into_binned_x_bins_1(); + let mut s1 = EventBlobsComplete::new(range.clone(), query.channel_config.clone(), node.clone(), buffer_size) + .into_dim_1_f32_stream() + .into_binned_x_bins_1(); let mut e = 0; while let Some(item) = s1.next().await { if let Ok(k) = &item { e += 1; trace!( "emit items sp {:2} e {:3} len {:3} {:10?} {:10?}", - node_config.node.split, + node.split, e, k.tss.len(), k.tss.first().map(|k| k / SEC), diff --git a/httpret/src/lib.rs b/httpret/src/lib.rs index 5dbcd80..cb6de14 100644 --- a/httpret/src/lib.rs +++ b/httpret/src/lib.rs @@ -9,28 +9,30 @@ use http::{HeaderMap, Method, StatusCode}; use hyper::service::{make_service_fn, service_fn}; use hyper::{server::Server, Body, Request, Response}; use net::SocketAddr; -use netpod::NodeConfig; +use netpod::{Node, NodeConfig}; use panic::{AssertUnwindSafe, UnwindSafe}; use pin::Pin; -use std::{future, net, panic, pin, sync, task}; -use sync::Arc; +use std::{future, net, panic, pin, task}; use task::{Context, Poll}; use tracing::field::Empty; #[allow(unused_imports)] use tracing::{debug, error, info, span, trace, warn, Level}; -pub async fn host(node_config: Arc) -> Result<(), Error> { - let rawjh = taskrun::spawn(raw_service(node_config.clone())); +pub async fn host(node_config: NodeConfig, node: Node) -> Result<(), Error> { + let node_config = node_config.clone(); + let node = node.clone(); + let rawjh = taskrun::spawn(raw_service(node_config.clone(), node.clone())); use std::str::FromStr; - let addr = SocketAddr::from_str(&format!("{}:{}", node_config.node.listen, node_config.node.port))?; + let addr = SocketAddr::from_str(&format!("{}:{}", node.listen, node.port))?; let make_service = make_service_fn({ move |conn| { info!("new raw {:?}", conn); let node_config = node_config.clone(); + let node = node.clone(); async move { Ok::<_, Error>(service_fn({ move |req| { - let f = data_api_proxy(req, node_config.clone()); + let f = data_api_proxy(req, node_config.clone(), node.clone()); Cont { f: Box::pin(f) } } })) @@ -42,8 +44,8 @@ pub async fn host(node_config: Arc) -> Result<(), Error> { Ok(()) } -async fn data_api_proxy(req: Request, node_config: Arc) -> Result, Error> { - match data_api_proxy_try(req, node_config).await { +async fn data_api_proxy(req: Request, node_config: NodeConfig, node: Node) -> Result, Error> { + match data_api_proxy_try(req, &node_config, &node).await { Ok(k) => Ok(k), Err(e) => { error!("{:?}", e); @@ -82,24 +84,28 @@ where impl UnwindSafe for Cont {} -async fn data_api_proxy_try(req: Request, node_config: Arc) -> Result, Error> { +async fn data_api_proxy_try( + req: Request, + node_config: &NodeConfig, + node: &Node, +) -> Result, Error> { let uri = req.uri().clone(); let path = uri.path(); if path == "/api/1/parsed_raw" { if req.method() == Method::POST { - Ok(parsed_raw(req, node_config.clone()).await?) + Ok(parsed_raw(req, node).await?) } else { Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) } } else if path == "/api/1/binned" { if req.method() == Method::GET { - Ok(binned(req, node_config.clone()).await?) + Ok(binned(req, &node_config, node).await?) } else { Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) } } else if path == "/api/1/prebinned" { if req.method() == Method::GET { - Ok(prebinned(req, node_config.clone()).await?) + Ok(prebinned(req, &node_config, node).await?) } else { Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) } @@ -119,8 +125,7 @@ where .header("access-control-allow-headers", "*") } -async fn parsed_raw(req: Request, node_config: Arc) -> Result, Error> { - let node = node_config.node.clone(); +async fn parsed_raw(req: Request, node: &Node) -> Result, Error> { use netpod::AggQuerySingleChannel; let reqbody = req.into_body(); let bodyslice = hyper::body::to_bytes(reqbody).await?; @@ -219,11 +224,10 @@ where } } -async fn binned(req: Request, node_config: Arc) -> Result, Error> { - info!("-------------------------------------------------------- BINNED"); +async fn binned(req: Request, node_config: &NodeConfig, node: &Node) -> Result, Error> { let (head, _body) = req.into_parts(); let query = disk::cache::Query::from_request(&head)?; - let ret = match disk::cache::binned_bytes_for_http(node_config, &query).await { + let ret = match disk::cache::binned_bytes_for_http(node_config, node, &query).await { Ok(s) => response(StatusCode::OK).body(BodyStream::wrapped(s, format!("desc-BINNED")))?, Err(e) => { error!("{:?}", e); @@ -233,14 +237,14 @@ async fn binned(req: Request, node_config: Arc) -> Result, node_config: Arc) -> Result, Error> { +async fn prebinned(req: Request, node_config: &NodeConfig, node: &Node) -> Result, Error> { let (head, _body) = req.into_parts(); let q = PreBinnedQuery::from_request(&head)?; let desc = format!("pre-b-{}", q.patch.bin_t_len() / 1000000000); let span1 = span!(Level::INFO, "httpret::prebinned", desc = &desc.as_str()); span1.in_scope(|| { trace!("prebinned"); - let ret = match disk::cache::pre_binned_bytes_for_http(node_config, &q) { + let ret = match disk::cache::pre_binned_bytes_for_http(node_config, node, &q) { Ok(s) => response(StatusCode::OK).body(BodyStream::wrapped( s, format!( diff --git a/netpod/src/lib.rs b/netpod/src/lib.rs index a96e083..14e8114 100644 --- a/netpod/src/lib.rs +++ b/netpod/src/lib.rs @@ -6,7 +6,6 @@ use serde::{Deserialize, Serialize}; use std::collections::BTreeMap; use std::path::PathBuf; use std::pin::Pin; -use std::sync::Arc; use std::task::{Context, Poll}; use timeunits::*; #[allow(unused_imports)] @@ -94,9 +93,9 @@ impl ScalarType { } } -#[derive(Debug)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct Node { - pub id: u32, + pub id: String, pub host: String, pub listen: String, pub port: u16, @@ -112,7 +111,7 @@ impl Node { } } -#[derive(Debug)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct Database { pub name: String, pub host: String, @@ -120,16 +119,27 @@ pub struct Database { pub pass: String, } -#[derive(Debug)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct Cluster { - pub nodes: Vec>, + pub nodes: Vec, pub database: Database, } -#[derive(Debug)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct NodeConfig { - pub node: Arc, - pub cluster: Arc, + pub nodeid: String, + pub cluster: Cluster, +} + +impl NodeConfig { + pub fn get_node(&self) -> Option<&Node> { + for n in &self.cluster.nodes { + if n.id == self.nodeid { + return Some(n); + } + } + None + } } #[derive(Clone, Debug, Serialize, Deserialize)] diff --git a/retrieval/src/bin/retrieval.rs b/retrieval/src/bin/retrieval.rs index 6327ec6..694602c 100644 --- a/retrieval/src/bin/retrieval.rs +++ b/retrieval/src/bin/retrieval.rs @@ -1,6 +1,6 @@ use err::Error; -use netpod::{timeunits::*, Channel, ChannelConfig, Cluster, Database, Node, NodeConfig, ScalarType, Shape}; -use std::sync::Arc; +use netpod::NodeConfig; +use tokio::io::AsyncReadExt; #[allow(unused_imports)] use tracing::{debug, error, info, trace, warn}; @@ -20,10 +20,18 @@ async fn go() -> Result<(), Error> { use retrieval::cli::{Opts, SubCmd}; let opts = Opts::parse(); match opts.subcmd { - SubCmd::Retrieval(_subcmd) => { + SubCmd::Retrieval(subcmd) => { trace!("testout"); info!("testout"); error!("testout"); + let mut config_file = tokio::fs::File::open(subcmd.config).await?; + let mut buf = vec![]; + config_file.read_to_end(&mut buf).await?; + let node_config: NodeConfig = serde_json::from_slice(&buf)?; + let node = node_config + .get_node() + .ok_or(Error::with_msg(format!("nodeid config error")))?; + retrieval::run_node(node_config.clone(), node.clone()).await?; } } Ok(()) @@ -31,10 +39,11 @@ async fn go() -> Result<(), Error> { #[test] fn simple_fetch() { + use netpod::{timeunits::*, Channel, ChannelConfig, Cluster, Database, Node, NodeConfig, ScalarType, Shape}; taskrun::run(async { let t1 = chrono::Utc::now(); let node = Node { - id: 0, + id: format!("{:02}", 0), host: "localhost".into(), listen: "0.0.0.0".into(), port: 8360, @@ -43,7 +52,6 @@ fn simple_fetch() { ksprefix: "daq_swissfel".into(), split: 0, }; - let node = Arc::new(node); let query = netpod::AggQuerySingleChannel { channel_config: ChannelConfig { channel: Channel { @@ -71,14 +79,13 @@ fn simple_fetch() { pass: "daqbuffer".into(), }, }; - let cluster = Arc::new(cluster); let node_config = NodeConfig { - node: cluster.nodes[0].clone(), - cluster: cluster, + nodeid: cluster.nodes[0].id.clone(), + cluster, }; - let node_config = Arc::new(node_config); + let node = node_config.get_node().unwrap(); let query_string = serde_json::to_string(&query).unwrap(); - let host = tokio::spawn(httpret::host(node_config)); + let host = tokio::spawn(httpret::host(node_config.clone(), node.clone())); let req = hyper::Request::builder() .method(http::Method::POST) .uri("http://localhost:8360/api/1/parsed_raw") diff --git a/retrieval/src/cli.rs b/retrieval/src/cli.rs index 1a4004e..361fed8 100644 --- a/retrieval/src/cli.rs +++ b/retrieval/src/cli.rs @@ -15,4 +15,7 @@ pub enum SubCmd { } #[derive(Debug, Clap)] -pub struct Retrieval {} +pub struct Retrieval { + #[clap(long)] + pub config: String, +} diff --git a/retrieval/src/lib.rs b/retrieval/src/lib.rs index 9490ebd..46ef91b 100644 --- a/retrieval/src/lib.rs +++ b/retrieval/src/lib.rs @@ -1,6 +1,5 @@ use err::Error; -use netpod::{Cluster, NodeConfig}; -use std::sync::Arc; +use netpod::{Cluster, Node, NodeConfig}; use tokio::task::JoinHandle; #[allow(unused_imports)] use tracing::{debug, error, info, trace, warn}; @@ -9,15 +8,20 @@ pub mod cli; #[cfg(test)] pub mod test; -pub fn spawn_test_hosts(cluster: Arc) -> Vec>> { +pub fn spawn_test_hosts(cluster: Cluster) -> Vec>> { let mut ret = vec![]; for node in &cluster.nodes { let node_config = NodeConfig { cluster: cluster.clone(), - node: node.clone(), + nodeid: node.id.clone(), }; - let h = tokio::spawn(httpret::host(Arc::new(node_config))); + let h = tokio::spawn(httpret::host(node_config, node.clone())); ret.push(h); } ret } + +pub async fn run_node(node_config: NodeConfig, node: Node) -> Result<(), Error> { + httpret::host(node_config, node).await?; + Ok(()) +} diff --git a/retrieval/src/test.rs b/retrieval/src/test.rs index b5dda16..2be94e4 100644 --- a/retrieval/src/test.rs +++ b/retrieval/src/test.rs @@ -6,25 +6,21 @@ use err::Error; use futures_util::TryStreamExt; use hyper::Body; use netpod::{Cluster, Database, Node}; -use std::sync::Arc; #[allow(unused_imports)] use tracing::{debug, error, info, trace, warn}; fn test_cluster() -> Cluster { let nodes = (0..3) .into_iter() - .map(|id| { - let node = Node { - id, - host: "localhost".into(), - listen: "0.0.0.0".into(), - port: 8360 + id as u16, - port_raw: 8360 + id as u16 + 100, - data_base_path: format!("../tmpdata/node{:02}", id).into(), - ksprefix: "ks".into(), - split: id, - }; - Arc::new(node) + .map(|id| Node { + id: format!("{:02}", id), + host: "localhost".into(), + listen: "0.0.0.0".into(), + port: 8360 + id as u16, + port_raw: 8360 + id as u16 + 100, + data_base_path: format!("../tmpdata/node{:02}", id).into(), + ksprefix: "ks".into(), + split: id, }) .collect(); Cluster { @@ -45,7 +41,7 @@ fn get_cached_0() { async fn get_cached_0_inner() -> Result<(), Error> { let t1 = chrono::Utc::now(); - let cluster = Arc::new(test_cluster()); + let cluster = test_cluster(); let node0 = &cluster.nodes[0]; let hosts = spawn_test_hosts(cluster.clone()); let beg_date: chrono::DateTime = "1970-01-01T00:20:10.000Z".parse()?;