Better diagnostics and more basic flow structure

This commit is contained in:
Dominik Werder
2021-04-14 17:49:44 +02:00
parent 06c9963605
commit 91f3ec790c
11 changed files with 203 additions and 71 deletions

View File

@@ -708,12 +708,13 @@ where I: AggregatableTdim + Unpin, T: Stream<Item=Result<I, Error>> + Unpin, I::
}
}
pub fn make_test_node(ix: u8) -> Node {
pub fn make_test_node(id: u32) -> Node {
Node {
id,
host: "localhost".into(),
port: 8800 + ix as u16,
data_base_path: format!("../tmpdata/node{:02}", ix).into(),
split: ix,
port: 8800 + id as u16,
data_base_path: format!("../tmpdata/node{:02}", id).into(),
split: id,
ksprefix: "ks".into(),
}
}
@@ -731,9 +732,9 @@ async fn agg_x_dim_0_inner() {
channel_config: ChannelConfig {
channel: Channel {
backend: "sf-databuffer".into(),
keyspace: 2,
name: "S10BC01-DBAM070:EOM1_T1".into(),
},
keyspace: 2,
time_bin_size: DAY,
shape: Shape::Scalar,
scalar_type: ScalarType::F64,
@@ -786,9 +787,9 @@ async fn agg_x_dim_1_inner() {
channel_config: ChannelConfig {
channel: Channel {
backend: "ks".into(),
keyspace: 3,
name: "wave1".into(),
},
keyspace: 3,
time_bin_size: DAY,
shape: Shape::Wave(1024),
scalar_type: ScalarType::F64,
@@ -835,9 +836,9 @@ async fn merge_0_inner() {
channel_config: ChannelConfig {
channel: Channel {
backend: "ks".into(),
keyspace: 3,
name: "wave1".into(),
},
keyspace: 3,
time_bin_size: DAY,
shape: Shape::Wave(17),
scalar_type: ScalarType::F64,

View File

@@ -38,7 +38,6 @@ impl Query {
agg_kind: AggKind::DimXBins1,
channel: Channel {
backend: params.get("channel_backend").unwrap().into(),
keyspace: params.get("channel_keyspace").unwrap().parse().unwrap(),
name: params.get("channel_name").unwrap().into(),
},
};
@@ -59,7 +58,7 @@ pub fn binned_bytes_for_http(node_config: Arc<NodeConfig>, query: &Query) -> Res
Some(spec) => {
info!("GOT PreBinnedPatchGridSpec: {:?}", spec);
warn!("Pass here to BinnedStream what kind of Agg, range, ...");
let s1 = BinnedStream::new(PreBinnedPatchIterator::from_range(spec), query.channel.clone(), agg_kind, node_config.cluster.clone());
let s1 = BinnedStream::new(PreBinnedPatchIterator::from_range(spec), query.channel.clone(), agg_kind, node_config.clone());
// Iterate over the patches.
// Request the patch from each node.
// Merge.
@@ -130,7 +129,6 @@ impl PreBinnedQuery {
agg_kind: AggKind::DimXBins1,
channel: Channel {
backend: params.get("channel_backend").unwrap().into(),
keyspace: params.get("channel_keyspace").unwrap().parse().unwrap(),
name: params.get("channel_name").unwrap().into(),
},
};
@@ -153,12 +151,15 @@ pub fn pre_binned_bytes_for_http(node_config: Arc<NodeConfig>, query: &PreBinned
pub struct PreBinnedValueByteStream {
inp: PreBinnedValueStream,
}
impl PreBinnedValueByteStream {
pub fn new(patch: PreBinnedPatchCoord, channel: Channel, agg_kind: AggKind, node_config: Arc<NodeConfig>) -> Self {
warn!("PreBinnedValueByteStream");
Self {
inp: PreBinnedValueStream::new(patch, channel, agg_kind, node_config),
}
}
@@ -167,9 +168,19 @@ impl PreBinnedValueByteStream {
impl Stream for PreBinnedValueByteStream {
type Item = Result<Bytes, Error>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
error!("PreBinnedValueByteStream poll_next");
todo!()
use Poll::*;
match self.inp.poll_next_unpin(cx) {
Ready(Some(Ok(k))) => {
error!("TODO convert item to Bytes");
let buf = Bytes::new();
Ready(Some(Ok(buf)))
}
Ready(Some(Err(e))) => Ready(Some(Err(e))),
Ready(None) => Ready(None),
Pending => Pending,
}
}
}
@@ -179,23 +190,66 @@ impl Stream for PreBinnedValueByteStream {
pub struct PreBinnedValueStream {
patch_coord: PreBinnedPatchCoord,
channel: Channel,
agg_kind: AggKind,
node_config: Arc<NodeConfig>,
}
impl PreBinnedValueStream {
pub fn new(patch_coord: PreBinnedPatchCoord, channel: Channel, agg_kind: AggKind, node_config: Arc<NodeConfig>) -> Self {
let node_ix = node_ix_for_patch(&patch_coord, &channel, &node_config.cluster);
assert!(node_ix == node_config.node.id);
Self {
patch_coord,
channel,
agg_kind,
node_config,
}
}
}
impl Stream for PreBinnedValueStream {
// TODO need this generic for scalar and array (when wave is not binned down to a single scalar point)
type Item = Result<MinMaxAvgScalarBinBatch, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
// TODO provide the data from a local cached file, the on-the-fly binning of higher-res
// pre-binned patches which we can get again via http, or if there is no higher resolution
// then from raw events, or a combination of all those especially if there is not yet
// a pre-binned patch, and we have to stitch from higher-res-pre-bin plus extend with
// on-the-fly binning of fresh data.
error!("TODO provide the pre-binned data");
Ready(None)
}
}
pub struct PreBinnedValueFetchedStream {
uri: http::Uri,
patch_coord: PreBinnedPatchCoord,
resfut: Option<hyper::client::ResponseFuture>,
res: Option<hyper::Response<hyper::Body>>,
}
impl PreBinnedValueStream {
impl PreBinnedValueFetchedStream {
pub fn new(patch_coord: PreBinnedPatchCoord, channel: Channel, agg_kind: AggKind, cluster: Arc<Cluster>) -> Self {
let nodeix = node_ix_for_patch(&patch_coord, &channel, &cluster);
let node = &cluster.nodes[nodeix];
pub fn new(patch_coord: PreBinnedPatchCoord, channel: Channel, agg_kind: AggKind, node_config: Arc<NodeConfig>) -> Self {
let nodeix = node_ix_for_patch(&patch_coord, &channel, &node_config.cluster);
let node = &node_config.cluster.nodes[nodeix as usize];
let uri: hyper::Uri = format!(
"http://{}:{}/api/1/prebinned?beg={}&end={}&channel={}&agg_kind={:?}",
"http://{}:{}/api/1/prebinned?beg={}&end={}&channel_backend={}&channel_name={}&agg_kind={:?}",
node.host,
node.port,
patch_coord.range.beg,
patch_coord.range.end,
channel.backend,
channel.name,
agg_kind,
).parse().unwrap();
@@ -209,23 +263,7 @@ impl PreBinnedValueStream {
}
pub fn node_ix_for_patch(patch_coord: &PreBinnedPatchCoord, channel: &Channel, cluster: &Cluster) -> usize {
let mut hash = tiny_keccak::Sha3::v256();
hash.update(channel.backend.as_bytes());
hash.update(channel.name.as_bytes());
hash.update(&patch_coord.range.beg.to_le_bytes());
hash.update(&patch_coord.range.end.to_le_bytes());
let mut out = [0; 32];
hash.finalize(&mut out);
let mut a = [out[0], out[1], out[2], out[3]];
let ix = u32::from_le_bytes(a) % cluster.nodes.len() as u32;
info!("node_ix_for_patch {}", ix);
ix as usize
}
impl Stream for PreBinnedValueStream {
impl Stream for PreBinnedValueFetchedStream {
// TODO need this generic for scalar and array (when wave is not binned down to a single scalar point)
type Item = Result<MinMaxAvgScalarBinBatch, Error>;
@@ -259,7 +297,10 @@ impl Stream for PreBinnedValueStream {
self.res = Some(res);
continue 'outer;
}
Err(e) => Ready(Some(Err(e.into()))),
Err(e) => {
error!("PreBinnedValueStream error in stream {:?}", e);
Ready(Some(Err(e.into())))
}
}
}
Pending => {
@@ -286,17 +327,19 @@ impl Stream for PreBinnedValueStream {
}
pub struct BinnedStream {
inp: Pin<Box<dyn Stream<Item=Result<MinMaxAvgScalarBinBatch, Error>> + Send>>,
}
impl BinnedStream {
pub fn new(patch_it: PreBinnedPatchIterator, channel: Channel, agg_kind: AggKind, cluster: Arc<Cluster>) -> Self {
pub fn new(patch_it: PreBinnedPatchIterator, channel: Channel, agg_kind: AggKind, node_config: Arc<NodeConfig>) -> Self {
warn!("BinnedStream will open a PreBinnedValueStream");
let mut patch_it = patch_it;
let inp = futures_util::stream::iter(patch_it)
.map(move |coord| {
PreBinnedValueStream::new(coord, channel.clone(), agg_kind.clone(), cluster.clone())
PreBinnedValueFetchedStream::new(coord, channel.clone(), agg_kind.clone(), node_config.clone())
})
.flatten()
.map(|k| {
@@ -340,3 +383,19 @@ impl From<SomeReturnThing> for Bytes {
}
}
pub fn node_ix_for_patch(patch_coord: &PreBinnedPatchCoord, channel: &Channel, cluster: &Cluster) -> u32 {
let mut hash = tiny_keccak::Sha3::v256();
hash.update(channel.backend.as_bytes());
hash.update(channel.name.as_bytes());
hash.update(&patch_coord.range.beg.to_le_bytes());
hash.update(&patch_coord.range.end.to_le_bytes());
let mut out = [0; 32];
hash.finalize(&mut out);
let mut a = [out[0], out[1], out[2], out[3]];
let ix = u32::from_le_bytes(a) % cluster.nodes.len() as u32;
info!("node_ix_for_patch {}", ix);
ix
}

View File

@@ -36,9 +36,9 @@ pub async fn gen_test_data() -> Result<(), Error> {
config: ChannelConfig {
channel: Channel {
backend: "test".into(),
keyspace: 3,
name: "wave1".into(),
},
keyspace: 3,
time_bin_size: DAY,
scalar_type: ScalarType::F64,
shape: Shape::Wave(17),
@@ -51,9 +51,10 @@ pub async fn gen_test_data() -> Result<(), Error> {
}
for i1 in 0..13 {
let node = Node {
id: i1,
host: "localhost".into(),
port: 7780 + i1,
split: i1 as u8,
port: 7780 + i1 as u16,
split: i1,
data_base_path: data_base_path.join(format!("node{:02}", i1)),
ksprefix: ksprefix.clone(),
};
@@ -88,7 +89,7 @@ async fn gen_channel(chn: &ChannelGenProps, node: &Node, ensemble: &Ensemble) ->
.join(&chn.config.channel.name);
tokio::fs::create_dir_all(&config_path).await?;
let channel_path = node.data_base_path
.join(format!("{}_{}", node.ksprefix, chn.config.channel.keyspace))
.join(format!("{}_{}", node.ksprefix, chn.config.keyspace))
.join("byTime")
.join(&chn.config.channel.name);
tokio::fs::create_dir_all(&channel_path).await?;

View File

@@ -891,7 +891,7 @@ pub fn raw_concat_channel_read_stream_timebin(query: &netpod::AggQuerySingleChan
fn datapath(timebin: u64, config: &netpod::ChannelConfig, node: &Node) -> PathBuf {
//let pre = "/data/sf-databuffer/daq_swissfel";
node.data_base_path
.join(format!("{}_{}", node.ksprefix, config.channel.keyspace))
.join(format!("{}_{}", node.ksprefix, config.keyspace))
.join("byTime")
.join(config.channel.name.clone())
.join(format!("{:019}", timebin))