Dominik Werder
2021-04-14 11:38:17 +02:00
parent a6839a487f
commit 06c9963605
11 changed files with 238 additions and 93 deletions
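The recurring change in this commit: functions that borrowed &Node or &Cluster now take Arc<Node> / Arc<Cluster>, so the streams they return own a handle to the node instead of borrowing from the caller. A minimal sketch of the pattern, with stand-in names rather than the crate's own types:

use std::sync::Arc;

// Stand-in for netpod::Node; illustration only.
struct Node {
    host: String,
}

// Before: fn make_label(node: &Node) -> impl FnOnce() -> String + '_
// ties the returned value's lifetime to the borrowed Node.
// After: the returned value owns an Arc and can be 'static,
// so it is free to move into spawned tasks.
fn make_label(node: Arc<Node>) -> impl FnOnce() -> String {
    move || format!("node {}", node.host)
}

fn main() {
    let node = Arc::new(Node { host: "localhost".into() });
    // Cloning an Arc copies a pointer, not the Node.
    let label = make_label(node.clone());
    println!("{} (handles: {})", label(), Arc::strong_count(&node));
}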

View File

@@ -9,6 +9,7 @@ use futures_util::{pin_mut, StreamExt, future::ready};
use netpod::{Channel, ChannelConfig, ScalarType, Shape, Node, timeunits::*};
use crate::merge::MergeDim1F32Stream;
use netpod::BinSpecDimT;
+use std::sync::Arc;
pub trait AggregatorTdim {
type InputValue;
@@ -725,6 +726,7 @@ fn agg_x_dim_0() {
async fn agg_x_dim_0_inner() {
let node = make_test_node(0);
+let node = Arc::new(node);
let query = netpod::AggQuerySingleChannel {
channel_config: ChannelConfig {
channel: Channel {
@@ -745,7 +747,7 @@ async fn agg_x_dim_0_inner() {
let bin_count = 20;
let ts1 = query.timebin as u64 * query.channel_config.time_bin_size;
let ts2 = ts1 + HOUR * 24;
-let fut1 = crate::EventBlobsComplete::new(&query, query.channel_config.clone(), &node)
+let fut1 = crate::EventBlobsComplete::new(&query, query.channel_config.clone(), node)
.into_dim_1_f32_stream()
//.take(1000)
.map(|q| {
@@ -779,6 +781,7 @@ async fn agg_x_dim_1_inner() {
// /data/sf-databuffer/daq_swissfel/daq_swissfel_3/byTime/S10BC01-DBAM070\:BAM_CH1_NORM/*
// S10BC01-DBAM070:BAM_CH1_NORM
let node = make_test_node(0);
+let node = Arc::new(node);
let query = netpod::AggQuerySingleChannel {
channel_config: ChannelConfig {
channel: Channel {
@@ -799,7 +802,7 @@ async fn agg_x_dim_1_inner() {
let bin_count = 10;
let ts1 = query.timebin as u64 * query.channel_config.time_bin_size;
let ts2 = ts1 + HOUR * 24;
-let fut1 = crate::EventBlobsComplete::new(&query, query.channel_config.clone(), &node)
+let fut1 = crate::EventBlobsComplete::new(&query, query.channel_config.clone(), node)
.into_dim_1_f32_stream()
//.take(1000)
.map(|q| {
@@ -850,7 +853,8 @@ async fn merge_0_inner() {
make_test_node(k)
})
.map(|node| {
-crate::EventBlobsComplete::new(&query, query.channel_config.clone(), &node)
+let node = Arc::new(node);
+crate::EventBlobsComplete::new(&query, query.channel_config.clone(), node)
.into_dim_1_f32_stream()
})
.collect();

View File

@@ -8,12 +8,14 @@ use futures_core::Stream;
use futures_util::{StreamExt, FutureExt, pin_mut};
use bytes::{Bytes, BytesMut, BufMut};
use chrono::{DateTime, Utc};
-use netpod::{Node, Cluster, AggKind, NanoRange, ToNanos, PreBinnedPatchGridSpec, PreBinnedPatchIterator, PreBinnedPatchCoord, Channel};
+use netpod::{Node, Cluster, AggKind, NanoRange, ToNanos, PreBinnedPatchGridSpec, PreBinnedPatchIterator, PreBinnedPatchCoord, Channel, NodeConfig};
use crate::agg::MinMaxAvgScalarBinBatch;
use http::uri::Scheme;
use tiny_keccak::Hasher;
+use serde::{Serialize, Deserialize};
+use std::sync::Arc;
-#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
+#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Query {
range: NanoRange,
count: u64,
@@ -47,12 +49,7 @@ impl Query {
}
-pub struct BinParams {
-pub node: Node,
-pub cluster: Cluster,
-}
-pub fn binned_bytes_for_http(params: BinParams, query: &Query) -> Result<BinnedBytesForHttpStream, Error> {
+pub fn binned_bytes_for_http(node_config: Arc<NodeConfig>, query: &Query) -> Result<BinnedBytesForHttpStream, Error> {
let agg_kind = AggKind::DimXBins1;
// TODO
@@ -62,7 +59,7 @@ pub fn binned_bytes_for_http(params: BinParams, query: &Query) -> Result<BinnedB
Some(spec) => {
info!("GOT PreBinnedPatchGridSpec: {:?}", spec);
warn!("Pass here to BinnedStream what kind of Agg, range, ...");
-let s1 = BinnedStream::new(PreBinnedPatchIterator::from_range(spec), query.channel.clone(), agg_kind, params.cluster.clone());
+let s1 = BinnedStream::new(PreBinnedPatchIterator::from_range(spec), query.channel.clone(), agg_kind, node_config.cluster.clone());
// Iterate over the patches.
// Request the patch from each node.
// Merge.
@@ -75,6 +72,7 @@ pub fn binned_bytes_for_http(params: BinParams, query: &Query) -> Result<BinnedB
}
None => {
// Merge raw data
error!("TODO merge raw data");
todo!()
}
}
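The comment block above ("Iterate over the patches. Request the patch from each node. Merge.") is the core of the binned-read path. A minimal, self-contained sketch of that fan-out shape, assuming the tokio and futures-util crates; fetch_patch here is a hypothetical stand-in for the per-patch HTTP request:

use futures_util::{stream, StreamExt};

// Hypothetical stand-in for requesting one pre-binned patch from the
// node that owns it.
async fn fetch_patch(patch: u64) -> Vec<u64> {
    vec![patch * 10, patch * 10 + 1]
}

#[tokio::main]
async fn main() {
    let merged: Vec<u64> = stream::iter(0u64..3)
        .then(fetch_patch)   // request each patch, in patch order
        .map(stream::iter)   // turn each response into a stream
        .flatten()           // concatenate into one output stream
        .collect()
        .await;
    println!("{:?}", merged); // [0, 1, 10, 11, 20, 21]
}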
@@ -108,6 +106,78 @@ impl Stream for BinnedBytesForHttpStream {
}
#[derive(Clone, Debug)]
pub struct PreBinnedQuery {
patch: PreBinnedPatchCoord,
agg_kind: AggKind,
channel: Channel,
}
impl PreBinnedQuery {
pub fn from_request(req: &http::request::Parts) -> Result<Self, Error> {
let params = netpod::query_params(req.uri.query());
let ret = PreBinnedQuery {
patch: PreBinnedPatchCoord {
range: NanoRange {
beg: params.get("beg").ok_or(Error::with_msg("missing beg"))?.parse()?,
end: params.get("end").ok_or(Error::with_msg("missing end"))?.parse()?,
},
},
agg_kind: AggKind::DimXBins1,
channel: Channel {
backend: params.get("channel_backend").unwrap().into(),
keyspace: params.get("channel_keyspace").unwrap().parse().unwrap(),
name: params.get("channel_name").unwrap().into(),
},
};
info!("PreBinnedQuery::from_request {:?}", ret);
Ok(ret)
}
}
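For reference, the query string this parser expects carries beg, end, channel_backend, channel_keyspace and channel_name. A sketch of building a matching request URL on the client side; the /api/prebinned path is a made-up example, and a real client would percent-encode the channel name:

// Parameter names mirror PreBinnedQuery::from_request above; the path
// itself is hypothetical.
fn pre_binned_url(beg: u64, end: u64, backend: &str, keyspace: u32, name: &str) -> String {
    format!(
        "/api/prebinned?beg={}&end={}&channel_backend={}&channel_keyspace={}&channel_name={}",
        beg, end, backend, keyspace, name
    )
}

fn main() {
    // Channel name taken from the test comment earlier in this commit.
    println!("{}", pre_binned_url(0, 3_600_000_000_000, "sf-databuffer", 3, "S10BC01-DBAM070:BAM_CH1_NORM"));
}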
// NOTE This answers a request for a single valid pre-binned patch.
// A user must first make sure that the grid spec is valid, and that this node is responsible for it.
// Otherwise it is an error.
pub fn pre_binned_bytes_for_http(node_config: Arc<NodeConfig>, query: &PreBinnedQuery) -> Result<PreBinnedValueByteStream, Error> {
info!("pre_binned_bytes_for_http {:?} {:?}", query, node_config.node);
let ret = PreBinnedValueByteStream::new(query.patch.clone(), query.channel.clone(), query.agg_kind.clone(), node_config);
Ok(ret)
}
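The NOTE above puts the burden on the caller: only ask a node for a patch it actually owns. A self-contained sketch of that precondition, with simplified stand-ins for the crate's types and a trivial placement function in place of the SHA3-based node_ix_for_patch further down:

#[derive(Debug)]
struct PatchCoord {
    beg: u64,
    end: u64,
}

// Stand-in for node_ix_for_patch: any deterministic map from patch to
// node index works, as long as every node computes the same answer.
fn placement(patch: &PatchCoord, node_count: usize) -> usize {
    ((patch.beg ^ patch.end) % node_count as u64) as usize
}

fn check_responsible(patch: &PatchCoord, node_count: usize, my_ix: usize) -> Result<(), String> {
    if placement(patch, node_count) != my_ix {
        return Err(format!("patch {:?} is not assigned to node {}", patch, my_ix));
    }
    Ok(())
}

fn main() {
    let patch = PatchCoord { beg: 0, end: 3600 };
    println!("{:?}", check_responsible(&patch, 4, 0));
}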
pub struct PreBinnedValueByteStream {
}
impl PreBinnedValueByteStream {
pub fn new(patch: PreBinnedPatchCoord, channel: Channel, agg_kind: AggKind, node_config: Arc<NodeConfig>) -> Self {
Self {
}
}
}
impl Stream for PreBinnedValueByteStream {
type Item = Result<Bytes, Error>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
error!("PreBinnedValueByteStream poll_next");
todo!()
}
}
pub struct PreBinnedValueStream {
uri: http::Uri,
patch_coord: PreBinnedPatchCoord,
@@ -117,7 +187,7 @@ pub struct PreBinnedValueStream {
impl PreBinnedValueStream {
-pub fn new(patch_coord: PreBinnedPatchCoord, channel: Channel, agg_kind: AggKind, cluster: Cluster) -> Self {
+pub fn new(patch_coord: PreBinnedPatchCoord, channel: Channel, agg_kind: AggKind, cluster: Arc<Cluster>) -> Self {
let nodeix = node_ix_for_patch(&patch_coord, &channel, &cluster);
let node = &cluster.nodes[nodeix];
let uri: hyper::Uri = format!(
@@ -142,8 +212,16 @@ impl PreBinnedValueStream {
pub fn node_ix_for_patch(patch_coord: &PreBinnedPatchCoord, channel: &Channel, cluster: &Cluster) -> usize {
let mut hash = tiny_keccak::Sha3::v256();
hash.update(channel.backend.as_bytes());
hash.update(channel.name.as_bytes());
-0
+hash.update(&patch_coord.range.beg.to_le_bytes());
+hash.update(&patch_coord.range.end.to_le_bytes());
+let mut out = [0; 32];
+hash.finalize(&mut out);
+let mut a = [out[0], out[1], out[2], out[3]];
+let ix = u32::from_le_bytes(a) % cluster.nodes.len() as u32;
+info!("node_ix_for_patch {}", ix);
+ix as usize
}
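node_ix_for_patch gives a stable, hash-based owner for each (channel, patch) pair, so every node in the cluster routes a given patch to the same place. The index derivation at the end can also be written a little more compactly; an equivalent form, shown here on a fake hash value:

use std::convert::TryInto;

// Equivalent to building the four-byte array by hand: take the first
// four hash bytes as a little-endian u32, reduce modulo the node count.
fn ix_from_hash(out: &[u8; 32], node_count: usize) -> usize {
    let word = u32::from_le_bytes(out[..4].try_into().expect("slice of 4 bytes"));
    (word % node_count as u32) as usize
}

fn main() {
    let fake_hash = [7u8; 32];
    // Deterministic: the same hash always yields the same owner.
    assert_eq!(ix_from_hash(&fake_hash, 4), ix_from_hash(&fake_hash, 4));
    println!("owner ix = {}", ix_from_hash(&fake_hash, 4));
}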
@@ -177,7 +255,7 @@ impl Stream for PreBinnedValueStream {
Ready(res) => {
match res {
Ok(res) => {
info!("Got result from subrequest: {:?}", res);
info!("GOT result from SUB REQUEST: {:?}", res);
self.res = Some(res);
continue 'outer;
}
@@ -190,12 +268,12 @@ impl Stream for PreBinnedValueStream {
}
}
None => {
let req = hyper::Request::builder()
.method(http::Method::GET)
.uri(&self.uri)
.body(hyper::Body::empty())?;
let client = hyper::Client::new();
info!("START REQUEST FOR {:?}", req);
self.resfut = Some(client.request(req));
continue 'outer;
}
@@ -214,13 +292,17 @@ pub struct BinnedStream {
impl BinnedStream {
-pub fn new(patch_it: PreBinnedPatchIterator, channel: Channel, agg_kind: AggKind, cluster: Cluster) -> Self {
+pub fn new(patch_it: PreBinnedPatchIterator, channel: Channel, agg_kind: AggKind, cluster: Arc<Cluster>) -> Self {
let mut patch_it = patch_it;
let inp = futures_util::stream::iter(patch_it)
.map(move |coord| {
PreBinnedValueStream::new(coord, channel.clone(), agg_kind.clone(), cluster.clone())
})
-.flatten();
+.flatten()
+.map(|k| {
+info!("ITEM {:?}", k);
+k
+});
Self {
inp: Box::pin(inp),
}
@@ -253,6 +335,7 @@ pub struct SomeReturnThing {}
impl From<SomeReturnThing> for Bytes {
fn from(k: SomeReturnThing) -> Self {
error!("TODO convert result to octets");
todo!("TODO convert result to octets")
}

View File

@@ -13,6 +13,7 @@ use bytes::{Bytes, BytesMut, Buf};
use std::path::PathBuf;
use bitshuffle::bitshuffle_decompress;
use netpod::{ScalarType, Shape, Node, ChannelConfig};
+use std::sync::Arc;
pub mod agg;
pub mod gen;
@@ -20,8 +21,8 @@ pub mod merge;
pub mod cache;
-pub async fn read_test_1(query: &netpod::AggQuerySingleChannel, node: &Node) -> Result<netpod::BodyStream, Error> {
-let path = datapath(query.timebin as u64, &query.channel_config, node);
+pub async fn read_test_1(query: &netpod::AggQuerySingleChannel, node: Arc<Node>) -> Result<netpod::BodyStream, Error> {
+let path = datapath(query.timebin as u64, &query.channel_config, &node);
debug!("try path: {:?}", path);
let fin = OpenOptions::new()
.read(true)
@@ -143,7 +144,7 @@ impl FusedFuture for Fopen1 {
unsafe impl Send for Fopen1 {}
-pub fn raw_concat_channel_read_stream_try_open_in_background(query: &netpod::AggQuerySingleChannel, node: &Node) -> impl Stream<Item=Result<Bytes, Error>> + Send {
+pub fn raw_concat_channel_read_stream_try_open_in_background(query: &netpod::AggQuerySingleChannel, node: Arc<Node>) -> impl Stream<Item=Result<Bytes, Error>> + Send {
let mut query = query.clone();
let node = node.clone();
async_stream::stream! {
@@ -268,11 +269,11 @@ pub fn raw_concat_channel_read_stream_try_open_in_background(query: &netpod::Agg
}
-pub fn raw_concat_channel_read_stream_file_pipe(query: &netpod::AggQuerySingleChannel, node: &netpod::Node) -> impl Stream<Item=Result<BytesMut, Error>> + Send {
+pub fn raw_concat_channel_read_stream_file_pipe(query: &netpod::AggQuerySingleChannel, node: Arc<Node>) -> impl Stream<Item=Result<BytesMut, Error>> + Send {
let query = query.clone();
let node = node.clone();
async_stream::stream! {
-let chrx = open_files(&query, &node);
+let chrx = open_files(&query, node.clone());
while let Ok(file) = chrx.recv().await {
let mut file = match file {
Ok(k) => k,
@@ -294,7 +295,7 @@ pub fn raw_concat_channel_read_stream_file_pipe(query: &netpod::AggQuerySingleCh
}
}
-fn open_files(query: &netpod::AggQuerySingleChannel, node: &netpod::Node) -> async_channel::Receiver<Result<tokio::fs::File, Error>> {
+fn open_files(query: &netpod::AggQuerySingleChannel, node: Arc<Node>) -> async_channel::Receiver<Result<tokio::fs::File, Error>> {
let (chtx, chrx) = async_channel::bounded(2);
let mut query = query.clone();
let node = node.clone();
@@ -346,11 +347,11 @@ pub fn file_content_stream(mut file: tokio::fs::File, buffer_size: usize) -> imp
}
-pub fn parsed1(query: &netpod::AggQuerySingleChannel, node: &netpod::Node) -> impl Stream<Item=Result<Bytes, Error>> + Send {
+pub fn parsed1(query: &netpod::AggQuerySingleChannel, node: Arc<Node>) -> impl Stream<Item=Result<Bytes, Error>> + Send {
let query = query.clone();
let node = node.clone();
async_stream::stream! {
-let filerx = open_files(&query, &node);
+let filerx = open_files(&query, node.clone());
while let Ok(fileres) = filerx.recv().await {
match fileres {
Ok(file) => {
@@ -392,7 +393,8 @@ pub struct EventBlobsComplete {
}
impl EventBlobsComplete {
-pub fn new(query: &netpod::AggQuerySingleChannel, channel_config: ChannelConfig, node: &netpod::Node) -> Self {
+pub fn new(query: &netpod::AggQuerySingleChannel, channel_config: ChannelConfig, node: Arc<Node>) -> Self {
Self {
file_chan: open_files(query, node),
evs: None,
@@ -400,6 +402,7 @@ impl EventBlobsComplete {
channel_config,
}
}
}
impl Stream for EventBlobsComplete {
@@ -446,11 +449,11 @@ impl Stream for EventBlobsComplete {
}
-pub fn event_blobs_complete(query: &netpod::AggQuerySingleChannel, node: &netpod::Node) -> impl Stream<Item=Result<EventFull, Error>> + Send {
+pub fn event_blobs_complete(query: &netpod::AggQuerySingleChannel, node: Arc<Node>) -> impl Stream<Item=Result<EventFull, Error>> + Send {
let query = query.clone();
let node = node.clone();
async_stream::stream! {
-let filerx = open_files(&query, &node);
+let filerx = open_files(&query, node.clone());
while let Ok(fileres) = filerx.recv().await {
match fileres {
Ok(file) => {
@@ -782,7 +785,7 @@ impl NeedMinBuffer {
impl Stream for NeedMinBuffer {
type Item = Result<BytesMut, Error>;
-fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
loop {
let mut again = false;
let g = &mut self.inp;
@@ -833,7 +836,7 @@ impl Stream for NeedMinBuffer {
-pub fn raw_concat_channel_read_stream(query: &netpod::AggQuerySingleChannel, node: &netpod::Node) -> impl Stream<Item=Result<Bytes, Error>> + Send {
+pub fn raw_concat_channel_read_stream(query: &netpod::AggQuerySingleChannel, node: Arc<Node>) -> impl Stream<Item=Result<Bytes, Error>> + Send {
let mut query = query.clone();
let node = node.clone();
async_stream::stream! {
@@ -841,7 +844,7 @@ pub fn raw_concat_channel_read_stream(query: &netpod::AggQuerySingleChannel, nod
loop {
let timebin = 18700 + i1;
query.timebin = timebin;
-let s2 = raw_concat_channel_read_stream_timebin(&query, &node);
+let s2 = raw_concat_channel_read_stream_timebin(&query, node.clone());
pin_mut!(s2);
while let Some(item) = s2.next().await {
yield item;
@@ -855,7 +858,7 @@ pub fn raw_concat_channel_read_stream(query: &netpod::AggQuerySingleChannel, nod
}
-pub fn raw_concat_channel_read_stream_timebin(query: &netpod::AggQuerySingleChannel, node: &netpod::Node) -> impl Stream<Item=Result<Bytes, Error>> {
+pub fn raw_concat_channel_read_stream_timebin(query: &netpod::AggQuerySingleChannel, node: Arc<Node>) -> impl Stream<Item=Result<Bytes, Error>> {
let query = query.clone();
let node = node.clone();
async_stream::stream! {
@@ -885,7 +888,7 @@ pub fn raw_concat_channel_read_stream_timebin(query: &netpod::AggQuerySingleChan
}
-fn datapath(timebin: u64, config: &netpod::ChannelConfig, node: &netpod::Node) -> PathBuf {
+fn datapath(timebin: u64, config: &netpod::ChannelConfig, node: &Node) -> PathBuf {
//let pre = "/data/sf-databuffer/daq_swissfel";
node.data_base_path
.join(format!("{}_{}", node.ksprefix, config.channel.keyspace))