Remove unused
This commit is contained in:
@@ -8,13 +8,9 @@ use crate::tcprawclient::make_sub_query;
|
||||
use crate::tcprawclient::OpenBoxedBytesStreamsBox;
|
||||
use crate::timebin::cached::reader::EventsReadProvider;
|
||||
use crate::timebin::CacheReadProvider;
|
||||
use crate::timebin::TimeBinnedStream;
|
||||
use crate::transform::build_merged_event_transform;
|
||||
use crate::transform::EventsToTimeBinnable;
|
||||
use err::Error;
|
||||
use futures_util::future::BoxFuture;
|
||||
use futures_util::Future;
|
||||
use futures_util::FutureExt;
|
||||
use futures_util::Stream;
|
||||
use futures_util::StreamExt;
|
||||
use items_0::collect_s::CollectableDyn;
|
||||
@@ -22,15 +18,9 @@ use items_0::on_sitemty_data;
|
||||
use items_0::streamitem::RangeCompletableItem;
|
||||
use items_0::streamitem::Sitemty;
|
||||
use items_0::streamitem::StreamItem;
|
||||
use items_0::timebin::TimeBinnable;
|
||||
use items_0::timebin::TimeBinned;
|
||||
use items_0::transform::TimeBinnableStreamBox;
|
||||
use items_0::transform::TimeBinnableStreamTrait;
|
||||
use items_0::Events;
|
||||
use items_2::channelevents::ChannelEvents;
|
||||
use items_2::merger::Merger;
|
||||
use items_2::streams::PlainEventStream;
|
||||
use netpod::log::*;
|
||||
use netpod::log::*;
|
||||
use netpod::range::evrange::NanoRange;
|
||||
use netpod::BinnedRangeEnum;
|
||||
@@ -43,8 +33,6 @@ use query::transform::TransformQuery;
|
||||
use serde_json::Value as JsonValue;
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
use std::task::Context;
|
||||
use std::task::Poll;
|
||||
use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
|
||||
@@ -53,7 +41,7 @@ fn assert_stream_send<'u, R>(stream: impl 'u + Send + Stream<Item = R>) -> impl
|
||||
stream
|
||||
}
|
||||
|
||||
pub async fn timebinnable_stream_sf_databuffer_box_events(
|
||||
pub async fn timebinnable_stream_sf_databuffer_channelevents(
|
||||
range: NanoRange,
|
||||
one_before_range: bool,
|
||||
ch_conf: ChannelTypeConfigGen,
|
||||
@@ -62,7 +50,7 @@ pub async fn timebinnable_stream_sf_databuffer_box_events(
|
||||
log_level: String,
|
||||
ctx: Arc<ReqCtx>,
|
||||
open_bytes: OpenBoxedBytesStreamsBox,
|
||||
) -> Result<impl Stream<Item = Sitemty<Box<dyn Events>>>, Error> {
|
||||
) -> Result<impl Stream<Item = Sitemty<ChannelEvents>>, Error> {
|
||||
let subq = make_sub_query(
|
||||
ch_conf,
|
||||
range.clone().into(),
|
||||
@@ -85,21 +73,26 @@ pub async fn timebinnable_stream_sf_databuffer_box_events(
|
||||
// TODO propagate also the max-buf-len for the first stage event reader.
|
||||
// TODO use a mixture of count and byte-size as threshold.
|
||||
let stream = Merger::new(inps, sub.merger_out_len_max());
|
||||
|
||||
let stream = RangeFilter2::new(stream, range, one_before_range);
|
||||
|
||||
let stream = stream.map(move |k| {
|
||||
on_sitemty_data!(k, |k: Box<dyn Events>| {
|
||||
TODO;
|
||||
let k: Box<dyn Events> = Box::new(k);
|
||||
let k = k.to_dim0_f32_for_binning();
|
||||
let k = tr.0.transform(k);
|
||||
Ok(StreamItem::DataItem(RangeCompletableItem::Data(k)))
|
||||
})
|
||||
let stream = stream.map(move |k: Sitemty<ChannelEvents>| {
|
||||
use ChannelEvents;
|
||||
use RangeCompletableItem::*;
|
||||
use StreamItem::*;
|
||||
match k {
|
||||
Ok(DataItem(Data(ChannelEvents::Events(k)))) => {
|
||||
// let k = k;
|
||||
// let k: Box<dyn Events> = Box::new(k);
|
||||
let k = k.to_dim0_f32_for_binning();
|
||||
let k = tr.0.transform(k);
|
||||
Ok(StreamItem::DataItem(RangeCompletableItem::Data(ChannelEvents::Events(
|
||||
k,
|
||||
))))
|
||||
}
|
||||
_ => k,
|
||||
}
|
||||
});
|
||||
|
||||
#[cfg(target_abi = "")]
|
||||
#[cfg(wasm_transform)]
|
||||
#[cfg(feature = "wasm_transform")]
|
||||
let stream = if let Some(wasmname) = wasm1 {
|
||||
debug!("make wasm transform");
|
||||
use httpclient::url::Url;
|
||||
@@ -225,242 +218,61 @@ pub async fn timebinnable_stream_sf_databuffer_box_events(
|
||||
Ok(stream)
|
||||
}
|
||||
|
||||
async fn timebinnable_stream_sf_databuffer_binnable_box(
|
||||
range: NanoRange,
|
||||
one_before_range: bool,
|
||||
ch_conf: ChannelTypeConfigGen,
|
||||
transform_query: TransformQuery,
|
||||
sub: EventsSubQuerySettings,
|
||||
log_level: String,
|
||||
ctx: Arc<ReqCtx>,
|
||||
open_bytes: OpenBoxedBytesStreamsBox,
|
||||
) -> Result<TimeBinnableStreamBox, Error> {
|
||||
let stream = timebinnable_stream_sf_databuffer_box_events(
|
||||
range,
|
||||
one_before_range,
|
||||
ch_conf,
|
||||
transform_query,
|
||||
sub,
|
||||
log_level,
|
||||
ctx,
|
||||
open_bytes,
|
||||
)
|
||||
.await?;
|
||||
let stream = PlainEventStream::new(stream);
|
||||
// let stream = stream.map(|x| {
|
||||
// on_sitemty_data!(x, |x| {
|
||||
// let ret = x.binnable;
|
||||
// ret
|
||||
// })
|
||||
// });
|
||||
let stream = EventsToTimeBinnable::new(stream);
|
||||
let stream = Box::pin(stream);
|
||||
Ok(TimeBinnableStreamBox(stream))
|
||||
}
|
||||
|
||||
pub async fn timebinnable_stream_sf_databuffer_channelevents(
|
||||
range: NanoRange,
|
||||
one_before_range: bool,
|
||||
ch_conf: ChannelTypeConfigGen,
|
||||
transform_query: TransformQuery,
|
||||
sub: EventsSubQuerySettings,
|
||||
log_level: String,
|
||||
ctx: Arc<ReqCtx>,
|
||||
open_bytes: OpenBoxedBytesStreamsBox,
|
||||
) -> Result<impl Stream<Item = Sitemty<ChannelEvents>>, Error> {
|
||||
let stream = timebinnable_stream_sf_databuffer_box_events(
|
||||
range,
|
||||
one_before_range,
|
||||
ch_conf,
|
||||
transform_query,
|
||||
sub,
|
||||
log_level,
|
||||
ctx,
|
||||
open_bytes,
|
||||
)
|
||||
.await?;
|
||||
// let stream = stream.map(|x| x);
|
||||
let stream = stream.map(move |k| {
|
||||
on_sitemty_data!(k, |k| {
|
||||
// let k: Box<dyn Collectable> = Box::new(k);
|
||||
Ok(StreamItem::DataItem(RangeCompletableItem::Data(ChannelEvents::Events(
|
||||
k,
|
||||
))))
|
||||
})
|
||||
});
|
||||
Ok(stream)
|
||||
}
|
||||
|
||||
pub struct TimeBinnableStream {
|
||||
make_stream_fut: Option<Pin<Box<dyn Future<Output = Result<TimeBinnableStreamBox, Error>> + Send>>>,
|
||||
stream: Option<Pin<Box<dyn Stream<Item = Sitemty<Box<dyn TimeBinnable>>> + Send>>>,
|
||||
}
|
||||
|
||||
impl TimeBinnableStream {
|
||||
pub fn new(
|
||||
range: NanoRange,
|
||||
one_before_range: bool,
|
||||
ch_conf: ChannelTypeConfigGen,
|
||||
transform_query: TransformQuery,
|
||||
sub: EventsSubQuerySettings,
|
||||
log_level: String,
|
||||
// TODO take by Arc ref
|
||||
ctx: Arc<ReqCtx>,
|
||||
open_bytes: OpenBoxedBytesStreamsBox,
|
||||
) -> Self {
|
||||
let fut = timebinnable_stream_sf_databuffer_binnable_box(
|
||||
range,
|
||||
one_before_range,
|
||||
ch_conf,
|
||||
transform_query,
|
||||
sub,
|
||||
log_level,
|
||||
ctx,
|
||||
open_bytes,
|
||||
);
|
||||
let fut = Box::pin(fut);
|
||||
Self {
|
||||
make_stream_fut: Some(fut),
|
||||
stream: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// impl WithTransformProperties + Send
|
||||
|
||||
impl Stream for TimeBinnableStream {
|
||||
type Item = Sitemty<Box<dyn TimeBinnable>>;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||
use Poll::*;
|
||||
loop {
|
||||
break if let Some(fut) = self.make_stream_fut.as_mut() {
|
||||
match fut.poll_unpin(cx) {
|
||||
Ready(x) => match x {
|
||||
Ok(x) => {
|
||||
self.make_stream_fut = None;
|
||||
self.stream = Some(Box::pin(x));
|
||||
continue;
|
||||
}
|
||||
Err(e) => Ready(Some(Err(e))),
|
||||
},
|
||||
Pending => Pending,
|
||||
}
|
||||
} else if let Some(fut) = self.stream.as_mut() {
|
||||
match fut.poll_next_unpin(cx) {
|
||||
Ready(Some(x)) => Ready(Some(x)),
|
||||
Ready(None) => {
|
||||
self.stream = None;
|
||||
Ready(None)
|
||||
}
|
||||
Pending => Pending,
|
||||
}
|
||||
} else {
|
||||
Ready(None)
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn timebinned_stream(
|
||||
query: BinnedQuery,
|
||||
binned_range: BinnedRangeEnum,
|
||||
ch_conf: ChannelTypeConfigGen,
|
||||
ctx: &ReqCtx,
|
||||
open_bytes: OpenBoxedBytesStreamsBox,
|
||||
cache_read_provider: Arc<dyn CacheReadProvider>,
|
||||
events_read_provider: Arc<dyn EventsReadProvider>,
|
||||
) -> Result<Pin<Box<dyn Stream<Item = Sitemty<Box<dyn CollectableDyn>>> + Send>>, Error> {
|
||||
use netpod::query::CacheUsage;
|
||||
let cache_usage = query.cache_usage().unwrap_or(CacheUsage::Ignore);
|
||||
match cache_usage.clone() {
|
||||
CacheUsage::Use | CacheUsage::Recreate | CacheUsage::Ignore => {
|
||||
debug!("BINNING NEW METHOD");
|
||||
debug!(
|
||||
"timebinned_stream caching {:?} subgrids {:?}",
|
||||
query,
|
||||
query.subgrids()
|
||||
);
|
||||
let do_time_weight = true;
|
||||
let bin_len_layers = if let Some(subgrids) = query.subgrids() {
|
||||
subgrids
|
||||
.iter()
|
||||
.map(|&x| DtMs::from_ms_u64(1000 * x.as_secs()))
|
||||
.collect()
|
||||
} else {
|
||||
netpod::time_bin_len_cache_opts().to_vec()
|
||||
};
|
||||
let stream = crate::timebin::TimeBinnedFromLayers::new(
|
||||
ch_conf,
|
||||
cache_usage,
|
||||
query.transform().clone(),
|
||||
EventsSubQuerySettings::from(&query),
|
||||
query.log_level().into(),
|
||||
Arc::new(ctx.clone()),
|
||||
binned_range.binned_range_time(),
|
||||
do_time_weight,
|
||||
bin_len_layers,
|
||||
cache_read_provider,
|
||||
events_read_provider,
|
||||
)
|
||||
.map_err(Error::from_string)?;
|
||||
let stream = stream.map(|item| {
|
||||
use items_0::timebin::BinningggContainerBinsDyn;
|
||||
on_sitemty_data!(item, |mut x: Box<dyn BinningggContainerBinsDyn>| {
|
||||
x.fix_numerics();
|
||||
let ret = Box::new(x) as Box<dyn CollectableDyn>;
|
||||
Ok(StreamItem::DataItem(RangeCompletableItem::Data(ret)))
|
||||
})
|
||||
});
|
||||
let stream = Box::pin(stream);
|
||||
Ok(stream)
|
||||
}
|
||||
_ => {
|
||||
debug!("BINNING OLD METHOD");
|
||||
let range = binned_range.binned_range_time().to_nano_range();
|
||||
let do_time_weight = true;
|
||||
let one_before_range = true;
|
||||
let stream = timebinnable_stream_sf_databuffer_binnable_box(
|
||||
range,
|
||||
one_before_range,
|
||||
ch_conf,
|
||||
query.transform().clone(),
|
||||
(&query).into(),
|
||||
query.log_level().into(),
|
||||
Arc::new(ctx.clone()),
|
||||
open_bytes,
|
||||
)
|
||||
.await?;
|
||||
let stream: Pin<Box<dyn TimeBinnableStreamTrait>> = stream.0;
|
||||
let stream = Box::pin(stream);
|
||||
// TODO rename TimeBinnedStream to make it more clear that it is the component which initiates the time binning.
|
||||
let stream = TimeBinnedStream::new(stream, binned_range, do_time_weight);
|
||||
if false {
|
||||
let stream = stream.map(|x| {
|
||||
on_sitemty_data!(x, |x: Box<dyn TimeBinned>| Ok(StreamItem::DataItem(
|
||||
RangeCompletableItem::Data(x.to_container_bins())
|
||||
)))
|
||||
});
|
||||
todo!();
|
||||
}
|
||||
let stream = stream.map(|x| {
|
||||
on_sitemty_data!(x, |x| {
|
||||
let ret = Box::new(x) as Box<dyn CollectableDyn>;
|
||||
Ok(StreamItem::DataItem(RangeCompletableItem::Data(ret)))
|
||||
})
|
||||
});
|
||||
// let stream: Pin<Box<dyn Stream<Item = Sitemty<Box<dyn TimeBinned>>> + Send>> = Box::pin(stream);
|
||||
let stream = Box::pin(stream);
|
||||
Ok(stream)
|
||||
}
|
||||
}
|
||||
debug!("BINNING NEW METHOD");
|
||||
debug!(
|
||||
"timebinned_stream caching {:?} subgrids {:?}",
|
||||
query,
|
||||
query.subgrids()
|
||||
);
|
||||
let do_time_weight = true;
|
||||
let bin_len_layers = if let Some(subgrids) = query.subgrids() {
|
||||
subgrids
|
||||
.iter()
|
||||
.map(|&x| DtMs::from_ms_u64(1000 * x.as_secs()))
|
||||
.collect()
|
||||
} else {
|
||||
netpod::time_bin_len_cache_opts().to_vec()
|
||||
};
|
||||
let stream = crate::timebin::TimeBinnedFromLayers::new(
|
||||
ch_conf,
|
||||
cache_usage,
|
||||
query.transform().clone(),
|
||||
EventsSubQuerySettings::from(&query),
|
||||
query.log_level().into(),
|
||||
Arc::new(ctx.clone()),
|
||||
binned_range.binned_range_time(),
|
||||
do_time_weight,
|
||||
bin_len_layers,
|
||||
cache_read_provider,
|
||||
events_read_provider,
|
||||
)
|
||||
.map_err(Error::from_string)?;
|
||||
let stream = stream.map(|item| {
|
||||
use items_0::timebin::BinningggContainerBinsDyn;
|
||||
on_sitemty_data!(item, |mut x: Box<dyn BinningggContainerBinsDyn>| {
|
||||
x.fix_numerics();
|
||||
let ret = Box::new(x) as Box<dyn CollectableDyn>;
|
||||
Ok(StreamItem::DataItem(RangeCompletableItem::Data(ret)))
|
||||
})
|
||||
});
|
||||
let stream = Box::pin(stream);
|
||||
Ok(stream)
|
||||
}
|
||||
|
||||
pub async fn timebinned_json(
|
||||
query: BinnedQuery,
|
||||
ch_conf: ChannelTypeConfigGen,
|
||||
ctx: &ReqCtx,
|
||||
open_bytes: OpenBoxedBytesStreamsBox,
|
||||
cache_read_provider: Arc<dyn CacheReadProvider>,
|
||||
events_read_provider: Arc<dyn EventsReadProvider>,
|
||||
) -> Result<CollectResult<JsonValue>, Error> {
|
||||
@@ -479,18 +291,16 @@ pub async fn timebinned_json(
|
||||
binned_range.clone(),
|
||||
ch_conf,
|
||||
ctx,
|
||||
open_bytes,
|
||||
cache_read_provider,
|
||||
events_read_provider,
|
||||
)
|
||||
.await?;
|
||||
// let stream = timebinned_to_collectable(stream);
|
||||
let collected = Collect::new(stream, deadline, collect_max, bytes_max, None, Some(binned_range));
|
||||
let collected: BoxFuture<_> = Box::pin(collected);
|
||||
let collres = collected.await?;
|
||||
match collres {
|
||||
CollectResult::Some(collres) => {
|
||||
let collres = if let Some(bins) = collres
|
||||
let collres = if let Some(_bins) = collres
|
||||
.as_any_ref()
|
||||
.downcast_ref::<items_2::binsdim0::BinsDim0CollectedResult<netpod::EnumVariant>>()
|
||||
{
|
||||
@@ -511,7 +321,7 @@ pub async fn timebinned_json(
|
||||
fn take_collector_result(coll: &mut Box<dyn items_0::collect_s::CollectorDyn>) -> Option<serde_json::Value> {
|
||||
match coll.result(None, None) {
|
||||
Ok(collres) => {
|
||||
let collres = if let Some(bins) = collres
|
||||
let collres = if let Some(_bins) = collres
|
||||
.as_any_ref()
|
||||
.downcast_ref::<items_2::binsdim0::BinsDim0CollectedResult<netpod::EnumVariant>>()
|
||||
{
|
||||
@@ -534,7 +344,6 @@ pub async fn timebinned_json_framed(
|
||||
query: BinnedQuery,
|
||||
ch_conf: ChannelTypeConfigGen,
|
||||
ctx: &ReqCtx,
|
||||
open_bytes: OpenBoxedBytesStreamsBox,
|
||||
cache_read_provider: Arc<dyn CacheReadProvider>,
|
||||
events_read_provider: Arc<dyn EventsReadProvider>,
|
||||
) -> Result<JsonStream, Error> {
|
||||
@@ -546,7 +355,6 @@ pub async fn timebinned_json_framed(
|
||||
binned_range.clone(),
|
||||
ch_conf,
|
||||
ctx,
|
||||
open_bytes,
|
||||
cache_read_provider,
|
||||
events_read_provider,
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user