From c5001609ced744103e8bac79348415bc35b48193 Mon Sep 17 00:00:00 2001
From: Dominik Werder
Date: Wed, 7 Apr 2021 18:06:23 +0200
Subject: [PATCH] WIP on getting something binned

---
 Cargo.toml        |   4 +-
 disk/Cargo.toml   |   1 +
 disk/src/agg.rs   | 201 +++++++++++++++++++++++++++++++++++++++++++++-
 disk/src/lib.rs   | 120 ++++++++++++++++++++++++++-
 netpod/src/lib.rs |  54 +++++++++++++
 5 files changed, 373 insertions(+), 7 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 34bc19e..9df41a8 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -2,7 +2,7 @@
 members = ["retrieval", "httpret", "err", "disk"]
 
 [profile.release]
-#opt-level = 0
+debug = 1
+opt-level = 1
 #overflow-checks = true
-#debug = 2
 #debug-assertions = true
diff --git a/disk/Cargo.toml b/disk/Cargo.toml
index 74aaa9a..d3300b2 100644
--- a/disk/Cargo.toml
+++ b/disk/Cargo.toml
@@ -7,6 +7,7 @@ edition = "2018"
 [dependencies]
 tokio = { version = "1.4.0", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] }
 tracing = "0.1.25"
+tracing-subscriber = "0.2.17"
 serde_json = "1.0"
 async-channel = "1.6"
 bytes = "1.0.1"
diff --git a/disk/src/agg.rs b/disk/src/agg.rs
index e9639a2..4c046f7 100644
--- a/disk/src/agg.rs
+++ b/disk/src/agg.rs
@@ -1,3 +1,13 @@
+#[allow(unused_imports)]
+use tracing::{error, warn, info, debug, trace};
+use err::Error;
+use std::task::{Context, Poll};
+use std::pin::Pin;
+use crate::EventFull;
+use futures_core::Stream;
+use futures_util::{pin_mut, StreamExt, future::ready};
+use netpod::ScalarType;
+
 pub trait AggregatorTdim {
     type OutputValue: AggregatableXdim1Bin + AggregatableTdim;
 }
@@ -36,22 +46,42 @@ pub struct ValuesDim1 {
     values: Vec<Vec<f32>>,
 }
 
+impl std::fmt::Debug for ValuesDim1 {
+    fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
+        write!(fmt, "count {} tsA {:?} tsB {:?}", self.tss.len(), self.tss.first(), self.tss.last())
+    }
+}
+
 impl AggregatableXdim1Bin for ValuesDim1 {
     type Output = MinMaxAvgScalarEventBatch;
+
     fn into_agg(self) -> Self::Output {
-        todo!()
+        let mut ret = MinMaxAvgScalarEventBatch {
+            tss: Vec::with_capacity(self.tss.len()),
+            mins: Vec::with_capacity(self.tss.len()),
+            maxs: Vec::with_capacity(self.tss.len()),
+            avgs: Vec::with_capacity(self.tss.len()),
+        };
+        // TODO do the actual binning
+        ret
     }
+
 }
 
 pub struct MinMaxAvgScalarEventBatch {
-    ts1s: Vec<u64>,
-    ts2s: Vec<u64>,
+    tss: Vec<u64>,
     mins: Vec<f32>,
     maxs: Vec<f32>,
     avgs: Vec<f32>,
 }
 
+impl std::fmt::Debug for MinMaxAvgScalarEventBatch {
+    fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
+        write!(fmt, "MinMaxAvgScalarEventBatch count {}", self.tss.len())
+    }
+}
+
 impl AggregatableXdim1Bin for MinMaxAvgScalarEventBatch {
     type Output = MinMaxAvgScalarEventBatch;
     fn into_agg(self) -> Self::Output {
@@ -124,8 +154,139 @@ impl AggregatableXdim1Bin for MinMaxAvgScalarBinSingle {
     }
 }
 
+
+
+pub struct Dim1F32Stream<S>
+where S: Stream<Item = Result<EventFull, Error>>
+{
+    inp: S,
+}
+
+impl<S> Stream for Dim1F32Stream<S>
+where S: Stream<Item = Result<EventFull, Error>> + Unpin
+{
+    type Item = Result<ValuesDim1, Error>;
+
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
+        use Poll::*;
+        match self.inp.poll_next_unpin(cx) {
+            Ready(Some(Ok(k))) => {
+                let mut ret = ValuesDim1 {
+                    tss: vec![],
+                    values: vec![],
+                };
+                use ScalarType::*;
+                for i1 in 0..k.tss.len() {
+                    // TODO iterate sibling arrays after single bounds check
+                    let ty = &k.scalar_types[i1];
+                    let decomp = k.decomps[i1].as_ref().unwrap();
+                    match ty {
+                        F64 => {
+                            // do the conversion
+                            let n1 = decomp.len();
+                            assert!(n1 % ty.bytes() as usize == 0);
+                            let ele_count = n1 / ty.bytes() as usize;
+                            let mut j = Vec::with_capacity(ele_count);
+                            // this is safe for ints and floats
+                            unsafe { j.set_len(ele_count); }
+                            let mut p1 = 0;
+                            for i2 in 0..ele_count {
+                                unsafe {
+                                    j[i2] = std::mem::transmute_copy::<_, f64>(&decomp[p1]) as f32;
+                                    p1 += 8;
+                                }
+                            }
+                            ret.tss.push(k.tss[i1]);
+                            ret.values.push(j);
+                        },
+                        _ => todo!()
+                    }
+                }
+                Ready(Some(Ok(ret)))
+            }
+            Ready(Some(Err(e))) => Ready(Some(Err(e))),
+            Ready(None) => Ready(None),
+            Pending => Pending,
+        }
+    }
+
+}
+
+pub trait IntoDim1F32Stream {
+    fn into_dim_1_f32_stream(self) -> Dim1F32Stream<Self>
+    where Self: Sized,
+    Self: Stream<Item = Result<EventFull, Error>>;
+}
+
+impl<T> IntoDim1F32Stream for T
+where T: Stream<Item = Result<EventFull, Error>>
+{
+
+    fn into_dim_1_f32_stream(self) -> Dim1F32Stream<T>
+    {
+        Dim1F32Stream {
+            inp: self,
+        }
+    }
+
+}
+
+
+pub trait IntoBinnedXBins1<I: AggregatableXdim1Bin> {
+    type StreamOut;
+    fn into_binned_x_bins_1(self) -> Self::StreamOut
+    where Self: Stream<Item = Result<I, Error>>;
+}
+
+impl<T, I> IntoBinnedXBins1<I> for T
+where T: Stream<Item = Result<I, Error>> + Unpin,
+I: AggregatableXdim1Bin
+{
+    type StreamOut = IntoBinnedXBins1DefaultStream<T, I>;
+
+    fn into_binned_x_bins_1(self) -> Self::StreamOut {
+        IntoBinnedXBins1DefaultStream {
+            inp: self,
+        }
+    }
+
+}
+
+pub struct IntoBinnedXBins1DefaultStream<S, I>
+where S: Stream<Item = Result<I, Error>> + Unpin,
+I: AggregatableXdim1Bin
+{
+    inp: S,
+}
+
+impl<S, I> Stream for IntoBinnedXBins1DefaultStream<S, I>
+where S: Stream<Item = Result<I, Error>> + Unpin,
+I: AggregatableXdim1Bin
+{
+    type Item = Result<MinMaxAvgScalarEventBatch, Error>;
+
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
+        use Poll::*;
+        match self.inp.poll_next_unpin(cx) {
+            Ready(Some(Ok(k))) => {
+                let ret = MinMaxAvgScalarEventBatch {
+                    // TODO fill in the details
+                    tss: vec![],
+                    mins: vec![],
+                    maxs: vec![],
+                    avgs: vec![],
+                };
+                Ready(Some(Ok(ret)))
+            }
+            Ready(Some(Err(e))) => Ready(Some(Err(e))),
+            Ready(None) => Ready(None),
+            Pending => Pending,
+        }
+    }
+
+}
+
+
+
+
 #[test]
 fn agg_x_dim_1() {
+    crate::run(async { agg_x_dim_1_inner().await; Ok(()) }).unwrap();
+}
+
+async fn agg_x_dim_1_inner() {
     let vals = ValuesDim1 {
         tss: vec![0, 1, 2, 3],
         values: vec![
@@ -149,5 +310,39 @@ fn agg_x_dim_1() {
     How do I want to drive the system?
     If I write the T-binner as a Stream, then I also need to pass it the input!
     Meaning, I need to pass the Stream which produces the actual numbers from disk.
+
+    readchannel() -> Stream of timestamped byte blobs
+    .to_f32() -> Stream ? indirection to branch on the underlying shape
+    .agg_x_bins_1() -> Stream ? can I keep it at the single indirection on the top level?
     */
+    let query = netpod::AggQuerySingleChannel {
+        ksprefix: "daq_swissfel".into(),
+        keyspace: 3,
+        channel: netpod::Channel {
+            name: "S10BC01-DBAM070:BAM_CH1_NORM".into(),
+            backend: "sf-databuffer".into(),
+        },
+        timebin: 18721,
+        tb_file_count: 1,
+        split: 12,
+        tbsize: 1000 * 60 * 60 * 24,
+        buffer_size: 1024 * 4,
+    };
+    let fut1 = crate::EventBlobsComplete::new(&query)
+    .into_dim_1_f32_stream()
+    .take(10)
+    .map(|q| {
+        if let Ok(ref k) = q {
+            info!("vals: {:?}", k);
+        }
+        q
+    })
+    .into_binned_x_bins_1()
+    .map(|k| {
+        let k = k.unwrap();
+        info!("after X binning {:?}", k);
+        k
+    })
+    .for_each(|k| ready(()));
+    fut1.await;
 }
diff --git a/disk/src/lib.rs b/disk/src/lib.rs
index f0a276c..a5f2c7c 100644
--- a/disk/src/lib.rs
+++ b/disk/src/lib.rs
@@ -15,6 +15,7 @@ use bytes::{Bytes, BytesMut, BufMut, Buf};
 use std::path::PathBuf;
 use bitshuffle::bitshuffle_decompress;
 use async_channel::bounded;
+use netpod::ScalarType;
 
 pub async fn read_test_1(query: &netpod::AggQuerySingleChannel) -> Result<netpod::BodyStream, Error> {
@@ -380,6 +381,95 @@ pub fn parsed1(query: &netpod::AggQuerySingleChannel) -> impl Stream
 }
 
 
+pub struct EventBlobsComplete {
+    file_chan: async_channel::Receiver<Result<File, Error>>,
+    evs: Option<EventChunker>,
+    buffer_size: u32,
+}
+
+impl EventBlobsComplete {
+    pub fn new(query: &netpod::AggQuerySingleChannel) -> Self {
+        Self {
+            file_chan: open_files(query),
+            evs: None,
+            buffer_size: query.buffer_size,
+        }
+    }
+}
+
+impl Stream for EventBlobsComplete {
+    type Item = Result<EventFull, Error>;
+
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        use Poll::*;
+        'outer: loop {
+            let z = match &mut self.evs {
+                // Drain the chunker of the currently open file first.
+                Some(evs) => {
+                    match evs.poll_next_unpin(cx) {
+                        Ready(Some(k)) => {
+                            Ready(Some(k))
+                        }
+                        Ready(None) => {
+                            self.evs = None;
+                            continue 'outer;
+                        }
+                        Pending => Pending,
+                    }
+                }
+                // No chunker active: fetch the next file from the channel.
+                None => {
+                    match self.file_chan.poll_next_unpin(cx) {
+                        Ready(Some(k)) => {
+                            match k {
+                                Ok(file) => {
+                                    let inp = Box::pin(file_content_stream(file, self.buffer_size as usize));
+                                    let chunker = EventChunker::new(inp);
+                                    self.evs.replace(chunker);
+                                    continue 'outer;
+                                }
+                                Err(e) => Ready(Some(Err(e)))
+                            }
+                        }
+                        Ready(None) => Ready(None),
+                        Pending => Pending,
+                    }
+                }
+            };
+            break z;
+        }
+    }
+
+}
+
+
+pub fn event_blobs_complete(query: &netpod::AggQuerySingleChannel) -> impl Stream<Item = Result<EventFull, Error>> + Send {
+    let query = query.clone();
+    async_stream::stream! {
+        let filerx = open_files(&query);
+        while let Ok(fileres) = filerx.recv().await {
+            match fileres {
+                Ok(file) => {
+                    let inp = Box::pin(file_content_stream(file, query.buffer_size as usize));
+                    let mut chunker = EventChunker::new(inp);
+                    while let Some(evres) = chunker.next().await {
+                        match evres {
+                            Ok(evres) => {
+                                yield Ok(evres);
+                            }
+                            Err(e) => {
+                                yield Err(e);
+                            }
+                        }
+                    }
+                }
+                Err(e) => {
+                    yield Err(e);
+                }
+            }
+        }
+    }
+}
+
+
 pub struct EventChunker {
     inp: NeedMinBuffer,
     had_channel: bool,
@@ -527,7 +617,7 @@ impl EventChunker {
                             let c1 = bitshuffle_decompress(&buf.as_ref()[p1 as usize..], &mut decomp, ele_count as usize, ele_size as usize, 0);
                             //info!("decompress result: {:?}", c1);
                             assert!(c1.unwrap() as u32 == k1);
-                            ret.add_event(ts, pulse, Some(decomp));
+                            ret.add_event(ts, pulse, Some(decomp), ScalarType::from_dtype_index(type_index));
                         }
                         buf.advance(len as usize);
                         need_min = 4;
@@ -614,6 +704,7 @@ pub struct EventFull {
     tss: Vec<u64>,
     pulses: Vec<u64>,
     decomps: Vec<Option<BytesMut>>,
+    scalar_types: Vec<ScalarType>,
 }
 
 impl EventFull {
@@ -623,13 +714,15 @@ impl EventFull {
     fn empty() -> Self {
         Self {
             tss: vec![],
             pulses: vec![],
             decomps: vec![],
+            scalar_types: vec![],
         }
     }
 
-    fn add_event(&mut self, ts: u64, pulse: u64, decomp: Option<BytesMut>) {
+    fn add_event(&mut self, ts: u64, pulse: u64, decomp: Option<BytesMut>, scalar_type: ScalarType) {
         self.tss.push(ts);
         self.pulses.push(pulse);
         self.decomps.push(decomp);
+        self.scalar_types.push(scalar_type);
     }
 }
@@ -836,3 +929,26 @@ impl futures_core::Stream for RawConcatChannelReader {
     }
 }
+
+fn run<T, F: std::future::Future<Output = Result<T, Error>>>(f: F) -> Result<T, Error> {
+    tracing_init();
+    tokio::runtime::Builder::new_multi_thread()
+        .worker_threads(12)
+        .max_blocking_threads(256)
+        .enable_all()
+        .build()
+        .unwrap()
+        .block_on(async {
+            f.await
+        })
+}
+
+pub fn tracing_init() {
+    tracing_subscriber::fmt()
+        //.with_timer(tracing_subscriber::fmt::time::uptime())
+        .with_target(true)
+        .with_thread_names(true)
+        //.with_max_level(tracing::Level::INFO)
+        .with_env_filter(tracing_subscriber::EnvFilter::new("info,retrieval=trace,disk=trace,tokio_postgres=info"))
+        .init();
+}
diff --git a/netpod/src/lib.rs b/netpod/src/lib.rs
index 6692d4a..84e9801 100644
--- a/netpod/src/lib.rs
+++ b/netpod/src/lib.rs
@@ -38,3 +38,57 @@ pub struct BodyStream {
     //pub receiver: async_channel::Receiver<Result<bytes::Bytes, Error>>,
     pub inner: Box<dyn futures_core::Stream<Item = Result<bytes::Bytes, Error>> + Send + Unpin>,
 }
+
+pub enum ScalarType {
+    U8,
+    U16,
+    U32,
+    U64,
+    I8,
+    I16,
+    I32,
+    I64,
+    F32,
+    F64,
+}
+
+impl ScalarType {
+
+    pub fn from_dtype_index(ix: u8) -> Self {
+        use ScalarType::*;
+        match ix {
+            0 => panic!("BOOL not supported"),
+            1 => panic!("BOOL8 not supported"),
+            3 => U8,
+            5 => U16,
+            8 => U32,
+            10 => U64,
+            2 => I8,
+            4 => I16,
+            7 => I32,
+            9 => I64,
+            11 => F32,
+            12 => F64,
+            6 => panic!("CHARACTER not supported"),
+            13 => panic!("STRING not supported"),
+            _ => panic!("unknown"),
+        }
+    }
+
+    pub fn bytes(&self) -> u8 {
+        use ScalarType::*;
+        match self {
+            U8 => 1,
+            U16 => 2,
+            U32 => 4,
+            U64 => 8,
+            I8 => 1,
+            I16 => 2,
+            I32 => 4,
+            I64 => 8,
+            F32 => 4,
+            F64 => 8,
+        }
+    }
+
+}
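
Note on the conversion loop in Dim1F32Stream::poll_next: it fills an
uninitialized Vec via set_len and then reads each 8-byte sample with
std::mem::transmute_copy, i.e. it reinterprets the decompressed buffer as
native-endian f64 and narrows to f32. For comparison, a minimal safe sketch of
the same operation (the helper name f64_bytes_to_f32_vec is hypothetical and
not part of the patch; it assumes 8-byte native-endian samples, matching the
transmute_copy semantics above):

    use std::convert::TryInto; // not in the prelude on edition 2018

    // Reinterpret a buffer of native-endian f64 samples and narrow to f32.
    // The assert mirrors the patch's `n1 % ty.bytes() as usize == 0` check;
    // without it, chunks_exact(8) would silently drop a ragged tail.
    fn f64_bytes_to_f32_vec(decomp: &[u8]) -> Vec<f32> {
        assert_eq!(decomp.len() % 8, 0);
        decomp
            .chunks_exact(8)
            .map(|b| f64::from_ne_bytes(b.try_into().unwrap()) as f32)
            .collect()
    }

This avoids the unsafe block entirely; whether the per-chunk bounds checks
cost anything measurable here would have to be weighed against the bitshuffle
decompression, which likely dominates.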