Remove old into-x-binning
This commit is contained in:
@@ -1,4 +1,3 @@
|
||||
use crate::agg::binnedx::IntoBinnedXBins1;
|
||||
use crate::agg::enp::{Identity, WaveXBinner};
|
||||
use crate::agg::eventbatch::MinMaxAvgScalarEventBatch;
|
||||
use crate::agg::streams::StreamItem;
|
||||
@@ -283,112 +282,39 @@ async fn events_conn_handler_inner_try(
|
||||
compression: entry.is_compressed,
|
||||
};
|
||||
|
||||
if true {
|
||||
// TODO use a requested buffer size
|
||||
let buffer_size = 1024 * 4;
|
||||
let event_chunker_conf = EventChunkerConf::new(ByteSize::kb(1024));
|
||||
let event_blobs = EventBlobsComplete::new(
|
||||
range.clone(),
|
||||
channel_config.clone(),
|
||||
node_config.node.clone(),
|
||||
node_config.ix,
|
||||
buffer_size,
|
||||
event_chunker_conf,
|
||||
);
|
||||
let shape = entry.to_shape().unwrap();
|
||||
let mut p1 = pipe1!(entry.scalar_type, entry.byte_order, shape, evq.agg_kind, event_blobs);
|
||||
while let Some(item) = p1.next().await {
|
||||
let item = item.make_frame();
|
||||
match item {
|
||||
Ok(buf) => match netout.write_all(&buf).await {
|
||||
Ok(_) => {}
|
||||
Err(e) => return Err((e, netout))?,
|
||||
},
|
||||
Err(e) => {
|
||||
return Err((e, netout))?;
|
||||
}
|
||||
}
|
||||
}
|
||||
let buf = make_term_frame();
|
||||
match netout.write_all(&buf).await {
|
||||
Ok(_) => (),
|
||||
Err(e) => return Err((e, netout))?,
|
||||
}
|
||||
match netout.flush().await {
|
||||
Ok(_) => (),
|
||||
Err(e) => return Err((e, netout))?,
|
||||
}
|
||||
Ok(())
|
||||
} else {
|
||||
// TODO remove this scope after refactor.
|
||||
let buffer_size = 1024 * 4;
|
||||
let event_chunker_conf = EventChunkerConf::new(ByteSize::kb(1024));
|
||||
let s1 = EventBlobsComplete::new(
|
||||
range.clone(),
|
||||
channel_config.clone(),
|
||||
node_config.node.clone(),
|
||||
node_config.ix,
|
||||
buffer_size,
|
||||
event_chunker_conf,
|
||||
)
|
||||
.into_dim_1_f32_stream();
|
||||
// TODO need to decide already here on the type I want to use.
|
||||
let mut s1 = IntoBinnedXBins1::<_, BinnedStreamKindScalar>::into_binned_x_bins_1(s1);
|
||||
let mut e = 0;
|
||||
while let Some(item) = s1.next().await {
|
||||
match &item {
|
||||
Ok(StreamItem::DataItem(_)) => {
|
||||
e += 1;
|
||||
}
|
||||
// TODO use a requested buffer size
|
||||
let buffer_size = 1024 * 4;
|
||||
let event_chunker_conf = EventChunkerConf::new(ByteSize::kb(1024));
|
||||
let event_blobs = EventBlobsComplete::new(
|
||||
range.clone(),
|
||||
channel_config.clone(),
|
||||
node_config.node.clone(),
|
||||
node_config.ix,
|
||||
buffer_size,
|
||||
event_chunker_conf,
|
||||
);
|
||||
let shape = entry.to_shape().unwrap();
|
||||
let mut p1 = pipe1!(entry.scalar_type, entry.byte_order, shape, evq.agg_kind, event_blobs);
|
||||
while let Some(item) = p1.next().await {
|
||||
let item = item.make_frame();
|
||||
match item {
|
||||
Ok(buf) => match netout.write_all(&buf).await {
|
||||
Ok(_) => {}
|
||||
Err(_) => {}
|
||||
}
|
||||
match evq.agg_kind {
|
||||
AggKind::DimXBins1 => {
|
||||
match make_frame::<
|
||||
Result<
|
||||
StreamItem<RangeCompletableItem<<BinnedStreamKindScalar as StreamKind>::XBinnedEvents>>,
|
||||
Error,
|
||||
>,
|
||||
>(&item)
|
||||
{
|
||||
Ok(buf) => match netout.write_all(&buf).await {
|
||||
Ok(_) => {}
|
||||
Err(e) => return Err((e, netout))?,
|
||||
},
|
||||
Err(e) => {
|
||||
return Err((e, netout))?;
|
||||
}
|
||||
}
|
||||
}
|
||||
// TODO define this case:
|
||||
AggKind::DimXBinsN(_xbincount) => match make_frame::<
|
||||
Result<
|
||||
StreamItem<RangeCompletableItem<<BinnedStreamKindScalar as StreamKind>::XBinnedEvents>>,
|
||||
Error,
|
||||
>,
|
||||
>(err::todoval())
|
||||
{
|
||||
Ok(buf) => match netout.write_all(&buf).await {
|
||||
Ok(_) => {}
|
||||
Err(e) => return Err((e, netout))?,
|
||||
},
|
||||
Err(e) => {
|
||||
return Err((e, netout))?;
|
||||
}
|
||||
},
|
||||
Err(e) => return Err((e, netout))?,
|
||||
},
|
||||
Err(e) => {
|
||||
return Err((e, netout))?;
|
||||
}
|
||||
}
|
||||
let buf = make_term_frame();
|
||||
match netout.write_all(&buf).await {
|
||||
Ok(_) => (),
|
||||
Err(e) => return Err((e, netout))?,
|
||||
}
|
||||
match netout.flush().await {
|
||||
Ok(_) => (),
|
||||
Err(e) => return Err((e, netout))?,
|
||||
}
|
||||
let _total_written_value_items = e;
|
||||
Ok(())
|
||||
}
|
||||
let buf = make_term_frame();
|
||||
match netout.write_all(&buf).await {
|
||||
Ok(_) => (),
|
||||
Err(e) => return Err((e, netout))?,
|
||||
}
|
||||
match netout.flush().await {
|
||||
Ok(_) => (),
|
||||
Err(e) => return Err((e, netout))?,
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user