WIP on getting something binned
This commit is contained in:
@@ -2,7 +2,7 @@
|
||||
members = ["retrieval", "httpret", "err", "disk"]
|
||||
|
||||
[profile.release]
|
||||
#opt-level = 0
|
||||
debug = 1
|
||||
opt-level = 1
|
||||
#overflow-checks = true
|
||||
#debug = 2
|
||||
#debug-assertions = true
|
||||
|
||||
@@ -7,6 +7,7 @@ edition = "2018"
|
||||
[dependencies]
|
||||
tokio = { version = "1.4.0", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] }
|
||||
tracing = "0.1.25"
|
||||
tracing-subscriber = "0.2.17"
|
||||
serde_json = "1.0"
|
||||
async-channel = "1.6"
|
||||
bytes = "1.0.1"
|
||||
|
||||
201
disk/src/agg.rs
201
disk/src/agg.rs
@@ -1,3 +1,13 @@
|
||||
#[allow(unused_imports)]
|
||||
use tracing::{error, warn, info, debug, trace};
|
||||
use err::Error;
|
||||
use std::task::{Context, Poll};
|
||||
use std::pin::Pin;
|
||||
use crate::EventFull;
|
||||
use futures_core::Stream;
|
||||
use futures_util::{pin_mut, StreamExt, future::ready};
|
||||
use netpod::ScalarType;
|
||||
|
||||
pub trait AggregatorTdim {
|
||||
type OutputValue: AggregatableXdim1Bin + AggregatableTdim;
|
||||
}
|
||||
@@ -36,22 +46,42 @@ pub struct ValuesDim1 {
|
||||
values: Vec<Vec<f32>>,
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for ValuesDim1 {
|
||||
fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||
write!(fmt, "count {} tsA {:?} tsB {:?}", self.tss.len(), self.tss.first(), self.tss.last())
|
||||
}
|
||||
}
|
||||
|
||||
impl AggregatableXdim1Bin for ValuesDim1 {
|
||||
type Output = MinMaxAvgScalarEventBatch;
|
||||
|
||||
fn into_agg(self) -> Self::Output {
|
||||
todo!()
|
||||
let mut ret = MinMaxAvgScalarEventBatch {
|
||||
tss: Vec::with_capacity(self.tss.len()),
|
||||
mins: Vec::with_capacity(self.tss.len()),
|
||||
maxs: Vec::with_capacity(self.tss.len()),
|
||||
avgs: Vec::with_capacity(self.tss.len()),
|
||||
};
|
||||
// TODO do the actual binning
|
||||
ret
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
/// A batch of X-binned scalar events: one (min, max, avg) sample per event
/// timestamp. All four vectors are parallel arrays of equal length.
///
/// NOTE(review): the previous declaration also listed `ts1s`/`ts2s`, but no
/// constructor in this file initializes them (both struct literals set only
/// tss/mins/maxs/avgs), so they appear to be stale diff residue and were
/// dropped to keep the type consistent with its uses.
pub struct MinMaxAvgScalarEventBatch {
    // Event timestamps.
    tss: Vec<u64>,
    // Per-event minimum across the X dimension.
    mins: Vec<f32>,
    // Per-event maximum across the X dimension.
    maxs: Vec<f32>,
    // Per-event average across the X dimension.
    avgs: Vec<f32>,
}
|
||||
|
||||
impl std::fmt::Debug for MinMaxAvgScalarEventBatch {
|
||||
fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||
write!(fmt, "MinMaxAvgScalarEventBatch count {}", self.tss.len())
|
||||
}
|
||||
}
|
||||
|
||||
impl AggregatableXdim1Bin for MinMaxAvgScalarEventBatch {
|
||||
type Output = MinMaxAvgScalarEventBatch;
|
||||
fn into_agg(self) -> Self::Output {
|
||||
@@ -124,8 +154,139 @@ impl AggregatableXdim1Bin for MinMaxAvgScalarBinSingle {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
pub struct Dim1F32Stream<S>
|
||||
where S: Stream<Item=Result<EventFull, Error>>
|
||||
{
|
||||
inp: S,
|
||||
}
|
||||
|
||||
impl<S> Stream for Dim1F32Stream<S>
|
||||
where S: Stream<Item=Result<EventFull, Error>> + Unpin
|
||||
{
|
||||
type Item = Result<ValuesDim1, Error>;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||
use Poll::*;
|
||||
match self.inp.poll_next_unpin(cx) {
|
||||
Ready(Some(Ok(k))) => {
|
||||
let mut ret = ValuesDim1 {
|
||||
tss: vec![],
|
||||
values: vec![],
|
||||
};
|
||||
use ScalarType::*;
|
||||
for i1 in 0..k.tss.len() {
|
||||
// TODO iterate sibling arrays after single bounds check
|
||||
let ty = &k.scalar_types[i1];
|
||||
let decomp = k.decomps[i1].as_ref().unwrap();
|
||||
match ty {
|
||||
F64 => {
|
||||
// do the conversion
|
||||
let n1 = decomp.len();
|
||||
assert!(n1 % ty.bytes() as usize == 0);
|
||||
let ele_count = n1 / ty.bytes() as usize;
|
||||
let mut j = Vec::with_capacity(ele_count);
|
||||
// this is safe for ints and floats
|
||||
unsafe { j.set_len(ele_count); }
|
||||
let mut p1 = 0;
|
||||
for i1 in 0..ele_count {
|
||||
unsafe {
|
||||
j[i1] = std::mem::transmute_copy::<_, f64>(&decomp[p1]) as f32;
|
||||
p1 += 8;
|
||||
}
|
||||
}
|
||||
ret.tss.push(k.tss[i1]);
|
||||
ret.values.push(j);
|
||||
},
|
||||
_ => todo!()
|
||||
}
|
||||
}
|
||||
Ready(Some(Ok(ret)))
|
||||
}
|
||||
Ready(Some(Err(e))) => Ready(Some(Err(e))),
|
||||
Ready(None) => Ready(None),
|
||||
Pending => Pending,
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
pub trait IntoDim1F32Stream {
|
||||
fn into_dim_1_f32_stream(self) -> Dim1F32Stream<Self>
|
||||
where Self: Sized,
|
||||
Self: Stream<Item=Result<EventFull, Error>>;
|
||||
}
|
||||
|
||||
impl<T> IntoDim1F32Stream for T
|
||||
where T: Stream<Item=Result<EventFull, Error>>
|
||||
{
|
||||
|
||||
fn into_dim_1_f32_stream(self) -> Dim1F32Stream<T>
|
||||
{
|
||||
Dim1F32Stream {
|
||||
inp: self,
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
pub trait IntoBinnedXBins1<I: AggregatableXdim1Bin> {
|
||||
type StreamOut;
|
||||
fn into_binned_x_bins_1(self) -> Self::StreamOut where Self: Stream<Item=Result<I, Error>>;
|
||||
}
|
||||
|
||||
impl<T, I: AggregatableXdim1Bin> IntoBinnedXBins1<I> for T where T: Stream<Item=Result<I, Error>> + Unpin {
|
||||
type StreamOut = IntoBinnedXBins1DefaultStream<T, I>;
|
||||
|
||||
fn into_binned_x_bins_1(self) -> Self::StreamOut {
|
||||
IntoBinnedXBins1DefaultStream {
|
||||
inp: self,
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
pub struct IntoBinnedXBins1DefaultStream<S, I> where S: Stream<Item=Result<I, Error>> + Unpin, I: AggregatableXdim1Bin {
|
||||
inp: S,
|
||||
}
|
||||
|
||||
impl<S, I> Stream for IntoBinnedXBins1DefaultStream<S, I> where S: Stream<Item=Result<I, Error>> + Unpin, I: AggregatableXdim1Bin {
|
||||
type Item = Result<MinMaxAvgScalarEventBatch, Error>;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||
use Poll::*;
|
||||
match self.inp.poll_next_unpin(cx) {
|
||||
Ready(Some(Ok(k))) => {
|
||||
let ret = MinMaxAvgScalarEventBatch {
|
||||
// TODO fill in the details
|
||||
tss: vec![],
|
||||
mins: vec![],
|
||||
maxs: vec![],
|
||||
avgs: vec![],
|
||||
};
|
||||
Ready(Some(Ok(ret)))
|
||||
}
|
||||
Ready(Some(Err(e))) => Ready(Some(Err(e))),
|
||||
Ready(None) => Ready(None),
|
||||
Pending => Pending,
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
/// Integration-style test: drives the X-binning pipeline end to end on a
/// dedicated runtime via `crate::run`.
#[test]
fn agg_x_dim_1() {
    let fut = async {
        agg_x_dim_1_inner().await;
        Ok(())
    };
    crate::run(fut).unwrap();
}
|
||||
|
||||
async fn agg_x_dim_1_inner() {
|
||||
let vals = ValuesDim1 {
|
||||
tss: vec![0, 1, 2, 3],
|
||||
values: vec![
|
||||
@@ -149,5 +310,39 @@ fn agg_x_dim_1() {
|
||||
How do I want to drive the system?
|
||||
If I write the T-binner as a Stream, then I also need to pass it the input!
|
||||
Meaning, I need to pass the Stream which produces the actual numbers from disk.
|
||||
|
||||
readchannel() -> Stream of timestamped byte blobs
|
||||
.to_f32() -> Stream ? indirection to branch on the underlying shape
|
||||
.agg_x_bins_1() -> Stream ? can I keep it at the single indirection on the top level?
|
||||
*/
|
||||
let query = netpod::AggQuerySingleChannel {
|
||||
ksprefix: "daq_swissfel".into(),
|
||||
keyspace: 3,
|
||||
channel: netpod::Channel {
|
||||
name: "S10BC01-DBAM070:BAM_CH1_NORM".into(),
|
||||
backend: "sf-databuffer".into(),
|
||||
},
|
||||
timebin: 18721,
|
||||
tb_file_count: 1,
|
||||
split: 12,
|
||||
tbsize: 1000 * 60 * 60 * 24,
|
||||
buffer_size: 1024 * 4,
|
||||
};
|
||||
let fut1 = crate::EventBlobsComplete::new(&query)
|
||||
.into_dim_1_f32_stream()
|
||||
.take(10)
|
||||
.map(|q| {
|
||||
if let Ok(ref k) = q {
|
||||
info!("vals: {:?}", k);
|
||||
}
|
||||
q
|
||||
})
|
||||
.into_binned_x_bins_1()
|
||||
.map(|k| {
|
||||
let k = k.unwrap();
|
||||
info!("after X binning {:?}", k);
|
||||
k
|
||||
})
|
||||
.for_each(|k| ready(()));
|
||||
fut1.await;
|
||||
}
|
||||
|
||||
120
disk/src/lib.rs
120
disk/src/lib.rs
@@ -15,6 +15,7 @@ use bytes::{Bytes, BytesMut, BufMut, Buf};
|
||||
use std::path::PathBuf;
|
||||
use bitshuffle::bitshuffle_decompress;
|
||||
use async_channel::bounded;
|
||||
use netpod::ScalarType;
|
||||
|
||||
|
||||
pub async fn read_test_1(query: &netpod::AggQuerySingleChannel) -> Result<netpod::BodyStream, Error> {
|
||||
@@ -380,6 +381,95 @@ pub fn parsed1(query: &netpod::AggQuerySingleChannel) -> impl Stream<Item=Result
|
||||
}
|
||||
|
||||
|
||||
pub struct EventBlobsComplete {
|
||||
file_chan: async_channel::Receiver<Result<File, Error>>,
|
||||
evs: Option<EventChunker>,
|
||||
buffer_size: u32,
|
||||
}
|
||||
|
||||
impl EventBlobsComplete {
|
||||
pub fn new(query: &netpod::AggQuerySingleChannel) -> Self {
|
||||
Self {
|
||||
file_chan: open_files(query),
|
||||
evs: None,
|
||||
buffer_size: query.buffer_size,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Stream for EventBlobsComplete {
|
||||
type Item = Result<EventFull, Error>;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
use Poll::*;
|
||||
'outer: loop {
|
||||
let z = match &mut self.evs {
|
||||
Some(evs) => {
|
||||
match evs.poll_next_unpin(cx) {
|
||||
Ready(Some(k)) => {
|
||||
Ready(Some(k))
|
||||
}
|
||||
Ready(None) => {
|
||||
self.evs = None;
|
||||
continue 'outer;
|
||||
}
|
||||
Pending => Pending,
|
||||
}
|
||||
}
|
||||
None => {
|
||||
match self.file_chan.poll_next_unpin(cx) {
|
||||
Ready(Some(k)) => {
|
||||
match k {
|
||||
Ok(file) => {
|
||||
let inp = Box::pin(file_content_stream(file, self.buffer_size as usize));
|
||||
let mut chunker = EventChunker::new(inp);
|
||||
self.evs.replace(chunker);
|
||||
continue 'outer;
|
||||
}
|
||||
Err(e) => Ready(Some(Err(e)))
|
||||
}
|
||||
}
|
||||
Ready(None) => Ready(None),
|
||||
Pending => Pending,
|
||||
}
|
||||
}
|
||||
};
|
||||
break z;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
pub fn event_blobs_complete(query: &netpod::AggQuerySingleChannel) -> impl Stream<Item=Result<EventFull, Error>> + Send {
|
||||
let query = query.clone();
|
||||
async_stream::stream! {
|
||||
let filerx = open_files(&query);
|
||||
while let Ok(fileres) = filerx.recv().await {
|
||||
match fileres {
|
||||
Ok(file) => {
|
||||
let inp = Box::pin(file_content_stream(file, query.buffer_size as usize));
|
||||
let mut chunker = EventChunker::new(inp);
|
||||
while let Some(evres) = chunker.next().await {
|
||||
match evres {
|
||||
Ok(evres) => {
|
||||
yield Ok(evres);
|
||||
}
|
||||
Err(e) => {
|
||||
yield Err(e)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
yield Err(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
pub struct EventChunker {
|
||||
inp: NeedMinBuffer,
|
||||
had_channel: bool,
|
||||
@@ -527,7 +617,7 @@ impl EventChunker {
|
||||
let c1 = bitshuffle_decompress(&buf.as_ref()[p1 as usize..], &mut decomp, ele_count as usize, ele_size as usize, 0);
|
||||
//info!("decompress result: {:?}", c1);
|
||||
assert!(c1.unwrap() as u32 == k1);
|
||||
ret.add_event(ts, pulse, Some(decomp));
|
||||
ret.add_event(ts, pulse, Some(decomp), ScalarType::from_dtype_index(type_index));
|
||||
}
|
||||
buf.advance(len as usize);
|
||||
need_min = 4;
|
||||
@@ -614,6 +704,7 @@ pub struct EventFull {
|
||||
tss: Vec<u64>,
|
||||
pulses: Vec<u64>,
|
||||
decomps: Vec<Option<BytesMut>>,
|
||||
scalar_types: Vec<ScalarType>,
|
||||
}
|
||||
|
||||
impl EventFull {
|
||||
@@ -623,13 +714,15 @@ impl EventFull {
|
||||
tss: vec![],
|
||||
pulses: vec![],
|
||||
decomps: vec![],
|
||||
scalar_types: vec![],
|
||||
}
|
||||
}
|
||||
|
||||
fn add_event(&mut self, ts: u64, pulse: u64, decomp: Option<BytesMut>) {
|
||||
fn add_event(&mut self, ts: u64, pulse: u64, decomp: Option<BytesMut>, scalar_type: ScalarType) {
|
||||
self.tss.push(ts);
|
||||
self.pulses.push(pulse);
|
||||
self.decomps.push(decomp);
|
||||
self.scalar_types.push(scalar_type);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -836,3 +929,26 @@ impl futures_core::Stream for RawConcatChannelReader {
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
fn run<T, F: std::future::Future<Output=Result<T, Error>>>(f: F) -> Result<T, Error> {
|
||||
tracing_init();
|
||||
tokio::runtime::Builder::new_multi_thread()
|
||||
.worker_threads(12)
|
||||
.max_blocking_threads(256)
|
||||
.enable_all()
|
||||
.build()
|
||||
.unwrap()
|
||||
.block_on(async {
|
||||
f.await
|
||||
})
|
||||
}
|
||||
|
||||
pub fn tracing_init() {
|
||||
tracing_subscriber::fmt()
|
||||
//.with_timer(tracing_subscriber::fmt::time::uptime())
|
||||
.with_target(true)
|
||||
.with_thread_names(true)
|
||||
//.with_max_level(tracing::Level::INFO)
|
||||
.with_env_filter(tracing_subscriber::EnvFilter::new("info,retrieval=trace,disk=trace,tokio_postgres=info"))
|
||||
.init();
|
||||
}
|
||||
|
||||
@@ -38,3 +38,57 @@ pub struct BodyStream {
|
||||
//pub receiver: async_channel::Receiver<Result<bytes::Bytes, Error>>,
|
||||
pub inner: Box<dyn futures_core::Stream<Item=Result<bytes::Bytes, Error>> + Send + Unpin>,
|
||||
}
|
||||
|
||||
/// Scalar element type of a channel's event payload, matching the on-disk
/// dtype index of the databuffer format.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ScalarType {
    U8,
    U16,
    U32,
    U64,
    I8,
    I16,
    I32,
    I64,
    F32,
    F64,
}

impl ScalarType {
    /// Maps an on-disk dtype index to its `ScalarType`.
    ///
    /// # Panics
    /// Panics for BOOL (0), BOOL8 (1), CHARACTER (6), STRING (13) and any
    /// index not listed — those dtypes are not supported here.
    pub fn from_dtype_index(ix: u8) -> Self {
        use ScalarType::*;
        match ix {
            0 => panic!("BOOL not supported"),
            1 => panic!("BOOL8 not supported"),
            3 => U8,
            5 => U16,
            8 => U32,
            10 => U64,
            2 => I8,
            4 => I16,
            7 => I32,
            9 => I64,
            11 => F32,
            12 => F64,
            6 => panic!("CHARACTER not supported"),
            13 => panic!("STRING not supported"),
            _ => panic!("unknown"),
        }
    }

    /// Size of one element of this type in bytes.
    pub fn bytes(&self) -> u8 {
        use ScalarType::*;
        match self {
            U8 => 1,
            U16 => 2,
            U32 => 4,
            U64 => 8,
            I8 => 1,
            I16 => 2,
            I32 => 4,
            I64 => 8,
            F32 => 4,
            F64 => 8,
        }
    }
}
|
||||
|
||||
Reference in New Issue
Block a user